宝安中心区范围,云南网站优化建站,做系统那个网站好,人员调动在网站上怎么做前言Saga单词翻译过来是指尤指古代挪威或冰岛讲述冒险经历和英雄业绩的长篇故事#xff0c;对#xff0c;这里强调长篇故事。许多系统都存在长时间运行的业务流程#xff0c;NServiceBus使用基于事件驱动的体系结构将容错性和可伸缩性融入这些业务处理过程中。 当然… 前言 Saga单词翻译过来是指尤指古代挪威或冰岛讲述冒险经历和英雄业绩的长篇故事对这里强调长篇故事。许多系统都存在长时间运行的业务流程NServiceBus使用基于事件驱动的体系结构将容错性和可伸缩性融入这些业务处理过程中。 当然一个单一接口调用则算不上一个长时间运行的业务场景那么如果在给定的用例中有两个或多个调用则应该考虑数据一致性的问题这里有可能第一个接口调用成功第二次调用则可能失败或者超时Saga的设计以简单而健壮的方式处理这样的业务用例。认识Saga 先来通过一段代码简单认识一下Saga在NServiceBus里使用Saga的话则需要实现抽象类SagaSqlSaga这里的T的是Saga业务实体封装数据用来在长时间运行过程中封装业务数据。public class Saga:SagaState, IAmStartedByMessagesStartOrder, IHandleMessagesCompleteOrder { protected override void ConfigureHowToFindSaga(SagaPropertyMapperState mapper) { mapper.ConfigureMappingStartOrder(messagemessage.OrderId).ToSaga(sagasaga.OrderId); mapper.ConfigureMappingCompleteOrder(messagemessage.OrderId).ToSaga(sagasaga.OrderId); } public Task Handle(StartOrder message, IMessageHandlerContext context) { return Task.CompletedTask; } public Task Handle(CompleteOrder message, IMessageHandlerContext context) { MarkAsComplete(); return Task.CompletedTask; } }临时状态 长时间运行则意味着有状态任何涉及多个网络调用的进程都需要一个临时状态这个临时状态可以存储在内存中序列化在磁盘中也可以存储在分布式缓存中。在NServiceBus中我们定义实体继承抽象类ContainSagaData即可默认情况下所有公开访问的属性都会被持久化。public class State:ContainSagaData{ public Guid OrderId { get; set; }}添加行为 在NServiceBus里处理消息的有两种接口IHandlerMessages、IAmStartedByMessages。开启一个Saga 在前面的代码片段里我们看到已经实现了接口IAmStartedByMessages这个接口告诉NServiceBus如果收到了StartOrder 消息则创建一个Saga实例Saga Instance当然Saga长流程处理的实体至少有一个需要开启Saga流程。处理无序消息 如果你的业务用例中确实存在无序消息的情况则还需要业务流程正常轮转那么则需要多个messaeg都要事先接口IAmStartedByMessages接口也就是说多个message都可以创建Saga实例。依赖可恢复性 在处理无序消息和多个消息类型的时候就存在消息丢失的可能必须在你的Saga状态完成以后这个Saga实例又收到一条消息但这时Saga状态已经是完结状态这条消息则仍然需要处理这里则实现NServiceBus的IHandleSagaNotFound接口。 public class SagaNotFoundHandler:IHandleSagaNotFound { public Task Handle(object message, IMessageProcessingContext context) { return context.Reply(new SagaNotFoundMessage()); } } public class SagaNotFoundMessage { }结束Saga 当你的业务用例不再需要Saga实例时则调用MarkComplete()来结束Saga实例。这个方法在前面的代码片段中也可以看到其实本质也就是设置Saga.Complete属性这是个bool值你在业务用例中也可以用此值来判断Saga流程是否结束。namespace NServiceBus{ using System; using System.Threading.Tasks; using Extensibility; public abstract class Saga { /// summary /// The sagas typed data. /// /summary public IContainSagaData Entity { get; set; } public bool Completed { get; private set; } internal protected abstract void ConfigureHowToFindSaga(IConfigureHowToFindSagaWithMessage sagaMessageFindingConfiguration); protected Task RequestTimeoutTTimeoutMessageType(IMessageHandlerContext context, DateTime at) where TTimeoutMessageType : new() { return RequestTimeout(context, at, new TTimeoutMessageType()); } protected Task RequestTimeoutTTimeoutMessageType(IMessageHandlerContext context, DateTime at, TTimeoutMessageType timeoutMessage) { if (at.Kind DateTimeKind.Unspecified) { throw new InvalidOperationException(Kind property of DateTime at must be specified.); } VerifySagaCanHandleTimeout(timeoutMessage); var options new SendOptions(); options.DoNotDeliverBefore(at); options.RouteToThisEndpoint(); SetTimeoutHeaders(options); return context.Send(timeoutMessage, options); } protected Task RequestTimeoutTTimeoutMessageType(IMessageHandlerContext context, TimeSpan within) where TTimeoutMessageType : new() { return RequestTimeout(context, within, new TTimeoutMessageType()); } protected Task RequestTimeoutTTimeoutMessageType(IMessageHandlerContext context, TimeSpan within, TTimeoutMessageType timeoutMessage) { VerifySagaCanHandleTimeout(timeoutMessage); var sendOptions new SendOptions(); sendOptions.DelayDeliveryWith(within); sendOptions.RouteToThisEndpoint(); SetTimeoutHeaders(sendOptions); return context.Send(timeoutMessage, sendOptions); } protected Task ReplyToOriginator(IMessageHandlerContext context, object message) { if (string.IsNullOrEmpty(Entity.Originator)) { throw new Exception(Entity.Originator cannot be null. Perhaps the sender is a SendOnly endpoint.); } var options new ReplyOptions(); options.SetDestination(Entity.Originator); context.Extensions.Set(new AttachCorrelationIdBehavior.State { CustomCorrelationId Entity.OriginalMessageId }); options.Context.Set(new PopulateAutoCorrelationHeadersForRepliesBehavior.State { SagaTypeToUse null, SagaIdToUse null }); return context.Reply(message, options); } //这个方法结束saga流程标记Completed属性 protected void MarkAsComplete() { Completed true; } void VerifySagaCanHandleTimeoutTTimeoutMessageType(TTimeoutMessageType timeoutMessage) { var canHandleTimeoutMessage this is IHandleTimeoutsTTimeoutMessageType; if (!canHandleTimeoutMessage) { var message $The type {GetType().Name} cannot request timeouts for {timeoutMessage} because it does not implement IHandleTimeouts{typeof(TTimeoutMessageType).FullName}; throw new Exception(message); } } void SetTimeoutHeaders(ExtendableOptions options) { options.SetHeader(Headers.SagaId, Entity.Id.ToString()); options.SetHeader(Headers.IsSagaTimeoutMessage, bool.TrueString); options.SetHeader(Headers.SagaType, GetType().AssemblyQualifiedName); } }} Saga持久化 本机开发环境我们使用LearningPersistence但是投产的话则需要使用数据库持久化这里我们基于MySQLSQL持久化需要引入NServiceBus.Persistence.Sql。SQL Persistence会生成几种关系型数据库的sql scripts然后会根据你的断言配置选择所需数据库比如SQL Server、MySQL、PostgreSQL、Oracle。 持久化Saga自动创建所需表结构你只需手动配置即可配置后编译成功后项目执行目录下会生成sql脚本文件夹名称是NServiceBus.Persistence.Sql下面会有Saga子目录。/* TableNameVariable */set tableNameQuoted concat(, tablePrefix, Saga);set tableNameNonQuoted concat(tablePrefix, Saga);/* Initialize */drop procedure if exists sqlpersistence_raiseerror;create procedure sqlpersistence_raiseerror(message varchar(256))beginsignal sqlstate ERRORset message_text message, mysql_errno 45000;end;/* CreateTable */set createTable concat( create table if not exists , tableNameQuoted, ( Id varchar(38) not null, Metadata json not null, Data json not null, PersistenceVersion varchar(23) not null, SagaTypeVersion varchar(23) not null, Concurrency int not null, primary key (Id) ) default charsetascii;);prepare script from createTable;execute script;deallocate prepare script;/* AddProperty OrderId */select count(*)into existfrom information_schema.columnswhere table_schema database() and column_name Correlation_OrderId and table_name tableNameNonQuoted;set query IF( exist 0, concat(alter table , tableNameQuoted, add column Correlation_OrderId varchar(38) character set ascii), select \Column Exists\ status);prepare script from query;execute script;deallocate prepare script;/* VerifyColumnType Guid */set column_type_OrderId ( select concat(column_type, character set , character_set_name) from information_schema.columns where table_schema database() and table_name tableNameNonQuoted and column_name Correlation_OrderId);set query IF( column_type_OrderId varchar(38) character set ascii, call sqlpersistence_raiseerror(concat(\Incorrect data type for Correlation_OrderId. Expected varchar(38) character set ascii got \, column_type_OrderId, \.\));, select \Column Type OK\ status);prepare script from query;execute script;deallocate prepare script;/* WriteCreateIndex OrderId */select count(*)into existfrom information_schema.statisticswhere table_schema database() and index_name Index_Correlation_OrderId and table_name tableNameNonQuoted;set query IF( exist 0, concat(create unique index Index_Correlation_OrderId on , tableNameQuoted, (Correlation_OrderId)), select \Index Exists\ status);prepare script from query;execute script;deallocate prepare script;/* PurgeObsoleteIndex */select concat(drop index , index_name, on , tableNameQuoted, ;)from information_schema.statisticswhere table_schema database() and table_name tableNameNonQuoted and index_name like Index_Correlation_% and index_name Index_Correlation_OrderId and table_schema database()into dropIndexQuery;select if ( dropIndexQuery is not null, dropIndexQuery, select no index to delete;) into dropIndexQuery;prepare script from dropIndexQuery;execute script;deallocate prepare script;/* PurgeObsoleteProperties */select concat(alter table , table_name, drop column , column_name, ;)from information_schema.columnswhere table_schema database() and table_name tableNameNonQuoted and column_name like Correlation_% and column_name Correlation_OrderIdinto dropPropertiesQuery;select if ( dropPropertiesQuery is not null, dropPropertiesQuery, select no property to delete;) into dropPropertiesQuery;prepare script from dropPropertiesQuery;execute script;deallocate prepare script;/* CompleteSagaScript */生成的表结构持久化配置 Saga持久化需要依赖NServiceBus.Persistence.Sql。引入后需要实现SqlSaga抽象类抽象类需要重写ConfigureMapping配置Saga工作流程业务主键。public class Saga:SqlSagaState, IAmStartedByMessagesStartOrder{ protected override void ConfigureMapping(IMessagePropertyMapper mapper) { mapper.ConfigureMappingStartOrder(messagemessage.OrderId); } protected override string CorrelationPropertyName nameof(StartOrder.OrderId); public Task Handle(StartOrder message, IMessageHandlerContext context) { Console.WriteLine($Receive message with OrderId:{message.OrderId}); MarkAsComplete(); return Task.CompletedTask; } } static async Task MainAsync() { Console.Title Client-UI; var configuration new EndpointConfiguration(Client-UI); //这个方法开启自动建表、自动创建RabbitMQ队列 configuration.EnableInstallers(); configuration.UseSerializationNewtonsoftSerializer(); configuration.UseTransportLearningTransport(); string connectionString server127.0.0.1;uidroot;pwd000000;databasenservicebus;port3306;AllowUserVariablesTrue;AutoEnlistfalse; var persistence configuration.UsePersistenceSqlPersistence(); persistence.SqlDialectSqlDialect.MySql(); //配置mysql连接串 persistence.ConnectionBuilder(()new MySqlConnection(connectionString)); var instance await Endpoint.Start(configuration).ConfigureAwait(false); var command new StartOrder() { OrderId Guid.NewGuid() }; await instance.SendLocal(command).ConfigureAwait(false); Console.ReadKey(); await instance.Stop().ConfigureAwait(false); } Saga Timeouts 在消息驱动类型的环境中虽然传递的无连接特性可以防止在线等待过程中消耗资源但是毕竟等待时间需要有一个上线。在NServiceBus里已经提供了Timeout方法我们只需订阅即可可以在你的Handle方法中根据需要订阅Timeout可参考如下代码public class Saga:SagaState, IAmStartedByMessagesStartOrder, IHandleMessagesCompleteOrder, IHandleTimeoutsTimeOutMessage { public Task Handle(StartOrder message, IMessageHandlerContext context) { var modelnew TimeOutMessage(); //订阅超时消息 return RequestTimeout(context,TimeSpan.FromMinutes(10)); } public Task Handle(CompleteOrder message, IMessageHandlerContext context) { MarkAsComplete(); return Task.CompletedTask; } protected override string CorrelationPropertyName nameof(StartOrder.OrderId); public Task Timeout(TimeOutMessage state, IMessageHandlerContext context) { //处理超时消息 } protected override void ConfigureHowToFindSaga(SagaPropertyMapperState mapper) { mapper.ConfigureMappingStartOrder(messagemessage.OrderId).ToSaga(sagasaga.OrderId); mapper.ConfigureMappingCompleteOrder(messagemessage.OrderId).ToSaga(sagasaga.OrderId); } }//从Timeout的源码看这个方法是通过设置SendOptions然后再把当前这个消息发送给自己来实现protected Task RequestTimeoutTTimeoutMessageType(IMessageHandlerContext context, TimeSpan within, TTimeoutMessageType timeoutMessage) { VerifySagaCanHandleTimeout(timeoutMessage); var sendOptions new SendOptions(); sendOptions.DelayDeliveryWith(within); sendOptions.RouteToThisEndpoint(); SetTimeoutHeaders(sendOptions); return context.Send(timeoutMessage, sendOptions); }总结 NServiceBus因为是商业产品对分布式消息系统所涉及到的东西都做了实现包括分布式事务Outbox、DTC都有还有心跳检测监控都有全而大目前我们用到的也只是NServiceBus里很小的一部分功能。