文字排版网站,广州番禺邮政编码,山西钢建公司简介,网站紧急维护目录(结尾附加项目代码资源地址)
引言#xff1a;
1. SAGA事务模式
2. 拆分为子事务
3. 失败回滚
4. 如何做补偿
4.1 失败的分支是否需要补偿
5. 异常
6. 异常与子事务屏障
6.1 NPC的挑战
6.2 现有方案的问题
6.3 子事务屏障
6.4 原理
7. 更多高级场景
7.1 部分…目录(结尾附加项目代码资源地址)
引言
1. SAGA事务模式
2. 拆分为子事务
3. 失败回滚
4. 如何做补偿
4.1 失败的分支是否需要补偿
5. 异常
6. 异常与子事务屏障
6.1 NPC的挑战
6.2 现有方案的问题
6.3 子事务屏障
6.4 原理
7. 更多高级场景
7.1 部分第三方操作无法回滚(go语言写了一点不要在意这些细节下面开始 .NET)
7.2 超时回滚
8.0 .NET CORE结合DTM实现Saga(C#启动)
8.1 准备工作(和前两期的注册环节、数据库差不多的操作看过前两期的小伙伴可跳过8.1阶段)
8.1.1 Nuget引入Dtmcli
8.1.2 生成转账数据库(EF_CORE)
8.1.3 DbContext
8.1.4 数据库持久化
8.1.5 数据库最终生成
8.1.6 appsettings.json
8.1.7 Program.cs
8.2 主程序事务API控制器
8.3 用户1转账事务API控制器
8.4 用户2转账事务API控制器
9. 开始运行
9.1 先给A和B两位用户各1000块钱。
9.2 执行转账
10. 并发下执行Saga分布式事务
10.1 Program.cs代码修改
10.2 用户1转账事务API控制器代码修改
10.3 用户2转账事务API控制器代码修改
10.4 Redis启动
小结 引言
紧接前两期 .NET CORE 分布式事务(一) DTM实现二阶段提交(.NET CORE 分布式事务(一) DTM实现二阶段提交-CSDN博客) .NET CORE 分布式事务(二) DTM实现TCC(.NET CORE 分布式事务(二) DTM实现TCC-CSDN博客) 本期讲解Saga分布式事务并探讨如何在高并发下使用Saga分布式事务。
1. SAGA事务模式
SAGA事务模式是DTM中最常用的模式主要是因为SAGA模式简单易用工作量少并且能够解决绝大部分业务的需求。SAGA最初出现在1987年Hector Garcaa-Molrna Kenneth Salem发表的论文SAGAS里。其核心思想是将长事务拆分为多个短事务由Saga事务协调器协调如果每个短事务都成功提交完成那么全局事务就正常完成如果某个步骤失败则根据相反顺序一次调用补偿操作。
2. 拆分为子事务
例如我们要进行一个类似于银行跨行转账的业务将A中的30元转给B根据Saga事务的原理我们将整个全局事务切分为以下服务
转出TransOut服务这里转出将会进行操作A-30转出补偿TransOutCompensate服务回滚上面的转出操作即A30转入TransIn服务转入将会进行B30转入补偿TransInCompensate服务回滚上面的转入操作即B-30
整个SAGA事务的逻辑是
执行转出成功执行转入成功全局事务完成
如果在中间发生错误例如转入B发生错误则会调用已执行分支的补偿操作即
执行转出成功执行转入失败执行转入补偿成功执行转出补偿成功全局事务回滚完成
下面我们看一个成功完成的SAGA事务典型的时序图 在这个图中我们的全局事务发起人将整个全局事务的编排信息包括每个步骤的正向操作和反向补偿操作定义好之后提交给服务器服务器就会按步骤执行前面SAGA的逻辑。
3. 失败回滚
如果有正向操作失败例如账户余额不足或者账户被冻结那么dtm会调用各分支的补偿操作进行回滚最后事务成功回滚。失败的时序图如下 补偿执行顺序
dtm的SAGA事务在1.10.0及之前补偿操作是并发执行的1.10.1之后是根据用户指定的分支顺序进行回滚的。
如果是普通SAGA没有打开并发选项那么SAGA事务的补偿分支是完全按照正向分支的反向顺序进行补偿的。
如果是并发SAGA补偿分支也会并发执行补偿分支的执行顺序与指定的正向分支顺序相反。假如并发SAGA指定A分支之后才能执行B那么进行并发补偿时DTM保证A的补偿操作在B的补偿操作之后执行
4. 如何做补偿
当SAGA对分支A进行失败补偿时A的正向操作可能1. 已执行2. 未执行3. 甚至有可能处于执行中最终执行成功或者失败是未知的。那么对A进行补偿时要妥善处理好这三种情况难度很大。
dtm提供了子事务屏障技术自动处理上述三种情况开发人员只需要编写好针对1的补偿操作情况即可相关工作大幅简化详细原理参见下面的异常章节。
4.1 失败的分支是否需要补偿
dtm 常被问到的一个问题是TransIn返回失败那么这个时候是否还需要调用TransIn的补偿操作DTM 的做法是统一进行一次调用这种的设计考虑点如下
XA, TCC 等事务模式是必须要的SAGA 为了保持简单和统一设计为总是调用补偿DTM 支持单服务多数据源可能出现数据源1成功数据源2失败这种情况下需要确保补偿被调用数据源1的补偿被执行DTM 提供的子事务屏障自动处理了补偿操作中的各种情况用户只需要执行与正向操作完全相反的补偿即可
5. 异常
在事务领域异常是需要重点考虑的问题例如宕机失败进程crash都有可能导致不一致。当我们面对分布式事务那么分布式中的异常出现更加频繁对于异常的设计和处理更是重中之重。
我们将异常分为以下几类
偶发失败 在微服务领域由于网络抖动、机器宕机、进程Crash会导致微小比例的请求失败。这类问题的解决方案是重试第二次进行重试就能够成功因此微服务框架或者网关类的产品都会支持重试例如配置重试3次每次间隔2s。DTM的设计对重试非常友好应当支持幂等的各个接口都已支持幂等不会发生因为重试导致事务bug的情况故障宕机 大量公司内部都有复杂的多项业务这些业务中偶尔有一两个非核心业务故障也是常态。DTM也考虑了这样的情况在重试方面做了指数退避算法如果遇见了故障宕机情况那么指数退避可以避免大量请求不断发往故障应用避免雪崩。网络乱序 分布式系统中网络延时是难以避免的所以会发生一些乱序的情况例如转账的例子中可能发生服务器先收到撤销转账的请求再收到转账请求。这类的问题是分布式事务中的一个重点难点问题。
业务上的失败与异常是需要做严格区分的例如前面的余额不足是业务上的失败必须回滚重试毫无意义。分布式事务中有很多模式的某些阶段要求最终成功。例如dtm的补偿操作是要求最终成功的只要还没成功就会不断进行重试直到成功。
6. 异常与子事务屏障
分布式事务之所以难主要是因为分布式系统中的各个节点都可能发生各种非预期的情况。本文先介绍分布式系统中的异常问题然后介绍这些问题带给分布式事务的挑战接下来指出现有各种常见用法的问题最后给出正确的方案。
6.1 NPC的挑战
分布式系统最大的敌人可能就是NPC了在这里它是Network Delay, Process Pause, Clock Drift的首字母缩写。我们先看看具体的NPC问题是什么
Network Delay网络延迟。虽然网络在多数情况下工作的还可以虽然TCP保证传输顺序和不会丢失但它无法消除网络延迟问题。Process Pause进程暂停。有很多种原因可以导致进程暂停比如编程语言中的GC垃圾回收机制会暂停所有正在运行的线程再比如我们有时会暂停云服务器从而可以在不重启的情况下将云服务器从一台主机迁移到另一台主机。我们无法确定性预测进程暂停的时长你以为持续几百毫秒已经很长了但实际上持续数分钟之久进程暂停并不罕见。Clock Drift时钟漂移。现实生活中我们通常认为时间是平稳流逝单调递增的但在计算机中不是。计算机使用时钟硬件计时通常是石英钟计时精度有限同时受机器温度影响。为了在一定程度上同步网络上多个机器之间的时间通常使用NTP协议将本地设备的时间与专门的时间服务器对齐这样做的一个直接结果是设备的本地时间可能会突然向前或向后跳跃。
分布式事务既然是分布式的系统自然也有NPC问题。因为没有涉及时间戳带来的困扰主要是NP。
6.2 现有方案的问题
我们看到开源项目dtm之外包括各云厂商各开源项目他们给出的业务实现建议大多类似如下这也是大多数用户最容易想到的方案
空补偿 “针对该问题在服务设计时需要允许空补偿即在没有找到要补偿的业务主键时返回补偿成功并将原业务主键记录下来标记该业务流水已补偿成功。”防悬挂 “需要检查当前业务主键是否已经在空补偿记录下来的业务主键中存在如果存在则要拒绝执行该笔服务以免造成数据不一致。”
事实上NPC里的P和C以及P和C的组合有很多种的场景都可以导致上述竞态情况就不一一赘述了。
虽然这种情况发生的概率不高但是在金融领域一旦涉及金钱账目那么带来的影响可能是巨大的。
PS幂等控制如果也采用“先查再改”也是一样很容易出现类似的问题。解决这一类问题的关键点是要利用唯一索引“以改代查”来避免竞态条件。
6.3 子事务屏障 在dtm中首创了子事务屏障技术使用该技术能够非常便捷的解决异常问题极大的降低了分布式事务的使用门槛。
子事务屏障能够达到下面这个效果看示意图 所有这些请求到了子事务屏障后不正常的请求会被过滤正常请求通过屏障。开发者使用子事务屏障之后前面所说的各种异常全部被妥善处理业务开发人员只需要关注实际的业务逻辑负担大大降低。 子事务屏障提供了方法BranchBarrier.Call业务开发人员在busiCall里面编写自己的相关逻辑调用 BranchBarrier.Call 。 BranchBarrier.Call保证在空回滚、悬挂等场景下busiCall不会被调用在业务被重复调用时有幂等控制保证只被提交一次。子事务屏障会管理TCC、SAGA、事务消息等也可以扩展到其他领域
6.4 原理
子事务屏障技术的原理是在本地数据库建立分支操作状态表dtm_barrier唯一键为全局事务id-分支id-分支操作try|confirm|cancel
开启本地事务对于当前操作op(try|confirm|cancel)insert ignore一条数据gid-branchid-op如果插入不成功提交事务返回成功常见的幂等控制方法如果当前操作是cancel那么在insert ignore一条数据gid-branchid-try如果插入成功注意是成功则提交事务返回成功调用屏障内的业务逻辑如果业务返回成功则提交事务返回成功如果业务返回失败则回滚事务返回失败
在此机制下解决了乱序相关的问题
空补偿控制--如果Try没有执行直接执行了Cancel那么3中Cancel插入gid-branchid-try会成功不走屏障内的逻辑保证了空补偿控制幂等控制--2中任何一个操作都无法重复插入唯一键保证了不会重复执行防悬挂控制--Try在Cancel之后执行那么Cancel会在3中插入gid-branchid-try导致Try在2中不成功就不执行屏障内的逻辑保证了防悬挂控制 对于SAGA、二阶段消息也是类似的机制。
7. 更多高级场景
在实际应用中还遇见过一些业务场景需要一些额外的技巧进行处理
7.1 部分第三方操作无法回滚(go语言写了一点不要在意这些细节下面开始 .NET)
例如一个订单中的发货一旦给出了发货指令那么涉及线下相关操作那么很难直接回滚。对于涉及这类情况的saga如何处理呢
我们把一个事务中的操作分为可回滚的操作以及不可回滚的操作。那么把可回滚的操作放到前面把不可回滚的操作放在后面执行那么就可以解决这类问题 saga : dtmcli.NewSaga(DtmServer, shortuuid.New()).Add(Busi/CanRollback1, Busi/CanRollback1Revert, req).Add(Busi/CanRollback2, Busi/CanRollback2Revert, req).Add(Busi/UnRollback1, , req).Add(Busi/UnRollback2, , req).EnableConcurrent().AddBranchOrder(2, []int{0, 1}). // 指定step 2需要在01完成后执行AddBranchOrder(3, []int{0, 1}) // 指定step 3需要在01完成后执行
示例中的代码指定Step 23 中的 UnRollback 操作必须在Step 01 完成后执行。
对于不可回滚的操作DTM的设计建议是不可回滚的操作在业务上也不允许返回失败。可以这么思考如果发货的操作返回了失败那么这个失败的含义是不够清晰的调用方不知道这个失败是修改了部分数据的失败还是修改数据前的业务校验失败因为这个操作不可回滚所以调用方收到这个失败是不知道如何正确处理这个错误的。
另外当你的一个全局事务中如果出现了两个既不可回滚的又可能返回失败的操作那么到了实际运行中一个执行成功一个执行失败此时执行成功的那个事务无法回滚那么这个事务的一致性就不可能保证了。
对于发货操作如果可能在校验数据上可能发生失败那么将发货操作拆分为发货校验、发货两个服务则会清晰很多发货校验可回滚发货不可回滚同时也不会失败。
7.2 超时回滚
saga属于长事务因此持续的时间跨度很大可能是100ms到1天因此saga没有默认的超时时间。dtm支持saga事务单独指定超时时间到了超时时间全局事务就会回滚。
saga.TimeoutToFail 1800
在saga事务中设置超时时间一定要注意这类事务里不能够包含无法回滚的事务分支因为超时回滚时已执行的无法回滚的分支数据就是错的。
8.0 .NET CORE结合DTM实现Saga(C#启动)
8.1 准备工作(和前两期的注册环节、数据库差不多的操作看过前两期的小伙伴可跳过8.1阶段)
8.1.1 Nuget引入Dtmcli ItemGroupPackageReference IncludeDtmcli Version1.4.0 //ItemGroup
8.1.2 生成转账数据库(EF_CORE)
//模型
public partial class UserMoney
{public int id { get; set; }public int money { get; set; }public int trading_balance { get; set; }public int balance { get; set; }public int trymoney { get; set; }public string guid { get; set; }
}
8.1.3 DbContext public class DtmDbContext : DbContext{public DtmDbContext() { }public DtmDbContext(DbContextOptionsDtmDbContext options) : base(options) { }public virtual DbSetUserMoney UserMoney { get; set; }protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder){optionsBuilder.UseMySql(serverlocalhost;port3307;user idroot;password123;databaseDTM_Test, ServerVersion.Parse(8.0.23-mysql)).UseLoggerFactory(LoggerFactory.Create(option {option.AddConsole();}));}protected override void OnModelCreating(ModelBuilder modelBuilder){modelBuilder.UseCollation(utf8_general_ci).HasCharSet(utf8);modelBuilder.EntityUserMoney(entity {entity.ToTable(UserMoney);});}}
8.1.4 数据库持久化
CREATE TABLE
IFNOT EXISTS DTM_Test.barrier (id BIGINT ( 22 ) PRIMARY KEY AUTO_INCREMENT,trans_type VARCHAR ( 45 ) DEFAULT ,gid VARCHAR ( 128 ) DEFAULT ,branch_id VARCHAR ( 128 ) DEFAULT ,op VARCHAR ( 45 ) DEFAULT ,barrier_id VARCHAR ( 45 ) DEFAULT ,reason VARCHAR ( 45 ) DEFAULT COMMENT the branch type who insert this record,create_time datetime DEFAULT now( ),update_time datetime DEFAULT now( ),KEY ( create_time ),KEY ( update_time ),UNIQUE KEY ( gid, branch_id, op, barrier_id ) ) ENGINE INNODB DEFAULT CHARSET utf8mb4;
8.1.5 数据库最终生成 8.1.6 appsettings.json
{Logging: {LogLevel: {Default: Information,Microsoft.AspNetCore: Warning}},AllowedHosts: *,ConnectionString: serverlocalhost;port3307;user idroot;password123;databaseDTM_Test,DtmSettings: {TransactionUrl: http://localhost:5271,CompensateUrl: http://localhost:5271}
}
8.1.7 Program.cs // 注册DbContextbuilder.Services.AddDbContextDtmDbContext(options {options.UseMySql(builder.Configuration.GetValuestring(ConnectionString), ServerVersion.Parse(8.0.23-mysql));});builder.Services.ConfigureDtmSettings(builder.Configuration.GetSection(DtmSettings));builder.Services.AddDtmcli(dtm {dtm.DtmUrl http://localhost:36789;dtm.SqlDbType mysql;dtm.BarrierSqlTableName dtm_test.barrier;});
8.2 主程序事务API控制器
using DTM_EF;
using Dtmcli;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options;
using System.Data.Common;
using MySqlConnector;
using DTM_EF.Model;
using Dtm_Saga;
using DtmCommon;
using Microsoft.CodeAnalysis.Operations;
using Microsoft.Extensions.Caching.Distributed;
using Microsoft.EntityFrameworkCore.Metadata.Internal;
using Newtonsoft.Json;namespace Dtm_Saga.Controllers
{[ApiController][Route([controller])]public class DtmSagaController : ControllerBase{private readonly ILoggerDtmSagaController _logger;private readonly IDtmClient _dtmClient;private readonly IDtmTransFactory _transFactory;private readonly DtmSettings _settings;private readonly IBranchBarrierFactory _factory;private readonly DtmDbContext _dtmDbContext;public DtmSagaController(ILoggerDtmSagaController logger,IDtmClient dtmClient,IDtmTransFactory transFactory,IOptionsDtmSettings settings,IBranchBarrierFactory factory,DtmDbContext dtmDbContext){_logger logger;_dtmClient dtmClient;_transFactory transFactory;_settings settings.Value;_factory factory;_dtmDbContext dtmDbContext;}[HttpPost(dtm-Saga)]public async TaskIActionResult Get(int Money, CancellationToken cancellationToken){var obj TransResponse.BuildFailureResponse();try{//1. 创建gid。var gid await _dtmClient.GenGid(cancellationToken);//2. 用户模型。UserMoney bodyA new UserMoney() { id 1, trymoney -Money, guid string.Empty };UserMoney bodyB new UserMoney() { id 2, trymoney Money, guid string.Empty };//3. 设置分支事务和补偿事务。var saga _transFactory.NewSaga(gid).Add(_settings.TransactionUrl /Saga/UserATransactionUrl, _settings.CompensateUrl /Saga/UserACompensateUrl, bodyA).Add(_settings.TransactionUrl /Saga/UserBTransactionUrl, _settings.CompensateUrl /Saga/UserBCompensateUrl, bodyB).EnableWaitResult();//开启了EnableWaitResult()则可通过捕获异常的方式捕获事务失败的结果。//4. 执行submitawait saga.Submit();Console.ForegroundColor ConsoleColor.Red;Console.WriteLine(result gid is {0}, gid);Console.ResetColor();obj TransResponse.BuildSucceedResponse();}catch (DtmException ex){obj TransResponse.BuildFailureResponse();}return Ok(obj);}}
}8.3 用户1转账事务API控制器
using DTM_EF;
using DTM_EF.Model;
using Dtm_Saga;
using Dtmcli;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore.Metadata.Internal;
using Microsoft.Extensions.Caching.Distributed;
using MySqlConnector;
using Newtonsoft.Json;
using ServiceStack.Redis;
using System.Threading;namespace Dtm_Saga.Controllers
{[Route(api/[controller])][ApiController]public class SagaUserAController : ControllerBase{private readonly IBranchBarrierFactory _barrierFactory;private readonly ILoggerSagaUserAController _Logger;private readonly DtmDbContext _dtmDbContext;private readonly IRedisClient _redisClient;private readonly RedisService _redisService;public SagaUserAController(IBranchBarrierFactory barrierFactory,ILoggerSagaUserAController Logger,DtmDbContext dtmDbContext,IRedisClient redisClient,RedisService redisService){_barrierFactory barrierFactory;_Logger Logger;_dtmDbContext dtmDbContext;_redisClient redisClient;_redisService redisService;}[HttpPost][Route(/Saga/UserATransactionUrl)]public async TaskIActionResult UserATransactionUrl([FromQuery] string gid, [FromQuery] string trans_type, [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body){var branchBarrier _barrierFactory.CreateBranchBarrier(Request.Query);var obj TransResponse.BuildSucceedResponse();using (MySqlConnection conn new MySqlConnection(serverlocalhost;port3307;user idroot;password123;databaseDTM_Test)){try{await branchBarrier.Call(conn, async (tx) {//获取用户账户信息var UserMoney _dtmDbContext.SetUserMoney().Where(c c.id body.id).FirstOrDefault();if (UserMoney is null){obj TransResponse.BuildFailureResponse();throw new Exception($用户{body.id}--不存在);}if (UserMoney.money body.trymoney 0){obj TransResponse.BuildFailureResponse();throw new Exception($用户{body.id}--金额不足);}//前序判断都通过修改信息准备提交 UserMoney!.money body.trymoney;_dtmDbContext.SaveChanges();await Task.CompletedTask;});}catch (Exception ex){_Logger.LogError(ex.Message);}}return Ok(obj);}[HttpPost][Route(/Saga/UserACompensateUrl)]public async TaskIActionResult UserACompensateUrl([FromQuery] string gid, [FromQuery] string trans_type,[FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body){//var branchBarrier _barrierFactory.CreateBranchBarrier(trans_type, gid, branch_id, op);var branchBarrier _barrierFactory.CreateBranchBarrier(Request.Query);var obj TransResponse.BuildSucceedResponse();using (MySqlConnection conn new MySqlConnection(serverlocalhost;port3307;user idroot;password123;databaseDTM_Test)){try{await branchBarrier.Call(conn, async (tx) {//获取用户账户信息var UserMoney _dtmDbContext.SetUserMoney().Where(c c.id body.id).FirstOrDefault();if (UserMoney is null){obj TransResponse.BuildFailureResponse();throw new Exception($用户{body.id}--不存在);}//前序判断都通过修改信息准备提交 UserMoney!.money - body.trymoney;_dtmDbContext.SaveChanges();await Task.CompletedTask;});}catch (Exception ex){_Logger.LogError(ex.Message);}}return Ok(obj);}}
}
8.4 用户2转账事务API控制器
using DTM_EF;
using DTM_EF.Model;
using Dtm_Saga;
using Dtmcli;
using ServiceStack.Redis;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Caching.Distributed;
using MySqlConnector;
using Newtonsoft.Json;namespace Dtm_Saga.Controllers
{[Route(api/[controller])][ApiController]public class SagaUserBController : ControllerBase{private readonly IBranchBarrierFactory _barrierFactory;private readonly ILoggerSagaUserBController _Logger;private readonly DtmDbContext _dtmDbContext;private readonly IRedisClient _redisClient;private readonly RedisService _redisService;public SagaUserBController(IBranchBarrierFactory barrierFactory,ILoggerSagaUserBController Logger,DtmDbContext dtmDbContext,IRedisClient redisClient,RedisService redisService){_barrierFactory barrierFactory;_Logger Logger;_dtmDbContext dtmDbContext;_redisClient redisClient;_redisService redisService;}[HttpPost][Route(/Saga/UserBTransactionUrl)]public async TaskIActionResult UserBTransactionUrl([FromQuery] string gid, [FromQuery] string trans_type, [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body){var branchBarrier _barrierFactory.CreateBranchBarrier(Request.Query);var obj TransResponse.BuildSucceedResponse();using (MySqlConnection conn new MySqlConnection(serverlocalhost;port3307;user idroot;password123;databaseDTM_Test)){try{await branchBarrier.Call(conn, async (tx) {//获取用户账户信息var UserMoney _dtmDbContext.SetUserMoney().Where(c c.id body.id).FirstOrDefault();if (UserMoney is null){obj TransResponse.BuildFailureResponse();throw new Exception($用户{body.id}--不存在);}if (UserMoney.money body.trymoney 0){obj TransResponse.BuildFailureResponse();throw new Exception($用户{body.id}--金额不足);}//前序判断都通过修改信息准备提交 UserMoney!.money body.trymoney;_dtmDbContext.SaveChanges();await Task.CompletedTask;});}catch (Exception ex){_Logger.LogError(ex.Message);}}return Ok(obj);}[HttpPost][Route(/Saga/UserBCompensateUrl)]public async TaskIActionResult UserBCompensateUrl([FromQuery] string gid, [FromQuery] string trans_type,[FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body){var branchBarrier _barrierFactory.CreateBranchBarrier(Request.Query);var obj TransResponse.BuildSucceedResponse();using (MySqlConnection conn new MySqlConnection(serverlocalhost;port3307;user idroot;password123;databaseDTM_Test)){try{await branchBarrier.Call(conn, async (tx) {//获取用户账户信息var UserMoney _dtmDbContext.SetUserMoney().Where(c c.id body.id).FirstOrDefault();if (UserMoney null){obj TransResponse.BuildFailureResponse();throw new Exception($用户{body.id}--不存在);}//修改信息准备提交 UserMoney!.money - body.trymoney;_dtmDbContext.SaveChanges();await Task.CompletedTask;});}catch (Exception ex){_Logger.LogError(ex.Message);}}return Ok(obj);}}
}9. 开始运行
9.1 先给A和B两位用户各1000块钱。 9.2 执行转账 转100。 转-200。
此时我们可以看到Saga分布式事务已经正常执行并完成了转100和-200的操作。
10. 并发下执行Saga分布式事务
我们思考一个问题每次只有一个请求的时候Saga分布式事务完美运行但是在高并发下也能正常运行吗我们测试一下。
打开apipost输入请求地址选择一键压测。 每次请求转账10元10个并发。难道真的能转账成功吗(执行之前恢复数据A1000元B1000元)。我们开始运行。 执行之后可以看到A960元B1080元。按照常理来说每次转账10块钱并发10次。10*10100元应该是A900元B1100元。但是为什么会出现这个情况呢?
原因就是上一个并发还没执行完当前并发也会访问数据库资源。数据库执行冲突导致金额转账失败。如果你的程序写成这样基本就可以卷铺盖走人了。
那我们应该如何解决呢有的小伙伴会说可以使用RabbitMQ消息队列(.NET CORE消息队列RabbitMQ-CSDN博客)。这确实是一个解决方案把数据交给RabbitMQ。然后订阅一个一个数据的执行一个一个用户进行扣款或转账。确实可以完美解决这个并发的问题。解决并发有很多解决方案。今天呢就不用RabbitMQ了换一个。用Redis缓存数据库的分布式锁来解决这个并发的问题。(针对于Redis缓存数据库的分布式锁的非阻塞锁、阻塞锁、红锁以及锁的续命.NET CORE使用Redis分布式锁续命(续期)问题-CSDN博客缓存数据类型缓存apiLua脚本主从模式读写分离哨兵模式集群模式等........之后陆续会推出文章现在不做过多赘述先解决这个并发问题。)
10.1 Nuget引入StackExchange.Redis ItemGroupPackageReference IncludeStackExchange.Redis Version2.7.4 //ItemGroup
ServiceStack.Redis自3.9版本以后开始收费我们坚持一贯的作风能不花钱的就不花钱。但是ServiceStack.Redis提供的分布式锁api写的是真的很好StackExchange.Redis里没有阻塞锁。我们需要自己手写。
using Microsoft.AspNetCore.DataProtection.KeyManagement;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using StackExchange.Redis;
using System.Diagnostics;
using System.Net.Sockets;
using System.Threading;namespace Dtm_Saga
{public class RedisService{private readonly ConnectionMultiplexer _redis;private readonly IDatabase _database;/// summary/// 初始化 see crefRedisService/ 类的新实例。/// /summary/// param nameconnectionMultiplexer连接多路复用器。/parampublic RedisService(string connectionString){_redis ConnectionMultiplexer.Connect(connectionString);_database _redis.GetDatabase();}#region 分布式锁...#region 阻塞锁public bool RedisLock(string key, int expireMilliSeconds, int timeout){var script local isNX redis.call(SETNX, KEYS[1], ARGV[1])if isNX 1 thenredis.call(PEXPIRE, KEYS[1], ARGV[2])return 1endreturn 0;RedisKey[] scriptkey { key };RedisValue[] scriptvalues { key, expireMilliSeconds * 1000 };var stopwatch Stopwatch.StartNew();while (stopwatch.Elapsed.TotalSeconds timeout){if (_database.ScriptEvaluate(script, scriptkey, scriptvalues).ToString() 1){stopwatch.Stop();return true;}}Console.WriteLine($[{DateTime.Now}]{key}--阻塞锁超时);stopwatch.Stop();return false;}public bool RedisUnLock(string key){var script local getLock redis.call(GET, KEYS[1])if getLock ARGV[1] thenredis.call(DEL, KEYS[1])return 1endreturn 0;RedisKey[] scriptkey { key };RedisValue[] scriptvalues { key };return _database.ScriptEvaluate(script, scriptkey, scriptvalues).ToString() 1;}#endregion#endregion}
}用Redis的Lua脚本来实现阻塞锁的机制Lua脚本在Redis中被认为是原子性的。在执行Lua脚本期间Redis不会并行处理其他客户端的命令而是将它们排队等待。因此Lua脚本中的所有指令都会连续无中断地执行不会与其他任何命令交错。
10.2 Program.cs代码修改
Program中注册Redis这里连接超时加了500000秒是要高并发时程序一直在请求Redis实际生产环境需要多次测试选择一个最佳的超时时间。 builder.Services.AddSingletonRedisService(provider {//你的Redis连接字符串string redisConnectionString 127.0.0.1:6379,abortConnectfalse,syncTimeout500000;return new RedisService(redisConnectionString);});
10.3 用户1转账事务API控制器代码修改
using DTM_EF;
using DTM_EF.Model;
using Dtm_Saga;
using Dtmcli;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore.Metadata.Internal;
using Microsoft.Extensions.Caching.Distributed;
using MySqlConnector;
using Newtonsoft.Json;
using ServiceStack.Redis;
using System.Threading;namespace Dtm_Saga.Controllers
{[Route(api/[controller])][ApiController]public class SagaUserAController : ControllerBase{private readonly IBranchBarrierFactory _barrierFactory;private readonly ILoggerSagaUserAController _Logger;private readonly DtmDbContext _dtmDbContext;private readonly IRedisClient _redisClient;private readonly RedisService _redisService;public SagaUserAController(IBranchBarrierFactory barrierFactory,ILoggerSagaUserAController Logger,DtmDbContext dtmDbContext,IRedisClient redisClient,RedisService redisService){_barrierFactory barrierFactory;_Logger Logger;_dtmDbContext dtmDbContext;_redisClient redisClient;_redisService redisService;}[HttpPost][Route(/Saga/UserATransactionUrl)]public async TaskIActionResult UserATransactionUrl([FromQuery] string gid, [FromQuery] string trans_type, [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body){var branchBarrier _barrierFactory.CreateBranchBarrier(Request.Query);var obj TransResponse.BuildSucceedResponse();using (MySqlConnection conn new MySqlConnection(serverlocalhost;port3307;user idroot;password123;databaseDTM_Test)){try{await branchBarrier.Call(conn, async (tx) {//Redis分布式锁锁定if (_redisService.RedisLock(DataLock:UserATransactionUrl, 2000, 2000)){//获取用户账户信息var UserMoney _dtmDbContext.SetUserMoney().Where(c c.id body.id).FirstOrDefault();if (UserMoney is null){obj TransResponse.BuildFailureResponse();throw new Exception($用户{body.id}--不存在);}if (UserMoney.money body.trymoney 0){obj TransResponse.BuildFailureResponse();throw new Exception($用户{body.id}--金额不足);}//前序判断都通过修改信息准备提交 UserMoney!.money body.trymoney;_dtmDbContext.SaveChanges();}await Task.CompletedTask;});}catch (Exception ex){_Logger.LogError(ex.Message);}finally{//Redis分布式锁释放锁_redisService.RedisUnLock(DataLock:UserATransactionUrl);}}return Ok(obj);}[HttpPost][Route(/Saga/UserACompensateUrl)]public async TaskIActionResult UserACompensateUrl([FromQuery] string gid, [FromQuery] string trans_type,[FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body){//var branchBarrier _barrierFactory.CreateBranchBarrier(trans_type, gid, branch_id, op);var branchBarrier _barrierFactory.CreateBranchBarrier(Request.Query);var obj TransResponse.BuildSucceedResponse();using (MySqlConnection conn new MySqlConnection(serverlocalhost;port3307;user idroot;password123;databaseDTM_Test)){try{await branchBarrier.Call(conn, async (tx) {//Redis分布式锁锁定if (_redisService.RedisLock(DataLock:UserACompensateUrl, 2000, 2000)){//获取用户账户信息var UserMoney _dtmDbContext.SetUserMoney().Where(c c.id body.id).FirstOrDefault();if (UserMoney is null){obj TransResponse.BuildFailureResponse();throw new Exception($用户{body.id}--不存在);}//前序判断都通过修改信息准备提交 UserMoney!.money - body.trymoney;_dtmDbContext.SaveChanges();}await Task.CompletedTask;});}catch (Exception ex) { _Logger.LogError(ex.Message); }finally{//Redis分布式锁释放锁_redisService.RedisUnLock(DataLock:UserACompensateUrl);}}return Ok(obj);}}
}10.4 用户2转账事务API控制器代码修改
using DTM_EF;
using DTM_EF.Model;
using Dtm_Saga;
using Dtmcli;
using ServiceStack.Redis;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Caching.Distributed;
using MySqlConnector;
using Newtonsoft.Json;namespace Dtm_Saga.Controllers
{[Route(api/[controller])][ApiController]public class SagaUserBController : ControllerBase{private readonly IBranchBarrierFactory _barrierFactory;private readonly ILoggerSagaUserBController _Logger;private readonly DtmDbContext _dtmDbContext;private readonly IRedisClient _redisClient;private readonly RedisService _redisService;public SagaUserBController(IBranchBarrierFactory barrierFactory,ILoggerSagaUserBController Logger,DtmDbContext dtmDbContext,IRedisClient redisClient,RedisService redisService){_barrierFactory barrierFactory;_Logger Logger;_dtmDbContext dtmDbContext;_redisClient redisClient;_redisService redisService;}[HttpPost][Route(/Saga/UserBTransactionUrl)]public async TaskIActionResult UserBTransactionUrl([FromQuery] string gid, [FromQuery] string trans_type, [FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body){var branchBarrier _barrierFactory.CreateBranchBarrier(Request.Query);var obj TransResponse.BuildSucceedResponse();using (MySqlConnection conn new MySqlConnection(serverlocalhost;port3307;user idroot;password123;databaseDTM_Test)){try{await branchBarrier.Call(conn, async (tx) {//Redis分布式锁锁定if (_redisService.RedisLock(DataLock:UserBTransactionUrl, 2000, 2000)){//获取用户账户信息var UserMoney _dtmDbContext.SetUserMoney().Where(c c.id body.id).FirstOrDefault();if (UserMoney is null){obj TransResponse.BuildFailureResponse();throw new Exception($用户{body.id}--不存在);}if (UserMoney.money body.trymoney 0){obj TransResponse.BuildFailureResponse();throw new Exception($用户{body.id}--金额不足);}//前序判断都通过修改信息准备提交 UserMoney!.money body.trymoney;_dtmDbContext.SaveChanges();}await Task.CompletedTask;});}catch (Exception ex){_Logger.LogError(ex.Message);}finally{//Redis分布式锁释放锁_redisService.RedisUnLock(DataLock:UserBTransactionUrl);}}return Ok(obj);}[HttpPost][Route(/Saga/UserBCompensateUrl)]public async TaskIActionResult UserBCompensateUrl([FromQuery] string gid, [FromQuery] string trans_type,[FromQuery] string branch_id, [FromQuery] string op, [FromBody] UserMoney body){var branchBarrier _barrierFactory.CreateBranchBarrier(Request.Query);var obj TransResponse.BuildSucceedResponse();using (MySqlConnection conn new MySqlConnection(serverlocalhost;port3307;user idroot;password123;databaseDTM_Test)){try{await branchBarrier.Call(conn, async (tx) {//Redis分布式锁锁定if (_redisService.RedisLock(DataLock:UserBCompensateUrl, 2000, 2000)){//获取用户账户信息var UserMoney _dtmDbContext.SetUserMoney().Where(c c.id body.id).FirstOrDefault();if (UserMoney null){obj TransResponse.BuildFailureResponse();throw new Exception($用户{body.id}--不存在);}//修改信息准备提交 UserMoney!.money - body.trymoney;_dtmDbContext.SaveChanges();}await Task.CompletedTask;});}catch (Exception ex) { _Logger.LogError(ex.Message); }finally{//Redis分布式锁释放锁_redisService.RedisUnLock(DataLock:UserACompensateUrl);}}return Ok(obj);}}
}谨记-----释放锁的时候每个锁的释放代码只能在finally出现一次。不能在try里也写上释放锁。虽然当前并发在try里释放和finally里释放运行并没有问题。但是下一个并发在执行的时候上一个执行到try释放锁之后立即抢锁抢锁成功。结果在执行的时候上一个并发在执行finally的时候给释放了这样是不对的。
10.5 Redis启动 修改好代码之后继续运行。(先启动一个win版本的Redislinux docker启动等操作之后推出文章。数据进行恢复A1000元B1000元)。每次转账1元直接上100200500并发。
转账1元100并发 转账1元200并发(让程序跑一会喝杯水) 转账1元500并发(可以离开工位出去透透气) 为什么A1627元B373元 呢单次并发有点多导致HTTP请求超时。这时候当前的服务就要部署多个实例可参考之前文章微服务架构Nacos(.NET CORE微服务之Nacos_nacos .net core-CSDN博客.NET CORE微服务之Ocelot(连接Nacos)_net ocelot noces-CSDN博客.NET CORE微服务之Polly_polly .net core-CSDN博客)或用Nginx反向代理实现负载均衡。
虽然出现问题也无法避免人工介入。但是我们最起码保证了用户资产并未出现超减或超加现象这也是电商中的秒杀防止商品超卖解决方案。当然应对高并发还有非常多的解决方案。
小结
本文给出了一个完整的 SAGA 事务方案是一个可以实际运行的 SAGA并解决高并发的使用场景您只需要在这个示例的基础上进行简单修改就能够用于解决您的真实问题
本文项目代码资源地址【免费】.NETCORE分布式事务(三)DTM实现Saga及高并发下的解决方案资源-CSDN文库