MassTransit 探究初步(繁体)

发布时间:2014-02-11 17:39:36   来源:文档文库   
字号:

MassTransit 探究初步

1.  背景

作为SOA基礎設施,企業服務總線(ESB)是一個具有高分布性、事件驅動服務的SOA架構,是當前企業集成的主流框架。

2.  簡介

網站:http://go.rritw.com/masstransit-project.com/

 

MassTransit (MT) is a framework forcreating distributed applications on the .Net platform. MT provides the abilityto subscribe to messages by type and then connect different processing nodesthough message subscriptions building a cohesive mesh of services.

 

主要特性:

l  Bus architecture

l  Sagas

l  Exception management

l  Transactions

l  Serialization

l  Headers

l  Consumer lifecycle

l  Built on top of Rabbit Mq

l  IOC support

 

授權:

基於Apache 2.0,可以使用在任何環境中。

3.  架構

MassTransit在消息隊列(MQ)之上構建了消息總線機制,封裝了對消息隊列的操作,以及其它的組件,比如序列化、日志、Saga、持久化等。

 

下圖为傳入消息處理管道模型:

 

4.  整體分析

MassTransit的目標是作为消息機制的抽象框架,因此,它本身並不具體實現MQ,而是通過集成其它MQ產品來作为其通信層。目前官方已集成的MQ產品有MSMQRabbitMQ。其它非官方補充了ActiveMQ

MassTransitMQ之上添加了Sagas多線程、異常處理、事務、序列化、消息頭(Header)、消息使用者生命周期管理、路由、RxReactive Extension 反應式擴展)集成、NHibernate集成、調試、跟蹤、日志輸出、加密、定時服務等。

5.  特性分析

5.1.  聲明式配置

MassTransit本身使用了許多優秀的設計,比如對MSMQRabbitMQ的使用,通過在Bus構造配置中調用UseMSMQ()UseRabbitMQ()來聲明式的决定。

5.2.  通過擴展方法隱藏具體實現

通過分析源碼,可以發現UseXXX函數是通過定義在MassTransit.Transports.MSMQ.dllMassTransit.Transports.RabbitMq.dll中的擴展方法實現的,這样當用戶引用相應的DLL時,方法調用才會開放给用戶。即用戶選擇具體使用哪種Transport時,是通過引用相應的DLL來决定的。DLL引用入項目後,具體MQ產品特定的配置,函數等方法也附加到了基類對象中。這样可以避免在基類中定義大量子類特定接口

MassTransit所有擴展組件均是采用這種方法,比如日志(Logging)組件。

6.  入門

首先需要定義消息:

public class YourMessage { public string Text { get; set; } }

消息總線的創建可以通過Bus類靜態方法Initialize進行,傳入相應的配置方法,如下所示:

 

Bus.Initialize(sbc =>

{

sbc.UseMsmq();

sbc.VerifyMsmqConfiguration();

sbc.UseMulticastSubscriptionClient();

sbc.ReceiveFrom("msmq://localhost/test");

sbc.Subscribe(subs =>

{

subs.Handler(msg => Console.WriteLine("From bus 1: " + msg.Text));

});

});

初始化好Bus之後,就可以調用Publish方法發布消息了:

Bus.Instance.Publish(new YourMessage { Text = "Hi" });

在初始化中,通過sbc.Subscribe函數注冊了一個Handler,其作用是在控制台中輸出消息文本。

創建一個控制台應用程序,在main函數中完成初始化和發布,可以在隨後的輸出中看到消息文本。

7.  高級功能使用

7.1.  Subscription Service

参考 http://go.rritw.com/docs.masstransit-project.com/en/latest/overview/subscriptions.html

 

如果使用的消息隊列不提供訂閱共享功能,比如MSMQ,可以使用MTSubscriptionService來實現此功能。在這種情況下,訂閱信息的協調功能由一個中心管理器來完成。這個中心管理器就是運行在網路中的SubscriptionService的一個實例。每個消息總線(Bus)實例通過SubscriptionClient與其進行通信並交換訂閱信息。

 

