MassTransit 探究初步
1. 背景
作为SOA基礎設施,企業服務總線(ESB)是一個具有高分布性、事件驅動服務的SOA架構,是當前企業集成的主流框架。
2. 簡介
網站:http://go.rritw.com/masstransit-project.com/
MassTransit (MT) is a framework forcreating distributed applications on the .Net platform. MT provides the abilityto subscribe to messages by type and then connect different processing nodesthough message subscriptions building a cohesive mesh of services.
主要特性:
l Bus architecture
l Sagas
l Exception management
l Transactions
l Serialization
l Headers
l Consumer lifecycle
l Built on top of Rabbit Mq
l IOC support
授權:
基於Apache 2.0,可以使用在任何環境中。
3. 架構
MassTransit在消息隊列(MQ)之上構建了消息總線機制,封裝了對消息隊列的操作,以及其它的組件,比如序列化、日志、Saga、持久化等。
下圖为傳入消息處理管道模型:
4. 整體分析
MassTransit的目標是作为消息機制的抽象框架,因此,它本身並不具體實現MQ,而是通過集成其它MQ產品來作为其通信層。目前官方已集成的MQ產品有MSMQ、RabbitMQ。其它非官方補充了ActiveMQ。
MassTransit在MQ之上添加了Sagas、多線程、異常處理、事務、序列化、消息頭(Header)、消息使用者生命周期管理、路由、Rx(Reactive Extension 反應式擴展)集成、NHibernate集成、調試、跟蹤、日志輸出、加密、定時服務等。
5. 特性分析
5.1. 聲明式配置
MassTransit本身使用了許多優秀的設計,比如對MSMQ、RabbitMQ的使用,通過在Bus構造配置中調用UseMSMQ()或UseRabbitMQ()來聲明式的决定。
5.2. 通過擴展方法隱藏具體實現
通過分析源碼,可以發現UseXXX函數是通過定義在MassTransit.Transports.MSMQ.dll和MassTransit.Transports.RabbitMq.dll中的擴展方法實現的,這样當用戶引用相應的DLL時,方法調用才會開放给用戶。即用戶選擇具體使用哪種Transport時,是通過引用相應的DLL來决定的。DLL引用入項目後,具體MQ產品特定的配置,函數等方法也附加到了基類對象中。這样可以避免在基類中定義大量子類特定接口
MassTransit所有擴展組件均是采用這種方法,比如日志(Logging)組件。
6. 入門
首先需要定義消息:
public class YourMessage { public string Text { get; set; } }
消息總線的創建可以通過Bus類靜態方法Initialize進行,傳入相應的配置方法,如下所示:
Bus.Initialize(sbc =>
{
sbc.UseMsmq();
sbc.VerifyMsmqConfiguration();
sbc.UseMulticastSubscriptionClient();
sbc.ReceiveFrom("msmq://localhost/test");
sbc.Subscribe(subs =>
{
subs.Handler
});
});
初始化好Bus之後,就可以調用Publish方法發布消息了:
Bus.Instance.Publish(new YourMessage { Text = "Hi" });
在初始化中,通過sbc.Subscribe函數注冊了一個Handler,其作用是在控制台中輸出消息文本。
創建一個控制台應用程序,在main函數中完成初始化和發布,可以在隨後的輸出中看到消息文本。
7. 高級功能使用
7.1. Subscription Service
参考: http://go.rritw.com/docs.masstransit-project.com/en/latest/overview/subscriptions.html
如果使用的消息隊列不提供訂閱共享功能,比如MSMQ,可以使用MT的SubscriptionService來實現此功能。在這種情況下,訂閱信息的協調功能由一個中心管理器來完成。這個中心管理器就是運行在網路中的SubscriptionService的一個實例。每個消息總線(Bus)實例通過SubscriptionClient與其進行通信並交換訂閱信息。
示例:
創建Subscription Service:
var subscriptionBus = ServiceBusFactory.New(sbc =>
{
sbc.UseStomp();
sbc.SetConcurrentConsumerLimit(1);
sbc.ReceiveFrom("stomp://localhost/mt_subscriptions");
});
var subscriptionSagas = new InMemorySagaRepository();
var subscriptionClientSagas = new InMemorySagaRepository();
var subscriptionService = new SubscriptionService(subscriptionBus, subscriptionSagas, subscriptionClientSagas);
創建Time Out Service:
var timeoutBus = ServiceBusFactory.New(sbc =>
{
sbc.UseStomp();
sbc.UseControlBus();
sbc.ReceiveFrom("stomp://localhost/mt_timeouts");
sbc.UseSubscriptionService("stomp://localhost/mt_subscriptions");
});
var timeoutService = new TimeoutService(timeoutBus, new InMemorySagaRepository
timeoutService.Start();
創建應用Bus:
var bus = ServiceBusFactory.New(sbc
{
sbc.UseStomp();
sbc.UseControlBus();
sbc.ReceiveFrom("stomp://localhost/your_awesome_application");
sbc.UseSubscriptionService("stomp://localhost/mt_subscriptions");
});
說明:
當調用UseSubscriptionService時,隱式附加了一個SubscriptionClient到bus中。首先,SubscriptionClient會發送一個AddSubscriptionClient消息给SubscriptionService隊列,然後開始監聽訂閱變更,隨後發送AddScription/RemoveSubscription消息。通過這種方式,所有的更新將傳播到應用中所有其它節點中。
7.2. Request/Response
参考: http://go.rritw.com/docs.masstransit-project.com/en/latest/overview/request.html
通常使用Message Bus是一個應用發送一個請求(Request),另一個應用接收到這個請求後做出一些回應(Response)。比如工資進程請求稅務進程執行某種所得稅,在計算完成後,返回結果。
示例:
消息定義:
請求和響應通過CorrelationId來進行關聯。
public class BasicRequest : CorrelatedBy
{
public Guid CorrelationId { get;set; }
public string Text { get; set; }
}
public class BasicResponse : CorrelatedBy
{
public Guid CorrelationId { get; set; }
public string Text { get; set; }
}
響應者:
簡單的在請求信息上添加一個RESP前綴作为響應。注意BasicResponse.CorrelationId並沒有設置,這個是由MT自動設置的。
public class Program
{
public static void Main()
{
Bus.Initialize(sbc =>
{
sbc.UseMsmq();
sbc.VerifyMsmqConfiguration();
sbc.UseMulticastSubscriptionClient();
sbc.ReceiveFrom("msmq://localhost/message_responder");
sbc.Subscribe(subs=>
{
subs.Handler
{
cxt.Respond(new BasicResponse{Text = "RESP"+msg.Text});
});
});
});
}
}
請求者:
提交請求並處理與原始請求相關聯(通過CorrelationId)的響應,處理後取消對響應的訂閱並結束這次請求。
public class Program
{
public static void Main()
{
Bus.Initialize(sbc =>
{
sbc.UseMsmq();
sbc.VerifyMsmqConfiguration();
sbc.UseMulticastSubscriptionClient();
sbc.ReceiveFrom("msmq://localhost/message_requestor");
});
Bus.Instance.PublishRequest(new BasicRequest(), x =>
{
x.Handle
x.SetTimeout(30.Seconds());
});
}
}
上述代碼會阻塞調用線程直到響應返回或超時。超時後會拋出RequestTimeoutException。如果響應處理發生異常,該異常會重新拋出给調用請求發出線程。
如果想使用異步處理,可以調用BeginPublishRequest,在處理結束後,調用EndRequest方法完成處理。
7.3. Sagas
参考: http://go.rritw.com/docs.masstransit-project.com/en/latest/overview/saga.html
一個Saga是指由協調器(coordinator)管理的一個長事務。Sagas通常由一個事件(event)启動,Sagas對事件進行編排,並維護所有事務的狀態。
7.3.1. 定義Sagas
在MT中有兩種方法定義Saga:一種是用接口和類直接定義初始化、協調交互的各種消息,或可以被Saga示例觀察到的各種消息;另一種是通過在類裏面定義事件、狀態、行为來定義一個狀態機來實現。
7.3.2. 通過狀態機實現Sagas
要通過狀態機實現Sagas,必須從SagaStateMachine類繼承:
public class AuctionSaga :
SagaStateMachine
ISaga
{
static CombineSaga()
{
Define(() =>
{
// the state machine behavior is defined here
});
}
public Guid CorrelationId { get; set; }
public IServiceBus Bus { get; set; }
}
然後定義各種狀態:
public static State Initial { get; set; }
public static State Completed { get; set; }
public static State Open { get; set; }
public static State Closed { get; set; }
定義與狀態關聯的事件:
public static Event
public static Event
定義消息:
public interface CreateAuction :
CorrelatedBy
{
string Title { get; }
string OwnerEmail { get; }
decimal OpeningBid { get; }
}
public interface PlaceBid
{
Guid BidId { get; }
Guid AuctionId { get; }
decimal MaximumBid { get; }
string BidderEmail { get; }
}
定義狀態轉換:
static AuctionSaga()
{
Define(() =>
{
Initially(
When(Create));
During(Open,
When(Bid));
});
}
定義事件發生時的行为:
static AuctionSaga()
{
Define(() =>
{
Initially(
When(Create)
.Then((saga,message) =>
{
saga.OpeningBid = message.OpeningBid;
saga.OwnerEmail = message.OwnerEmail;
saga.Title = message.Title;
})
.TransitionTo(Open));
});
}
//
public decimal OpeningBid { get; set; }
public string OwnerEmail { get; set; }
public string Title { get; set; }
完整的定義:
static SupervisorSaga()
{
Define(() =>
{
Initially(
When(Create)
.Then((saga,message) =>
{
saga.PostalCode = message.PostalCode;
})
.Publish((saga,message) => new RequestPostalCodeDetails(saga.PostalCode))
.Publish((saga,message) => new RequestGeolocation(saga.PostalCode))
.TransitionTo(Waiting));
During(Waiting,
When(PostalCodeDetailsReceived)
.Then((saga,message) =>
{
saga.City = message.City;
saga.State = message.State;
}),
When(GeolocationReceived)
.Then((saga,message) =>
{
saga.Latitude = message.Latitude;
saga.Longitude = message.Longitude;
}));
Combine(PostalCodeDetailsReceived, GeolocationReceived)
.Into(ReadyToProceed, saga => saga.ReadyFlags);
During(Waiting,
When(ReadyToProceed)
.Then((saga,message) =>
{
saga.Bus.Publish(new PostalCodeDetails(...));
})
.Complete());
});
}
//
public int ReadyFlags { get; set; }
public static Event
public static Event
public static Event
public static Event ReadyToProceed { get; set; }
當Sagas定義好後,就可以在Bus中進行訂閱:
public class Program
{
public static void Main()
{
Bus.Initialize(sbc =>
{
sbc.ReceiveFrom("loopback://localhost/my_saga_bus");
sbc.Subscribe(subs =>
{
subs.Saga
.Permanent();
});
});
}
}
本文来源:https://www.2haoxitong.net/k/doc/52dd01f7da38376baf1faefc.html
文档为doc格式