示例

 

創建Subscription Service

var subscriptionBus = ServiceBusFactory.New(sbc =>

{

sbc.UseStomp();

sbc.SetConcurrentConsumerLimit(1);

sbc.ReceiveFrom("stomp://localhost/mt_subscriptions");

});

var subscriptionSagas = new InMemorySagaRepository();

var subscriptionClientSagas = new InMemorySagaRepository();

var subscriptionService = new SubscriptionService(subscriptionBus, subscriptionSagas, subscriptionClientSagas);

創建Time Out Service:

var timeoutBus = ServiceBusFactory.New(sbc =>

{

sbc.UseStomp();

sbc.UseControlBus();

sbc.ReceiveFrom("stomp://localhost/mt_timeouts");

sbc.UseSubscriptionService("stomp://localhost/mt_subscriptions");

});

var timeoutService = new TimeoutService(timeoutBus, new InMemorySagaRepository());

timeoutService.Start();

創建應用Bus

var bus = ServiceBusFactory.New(sbc

{

sbc.UseStomp();

sbc.UseControlBus();

sbc.ReceiveFrom("stomp://localhost/your_awesome_application");

sbc.UseSubscriptionService("stomp://localhost/mt_subscriptions");

});


說明:

當調用UseSubscriptionService時,隱式附加了一個SubscriptionClientbus中。首先,SubscriptionClient會發送一個AddSubscriptionClient消息给SubscriptionService隊列,然後開始監聽訂閱變更,隨後發送AddScription/RemoveSubscription消息。通過這種方式,所有的更新將傳播到應用中所有其它節點中。

 

7.2.  Request/Response

参考 http://go.rritw.com/docs.masstransit-project.com/en/latest/overview/request.html

 

通常使用Message Bus是一個應用發送一個請求(Request),另一個應用接收到這個請求後做出一些回應(Response)。比如工資進程請求稅務進程執行某種所得稅,在計算完成後,返回結果。

 

示例

 

消息定義:

請求和響應通過CorrelationId來進行關聯。

public class BasicRequest : CorrelatedBy

{

public Guid CorrelationId { get;set; }

public string Text { get; set; }

}

public class BasicResponse : CorrelatedBy

{

public Guid CorrelationId { get; set; }

public string Text { get; set; }

}

響應者:

簡單的在請求信息上添加一個RESP前綴作为響應。注意BasicResponse.CorrelationId並沒有設置,這個是由MT自動設置的。

public class Program

{

public static void Main()

{

Bus.Initialize(sbc =>

{

sbc.UseMsmq();

sbc.VerifyMsmqConfiguration();

sbc.UseMulticastSubscriptionClient();

sbc.ReceiveFrom("msmq://localhost/message_responder");

sbc.Subscribe(subs=>

{

subs.Handler( (cxt, msg )=>

{

cxt.Respond(new BasicResponse{Text = "RESP"+msg.Text});

});

});

});

}

}

請求者:

提交請求並處理與原始請求相關聯(通過CorrelationId)的響應,處理後取消對響應的訂閱並結束這次請求。

public class Program

{

public static void Main()

{

Bus.Initialize(sbc =>

{

sbc.UseMsmq();

sbc.VerifyMsmqConfiguration();

sbc.UseMulticastSubscriptionClient();

sbc.ReceiveFrom("msmq://localhost/message_requestor");

});

Bus.Instance.PublishRequest(new BasicRequest(), x =>

{

x.Handle(message => Console.WriteLine(message.Text));

x.SetTimeout(30.Seconds());

});

}

}

上述代碼會阻塞調用線程直到響應返回或超時。超時後會拋出RequestTimeoutException。如果響應處理發生異常,該異常會重新拋出给調用請求發出線程。

如果想使用異步處理,可以調用BeginPublishRequest,在處理結束後,調用EndRequest方法完成處理。

 

 

7.3.  Sagas

参考 http://go.rritw.com/docs.masstransit-project.com/en/latest/overview/saga.html

 

一個Saga是指由協調器(coordinator)管理的一個長事務。Sagas通常由一個事件(event)启動,Sagas對事件進行編排,並維護所有事務的狀態。

 

7.3.1.  定義Sagas

MT中有兩種方法定義Saga:一種是用接口和類直接定義初始化、協調交互的各種消息,或可以被Saga示例觀察到的各種消息;另一種是通過在類裏面定義事件、狀態、行为來定義一個狀態機來實現。

 

7.3.2.  通過狀態機實現Sagas

要通過狀態機實現Sagas,必須從SagaStateMachine類繼承:

public class AuctionSaga :

SagaStateMachine,

ISaga

{

static CombineSaga()

{

Define(() =>

{

// the state machine behavior is defined here

});

}

public Guid CorrelationId { get; set; }

public IServiceBus Bus { get; set; }

}

然後定義各種狀態:

public static State Initial { get; set; }

public static State Completed { get; set; }

public static State Open { get; set; }

public static State Closed { get; set; }

定義與狀態關聯的事件:

public static Event Create { get; set; }

public static Event Bid { get; set; }

定義消息:

public interface CreateAuction :

CorrelatedBy

{

string Title { get; }

string OwnerEmail { get; }

decimal OpeningBid { get; }

}

public interface PlaceBid

{

Guid BidId { get; }

Guid AuctionId { get; }

decimal MaximumBid { get; }

string BidderEmail { get; }

}

定義狀態轉換:

static AuctionSaga()

{

Define(() =>

{

Initially(

When(Create));

During(Open,

When(Bid));

});

}


定義事件發生時的行为:

static AuctionSaga()

{

Define(() =>

{

Initially(

When(Create)

.Then((saga,message) =>

{

saga.OpeningBid = message.OpeningBid;

saga.OwnerEmail = message.OwnerEmail;

saga.Title = message.Title;

})

.TransitionTo(Open));

});

}

//

public decimal OpeningBid { get; set; }

public string OwnerEmail { get; set; }

public string Title { get; set; }


完整的定義:

static SupervisorSaga()

{

Define(() =>

{

Initially(

When(Create)

.Then((saga,message) =>

{

saga.PostalCode = message.PostalCode;

})

.Publish((saga,message) => new RequestPostalCodeDetails(saga.PostalCode))

.Publish((saga,message) => new RequestGeolocation(saga.PostalCode))

.TransitionTo(Waiting));

During(Waiting,

When(PostalCodeDetailsReceived)

.Then((saga,message) =>

{

saga.City = message.City;

saga.State = message.State;

}),

When(GeolocationReceived)

.Then((saga,message) =>

{

saga.Latitude = message.Latitude;

saga.Longitude = message.Longitude;

}));

Combine(PostalCodeDetailsReceived, GeolocationReceived)

.Into(ReadyToProceed, saga => saga.ReadyFlags);

During(Waiting,

When(ReadyToProceed)

.Then((saga,message) =>

{

saga.Bus.Publish(new PostalCodeDetails(...));

})

.Complete());

});

}

//

public int ReadyFlags { get; set; }

public static Event Create { get; set; }

public static Event PostalCodeDetailsReceived { get; set; }

public static Event GeolocationReceived { get; set; }

public static Event ReadyToProceed { get; set; }


Sagas定義好後,就可以在Bus中進行訂閱:

public class Program

{

public static void Main()

{

Bus.Initialize(sbc =>

{

sbc.ReceiveFrom("loopback://localhost/my_saga_bus");

sbc.Subscribe(subs =>

{

subs.Saga(new InMemorySagaRepository())

.Permanent();

});

});

}

}

本文来源:https://www.2haoxitong.net/k/doc/52dd01f7da38376baf1faefc.html

《MassTransit 探究初步(繁体).doc》
将本文的Word文档下载到电脑,方便收藏和打印
推荐度:
点击下载文档

文档为doc格式