移动网站的设计报告,深圳 网站设计 公司,免费推广途径,软件维护有哪些内容一.dtm分布式事务框架之SAGA
1.1DTM介绍
DTM是一款开源的分布式事务管理器#xff0c;解决跨数据库、跨服务、跨语言栈更新数据的一致性问题。
通俗一点说#xff0c;DTM提供跨服务事务能力#xff0c;一组服务要么全部成功#xff0c;要么全部回滚#xff0c;避免只更…一.dtm分布式事务框架之SAGA
1.1DTM介绍
DTM是一款开源的分布式事务管理器解决跨数据库、跨服务、跨语言栈更新数据的一致性问题。
通俗一点说DTM提供跨服务事务能力一组服务要么全部成功要么全部回滚避免只更新了一部分数据产生的一致性问题。
您可以在为什么选DTM中了解更多DTM的设计初衷。
1.2SAGA介绍
10分钟说透Saga分布式事务
Saga是这一篇数据库论文SAGAS提到的一个分布式事务方案。其核心思想是将长事务拆分为多个本地短事务由Saga事务协调器协调如果各个本地事务成功完成那就正常完成如果某个步骤失败则根据相反顺序一次调用补偿操作。
与tcc(try,commit,cancel)不同,saga取消了commit阶段.可以出现中间状态.例如saga分布式事务(saga是dtm框架一部分): 1.从前往后执行事务,执行出错,向前补偿(回滚) 2.没有configm阶段,B可以看见中间状态 1.3.各种分布式事务应用场景 1.4DTM安装
这里采用的是源码编译安装
git clone https://github.com/dtm-labs/dtm cd dtm
go build启动后的界面如下 1.5HTTP-SAGA转账
这里参考的是DTM的SAGA例子
1.5.1创建我们的用户表
CREATE TABLE user_account (id int(11) NOT NULL AUTO_INCREMENT,user_id int(11) NOT NULL,balance decimal(10,2) NOT NULL DEFAULT 0.00,trading_balance decimal(10,2) NOT NULL DEFAULT 0.00,create_time datetime DEFAULT CURRENT_TIMESTAMP,update_time datetime DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (id),UNIQUE KEY user_id (user_id)
) ENGINEInnoDB AUTO_INCREMENT3 DEFAULT CHARSETutf8mb41.5.2编写核心业务代码
调整用户的账户余额
func SagaAdjustBalance(db *gorm.DB, uid int, amount float64) error {lock.Lock()defer lock.Unlock()if amount 0 {var userAccount UserAccount{}db.First(userAccount, user_id?, uid)if userAccount.Balance amount {return fmt.Errorf(余额不足)}}t : db.Exec(update user_account set balance ? where user_id ?, gorm.Expr(balance ?, amount), uid)if t.Error ! nil {return t.Error}return nil
}再来编写具体的正向操作/补偿操作的处理函数
r.POST(/SagaBTransIn, func(c *gin.Context) {fmt.Printf(开始转入)userID : 1err SagaAdjustBalance(db, userID, 100)if err ! nil {fmt.Printf(转入失败:%s\r\n, err.Error())}fmt.Println(转入成功)})r.POST(/SagaBTransInCom, func(c *gin.Context) {fmt.Printf(转入失败开始补偿)userID : 1err SagaAdjustBalance(db, userID, -100)if err ! nil {fmt.Printf(转入补偿失败:%s\r\n, err.Error())}fmt.Println(转入补偿成功)})r.POST(/SagaBTransOut, func(c *gin.Context) {fmt.Printf(开始转出)userID : 3err SagaAdjustBalance(db, userID, -100)if err ! nil {if err.Error() 余额不足 {c.JSON(http.StatusConflict, gin.H{})return}fmt.Printf(转出失败:%s\r\n, err.Error())c.JSON(500, gin.H{message: err.Error()})return}fmt.Println(转出成功)})r.POST(/SagaBTransOutCom, func(c *gin.Context) {fmt.Printf(转出补偿)userID : 3err SagaAdjustBalance(db, userID, 100)if err ! nil {fmt.Printf(转出补偿失败:%s\r\n, err.Error())}fmt.Println(转出补偿成功)})到此各个子事务的处理函数已经OK了然后是开启SAGA事务进行分支调用 r.GET(/start, func(c *gin.Context) {req : gin.H{}dmtServer : http://127.0.0.1:36789/api/dtmsvrqsBusi : http://127.0.0.1:8089saga : dtmcli.NewSaga(dmtServer, shortuuid.New()).// 添加一个TransOut的子事务正向操作为url: qsBusi/TransOut 逆向操作为url: qsBusi/TransOutComAdd(qsBusi/SagaBTransOut, qsBusi/SagaBTransOutCom, req).// 添加一个TransIn的子事务正向操作为url: qsBusi/TransOut 逆向操作为url: qsBusi/TransInComAdd(qsBusi/SagaBTransIn, qsBusi/SagaBTransInCom, req)// 提交saga事务dtm会完成所有的子事务/回滚所有的子事务saga.WaitResult trueerr : saga.Submit()if err ! nil {c.JSON(500, gin.H{message: err.Error()})}c.JSON(200, gin.H{message: ok})})完整代码如下
package mainimport (fmtgithub.com/dtm-labs/client/dtmcligithub.com/gin-gonic/gingithub.com/lithammer/shortuuid/v3gorm.io/driver/mysqlgorm.io/gormglog gorm.io/gorm/loggerlognet/httpossynctime
)type UserAccount struct {ID int gorm:column:id;primary_keyUserId int gorm:user_idBalance float64 gorm:balanceTradingBalance float64 gorm:trading_balance
}func (UserAccount) TableName() string {return user_account
}var lock sync.Mutexfunc SagaAdjustBalance(db *gorm.DB, uid int, amount float64) error {lock.Lock()defer lock.Unlock()if amount 0 {var userAccount UserAccount{}db.First(userAccount, user_id?, uid)if userAccount.Balance amount {return fmt.Errorf(余额不足)}}t : db.Exec(update user_account set balance ? where user_id ?, gorm.Expr(balance ?, amount), uid)if t.Error ! nil {return t.Error}return nil
}var db *gorm.DBfunc InitDB() error {var err errordsn : fmt.Sprintf(%s:%stcp(%s:%s)/%s?charsetutf8mb4parseTimeTruelocLocal,root,123456,127.0.0.1,3306,dtm)//希望大家自己可以去封装loggernewLogger : glog.New(log.New(os.Stdout, \r\n, log.LstdFlags), // io writer日志输出的目标前缀和日志包含的内容——译者注glog.Config{SlowThreshold: time.Second, // 慢 SQL 阈值LogLevel: glog.Info, // 日志级别IgnoreRecordNotFoundError: true, // 忽略ErrRecordNotFound记录未找到错误Colorful: false, // 禁用彩色打印},)db, err gorm.Open(mysql.Open(dsn), gorm.Config{Logger: newLogger,})if err ! nil {return err}return nil
}func main() {err : InitDB()if err ! nil {panic(err)}r : gin.Default()r.POST(/SagaBTransIn, func(c *gin.Context) {fmt.Printf(开始转入)userID : 1err SagaAdjustBalance(db, userID, 100)if err ! nil {fmt.Printf(转入失败:%s\r\n, err.Error())}fmt.Println(转入成功)})r.POST(/SagaBTransInCom, func(c *gin.Context) {fmt.Printf(转入失败开始补偿)userID : 1err SagaAdjustBalance(db, userID, -100)if err ! nil {fmt.Printf(转入补偿失败:%s\r\n, err.Error())}fmt.Println(转入补偿成功)})r.POST(/SagaBTransOut, func(c *gin.Context) {fmt.Printf(开始转出)userID : 3err SagaAdjustBalance(db, userID, -100)if err ! nil {if err.Error() 余额不足 {c.JSON(http.StatusConflict, gin.H{})return}fmt.Printf(转出失败:%s\r\n, err.Error())c.JSON(500, gin.H{message: err.Error()})return}fmt.Println(转出成功)})r.POST(/SagaBTransOutCom, func(c *gin.Context) {fmt.Printf(转出补偿)userID : 3err SagaAdjustBalance(db, userID, 100)if err ! nil {fmt.Printf(转出补偿失败:%s\r\n, err.Error())}fmt.Println(转出补偿成功)})r.GET(/start, func(c *gin.Context) {req : gin.H{}dmtServer : http://127.0.0.1:36789/api/dtmsvrqsBusi : http://127.0.0.1:8089saga : dtmcli.NewSaga(dmtServer, shortuuid.New()).// 添加一个TransOut的子事务正向操作为url: qsBusi/TransOut 逆向操作为url: qsBusi/TransOutComAdd(qsBusi/SagaBTransOut, qsBusi/SagaBTransOutCom, req).// 添加一个TransIn的子事务正向操作为url: qsBusi/TransOut 逆向操作为url: qsBusi/TransInComAdd(qsBusi/SagaBTransIn, qsBusi/SagaBTransInCom, req)// 提交saga事务dtm会完成所有的子事务/回滚所有的子事务saga.WaitResult trueerr : saga.Submit()if err ! nil {c.JSON(500, gin.H{message: err.Error()})}c.JSON(200, gin.H{message: ok})})r.Run(:8089)
}1.5.3测试
启动 main.go在浏览上运行http://127.0.0.1:8089/start
可以看到如下的运行结果
[GIN-debug] Listening and serving HTTP on :8089
开始转出
2023/12/07 10:16:06 E:/Linuxshare/GoStart/dtm/main.go:37
[64.070ms] [rows:1] SELECT * FROM user_account WHERE user_id3 ORDER BY user_account.id LIMIT 1
gid3NujmbFwy6caKsX88fkApjopactiontrans_typesaga
开始转入
2023/12/07 10:16:06 E:/Linuxshare/GoStart/dtm/main.go:42
[5.984ms] [rows:1] update user_account set balance balance 100 where user_id 1
转入成功
[GIN] 2023/12/07 - 10:16:06 | 200 | 51.7071ms | 127.0.0.1 | POST /SagaBTransIn?branch_id02
gid3NujmbFwy6caKsX88fkApjopactiontrans_typesaga
[GIN] 2023/12/07 - 10:16:06 | 200 | 667.8659ms | 127.0.0.1 | GET /start
[GIN] 2023/12/07 - 10:16:06 | 404 | 0s | 127.0.0.1 | GET /favicon.ico 1.6GRPC-SAGA库存服务
1.6.1复制一份conf.sample.yml 改名为conf.yaml
这里面采用的通信协议是kratos,代码如下
MicroService: # gRPC/HTTP based microservice configDriver: dtm-driver-kratos # name of the driver to handle register/discoverTarget: consul://127.0.0.1:8500/dtmservice # register dtm server to this urlEndPoint: grpc://127.0.0.1:36790
修改完后重新启动DTM启动的时候要加参数如下图所示 启动完后就可以看到已经注册到consul上去了 1.6.2编写具体的服务
SAGA库存服务的具体代码如下
package mainimport (proto GoStart/api/inventory/v1fmtgithub.com/dtm-labs/client/dtmgrpcgithub.com/gin-gonic/gingithub.com/google/uuid
)func main() {r : gin.Default()r.GET(/start, func(c *gin.Context) {orderSn : uuid.NewString()req : proto.SellInfo{GoodsInfo: []*proto.GoodsInvInfo{{GoodsId: 421,Num: 2,},},OrderSn: orderSn,}dmtServer : 127.0.0.1:36790qsBusi : discovery:///inventory-srvuid : uuid.NewString()fmt.Println(uid, uid)saga : dtmgrpc.NewSagaGrpc(dmtServer, orderSn).// 添加一个TransOut的子事务正向操作为url: qsBusi/TransOut 逆向操作为url: qsBusi/TransOutComAdd(qsBusi/Inventory/Sell, qsBusi/Inventory/Reback, req)// 提交saga事务dtm会完成所有的子事务/回滚所有的子事务saga.WaitResult trueerr : saga.Submit()if err ! nil {c.JSON(500, gin.H{message: err.Error()})}c.JSON(200, gin.H{message: ok})})r.Run(:8089)
}1.6.3启动服务进行测试
商品服务 订单服务 库存服务 库存服务原先的数据如下 这时候运行SAGA库存服务的代码然后在浏览器上访问http://127.0.0.1:8089/start可以在库存服务看到如下运行情况
2023-12-07 10:41:27.639 INFO v1/inventory.go:56 订单a0ee4972-407e-4625-a478-c8ccbe42c28d扣减库存2023/12/07 10:41:27 E:/Linuxshare/mxshop/app/inventory/srv/internal/data/v1/db/inventory.go:94
[4.898ms] [rows:1] SELECT * FROM inventory WHERE goods 421 AND inventory.deleted_at IS NULL ORDER BY inventory.id LIMIT 12023/12/07 10:41:27 E:/Linuxshare/mxshop/app/inventory/srv/internal/data/v1/db/inventory.go:58
[5.495ms] [rows:1] UPDATE inventory SET stocksstocks - 2 WHERE goods421 AND stocks 2 AND inventory.deleted_at IS NULL 2023/12/07 10:41:27 E:/Linuxshare/mxshop/app/inventory/srv/internal/data/v1/db/inventory.go:76
[17.646ms] [rows:1] INSERT INTO stockselldetail (order_sn,status,detail) VALUES (a0ee4972-407e-4625-a478-c8ccbe42c28d,1,
[{Goods:421,Num:2}])
再看数据库库存的数据已发生变动库存明细数据也行插入了 1.7事务屏障达到通过gin集成转入转出功能
1.7.1事务屏障介绍
异常与子事务屏障
分布式事务之所以难主要是因为分布式系统中的各个节点都可能发生各种非预期的情况。本文先介绍分布式系统中的异常问题然后介绍这些问题带给分布式事务的挑战接下来指出现有各种常见用法的问题最后给出正确的方案。
NPC的挑战
分布式系统最大的敌人可能就是NPC了在这里它是Network Delay, Process Pause, Clock Drift的首字母缩写。我们先看看具体的NPC问题是什么
Network Delay网络延迟。虽然网络在多数情况下工作的还可以虽然TCP保证传输顺序和不会丢失但它无法消除网络延迟问题。Process Pause进程暂停。有很多种原因可以导致进程暂停比如编程语言中的GC垃圾回收机制会暂停所有正在运行的线程再比如我们有时会暂停云服务器从而可以在不重启的情况下将云服务器从一台主机迁移到另一台主机。我们无法确定性预测进程暂停的时长你以为持续几百毫秒已经很长了但实际上持续数分钟之久进程暂停并不罕见。Clock Drift时钟漂移。现实生活中我们通常认为时间是平稳流逝单调递增的但在计算机中不是。计算机使用时钟硬件计时通常是石英钟计时精度有限同时受机器温度影响。为了在一定程度上同步网络上多个机器之间的时间通常使用NTP协议将本地设备的时间与专门的时间服务器对齐这样做的一个直接结果是设备的本地时间可能会突然向前或向后跳跃。
分布式事务既然是分布式的系统自然也有NPC问题。因为没有涉及时间戳带来的困扰主要是NP。
异常分类
我们以分布式事务中的TCC作为例子看看NP带来的影响。
一般情况下一个TCC回滚时的执行顺序是先执行完Try再执行Cancel但是由于N则有可能Try的网络延迟大导致先执行Cancel再执行Try。
这种情况就引入了分布式事务中的两个难题
空补偿 Cancel执行时Try未执行事务分支的Cancel操作需要判断出Try未执行这时需要忽略Cancel中的业务数据更新直接返回悬挂 Try执行时Cancel已执行完成事务分支的Try操作需要判断出Cancel已执行这时需要忽略Try中的业务数据更新直接返回
分布式事务还有一类需要处理的常见问题就是重复请求
幂等 由于任何一个请求都可能出现网络异常出现重复请求所有的分布式事务分支操作都需要保证幂等性
因为空补偿、悬挂、重复请求都跟NP有关我们把他们统称为子事务乱序问题。在业务处理中需要小心处理好这三种问题否则会出现错误数据。
异常原因
下面看一个网络异常的时序图更好的理解上述几种问题 业务处理请求4的时候Cancel在Try之前执行需要处理空回滚业务处理请求6的时候Cancel重复执行需要幂等业务处理请求8的时候Try在Cancel后执行需要处理悬挂
现有方案的问题
我们看到开源项目dtm之外包括各云厂商各开源项目他们给出的业务实现建议大多类似如下这也是大多数用户最容易想到的方案
空补偿 “针对该问题在服务设计时需要允许空补偿即在没有找到要补偿的业务主键时返回补偿成功并将原业务主键记录下来标记该业务流水已补偿成功。”防悬挂 “需要检查当前业务主键是否已经在空补偿记录下来的业务主键中存在如果存在则要拒绝执行该笔服务以免造成数据不一致。”
上述的这种实现能够在大部分情况下正常运行但是上述做法中的“先查后改”在并发情况下是容易掉坑里的我们分析以下如下场景
正常执行顺序下Try执行时在查完没有空补偿记录的业务主键之后事务提交之前如果发生了进程暂停P或者事务内部进行网络请求出现了拥塞导致本地事务等待较久全局事务超时后Cancel执行因为没有查到要补偿的业务主键因此判断是空补偿返回Try的进程暂停结束最后提交本地事务全局事务回滚完成后Try分支的业务操作没有被回滚产生了悬挂
事实上NPC里的P和C以及P和C的组合有很多种的场景都可以导致上述竞态情况就不一一赘述了。
虽然这种情况发生的概率不高但是在金融领域一旦涉及金钱账目那么带来的影响可能是巨大的。
PS幂等控制如果也采用“先查再改”也是一样很容易出现类似的问题。解决这一类问题的关键点是要利用唯一索引“以改代查”来避免竞态条件。
子事务屏障
我们在dtm中首创了子事务屏障技术使用该技术能够非常便捷的解决异常问题极大的降低了分布式事务的使用门槛。
子事务屏障能够达到下面这个效果看示意图 所有这些请求到了子事务屏障后不正常的请求会被过滤正常请求通过屏障。开发者使用子事务屏障之后前面所说的各种异常全部被妥善处理业务开发人员只需要关注实际的业务逻辑负担大大降低。 子事务屏障提供了方法BranchBarrier.CallWithDB 方法的原型为
func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BusiFunc) error业务开发人员在busiCall里面编写自己的相关逻辑调用 BranchBarrier.CallWithDB 。 BranchBarrier.CallWithDB 保证在空回滚、悬挂等场景下busiCall不会被调用在业务被重复调用时有幂等控制保证只被提交一次。
子事务屏障会管理TCC、SAGA、事务消息等也可以扩展到其他领域
原理
子事务屏障技术的原理是在本地数据库建立分支操作状态表dtm_barrier唯一键为全局事务id-分支id-分支操作try|confirm|cancel
开启本地事务对于当前操作op(try|confirm|cancel)insert ignore一条数据gid-branchid-op如果插入不成功提交事务返回成功常见的幂等控制方法如果当前操作是cancel那么在insert ignore一条数据gid-branchid-try如果插入成功注意是成功则提交事务返回成功调用屏障内的业务逻辑如果业务返回成功则提交事务返回成功如果业务返回失败则回滚事务返回失败
在此机制下解决了乱序相关的问题
空补偿控制–如果Try没有执行直接执行了Cancel那么3中Cancel插入gid-branchid-try会成功不走屏障内的逻辑保证了空补偿控制幂等控制–2中任何一个操作都无法重复插入唯一键保证了不会重复执行防悬挂控制–Try在Cancel之后执行那么Cancel会在3中插入gid-branchid-try导致Try在2中不成功就不执行屏障内的逻辑保证了防悬挂控制
对于SAGA、二阶段消息也是类似的机制。
原理图解
下面我们以图的方式来详解子事务屏障因为Confirm操作不涉及空补偿和悬挂所以重点看Try与CancelTry对应图中的ACancel对应图中的C
子事务屏障中对应的幂等处理部分 这部分就是常规的幂等处理部分往数据库中插入一个唯一键如果是重复请求那么插入失败直接失败返回。
子事务屏障技术就是在上述的幂等处理部分添加一个步骤–补偿服务再插入一条A记录正常流程下会因为唯一键冲突导致插入失败往下执行业务。 当发生乱序假设C在A前面执行那么会发生下面的时序图 对于C操作他先于A执行是一个空补偿此时C操作插入A记录时发现插入成功直接返回对于A操作他在C之后执行是一个悬挂此时A操作插入A记录时发现插入失败直接返回
这两种情况都会被子事务屏障拦截返回而不执行内部的业务操作。可以看到子事务屏障非常巧妙的解决了幂等、空补偿和悬挂三个问题。
竞态分析
上面分析了Try和Cancel的执行时间没有重叠的情况下能够解决空补偿和悬挂问题。如果出现了Try和Cancel执行时间重叠的情况我们看看会发生什么。
假设Try和Cancel并发执行Cancel和Try都会插入同一条记录gid-branchid-try由于唯一索引冲突那么两个操作中只有一个能够成功而另一个则会等持有锁的事务完成后返回。
情况1Try插入gid-branchid-try失败Cancel操作插入gid-branchid-try成功此时就是典型的空补偿和悬挂场景按照子事务屏障算法Try和Cancel都会直接返回情况2Try插入gid-branchid-try成功Cancel操作插入gid-branchid-try失败按照上述子事务屏障算法会正常执行业务而且业务执行的顺序是Try在Cancel前情况3Try和Cancel的操作在重叠期间又遇见宕机等情况那么至少Cancel会被dtm重试那么最终会走到情况1或2。
综上各种情况的详细论述子事务屏障能够在各种NP情况下保证最终结果的正确性。
优点
事实上子事务屏障有大量优点包括
两个insert判断解决空补偿、防悬挂、幂等这三个问题比其他方案的三种情况分别判断逻辑复杂度大幅降低dtm的子事务屏障是SDK层解决这三个问题业务完全不需要关心性能高对于正常完成的事务一般失败的事务不超过1%子事务屏障的额外开销是每个分支操作一个SQL比其他方案代价更小。
支持的存储
目前子事务屏障已经支持了
数据库包括 Mysql, Postgres, 以及与MysqlPostgres兼容的数据库缓存 Redis采用 Lua 脚本事务支持Mongo采用 Mongo 的事务支持
在子事务屏障的支持下您可以将Redis、Mongo和数据库的事务组合在一起形成一个全局事务。相关用法可以在dtm-examples里面找到
理论上支持事务的各种存储都可以轻松实现子事务屏障例如 TiKV 等如果较多用户有这样的需求我们将会快速支持。
对接orm库
barrier提供了sql标准接口但大家的应用通常都会引入更高级的orm库而不是裸用sql接口因此需要进行转化. 相关的对接参考对接ORM
1.7.2建表
create database if not exists dtm_barrier
/*!40100 DEFAULT CHARACTER SET utf8mb4 */
;
drop table if exists dtm_barrier.barrier;
create table if not exists dtm_barrier.barrier(id bigint(22) PRIMARY KEY AUTO_INCREMENT,trans_type varchar(45) default ,gid varchar(128) default ,branch_id varchar(128) default ,op varchar(45) default ,barrier_id varchar(45) default ,reason varchar(45) default comment the branch type who insert this record,create_time datetime DEFAULT now(),update_time datetime DEFAULT now(),key(create_time),key(update_time),UNIQUE key(gid, branch_id, op, barrier_id)
) ENGINE InnoDB DEFAULT CHARSET utf8mb4;17.3代码编写
package mainimport (database/sqlfmtlognet/httpossynctimegithub.com/gin-gonic/gingithub.com/lithammer/shortuuid/v3github.com/dtm-labs/client/dtmcligorm.io/driver/mysqlgorm.io/gormglog gorm.io/gorm/logger
)type UserAccount struct {ID int gorm:column:id;primary_keyUserId int gorm:user_idBalance float64 gorm:balanceTradingBalance float64 gorm:trading_balance
}func (UserAccount) TableName() string {return user_account
}var lock sync.Mutex// 转入和转出的时候都要加锁否则会出现并发问题
func SagaAdjustBalance(db *sql.Tx, uid int, amount float64) error {lock.Lock()defer lock.Unlock()if amount 0 {var balance float64db.QueryRow(select balance from dtm.user_account where user_id ?, uid).Scan(balance)if balance -amount {return fmt.Errorf(余额不足)}}_, err : db.Exec(update dtm.user_account set balance balance ? where user_id ?, amount, uid)if err ! nil {return err}return nil
}var db *gorm.DBfunc initDB() error {dsn : fmt.Sprintf(%s:%stcp(%s:%s)/%s?charsetutf8mb4parseTimeTruelocLocal,root,root,192.168.2.13,3306,dtm)newLogger : glog.New(log.New(os.Stdout, \r\n, log.LstdFlags), // io writer日志输出的目标前缀和日志包含的内容——译者注glog.Config{SlowThreshold: time.Second, // 慢 SQL 阈值LogLevel: glog.Info, // 日志级别IgnoreRecordNotFoundError: true, // 忽略ErrRecordNotFound记录未找到错误Colorful: false, // 禁用彩色打印},)var err errordb, err gorm.Open(mysql.Open(dsn), gorm.Config{Logger: newLogger,})if err ! nil {return err}return nil
}// 获取屏障
// MustBarrierFromGin 1
func MustBarrierFromGin(c *gin.Context) *dtmcli.BranchBarrier {ti, err : dtmcli.BarrierFromQuery(c.Request.URL.Query())fmt.Println(err)return ti
}// 服务发现 库存服务有5个
func main() {err : initDB()if err ! nil {panic(err)}r : gin.Default()r.POST(/SagaBTransIn, func(c *gin.Context) {barrier : MustBarrierFromGin(c) //1.生成一个屏障tx : db.Begin() //2.开启事务sourceTx : tx.Statement.ConnPool.(*sql.Tx)err : barrier.Call(sourceTx, func(tx1 *sql.Tx) error { //3.将业务逻辑翻到Call方法执行fmt.Println(开始转入)userID : 1err : SagaAdjustBalance(sourceTx, userID, 100) //4.修改gorm为 sql.Tx并使用原生sql查询(gorm支持不全)if err ! nil {fmt.Printf(转入失败:%s\r\n, err.Error())return err}return nil})if err ! nil {c.JSON(http.StatusOK, gin.H{code: 1, msg: err.Error()})return}return})r.POST(/SagaBTransInCom, func(c *gin.Context) {fmt.Println(转入失败 开始补偿)//userID : 1//err : SagaAdjustBalance(db, userID, -100)//if err ! nil {// fmt.Printf(转入补偿失败:%s\r\n, err.Error())// return//}fmt.Println(转入补偿成功)})r.POST(/SagaBTransOut, func(c *gin.Context) {barrier : MustBarrierFromGin(c)tx : db.Begin()sourceTx : tx.Statement.ConnPool.(*sql.Tx)err : barrier.Call(sourceTx, func(tx1 *sql.Tx) error {fmt.Println(开始转出)userID : 3err : SagaAdjustBalance(sourceTx, userID, -100)if err ! nil {if err.Error() 余额不足 {c.JSON(http.StatusConflict, gin.H{})}fmt.Printf(转出失败:%s\r\n, err.Error())c.JSON(500, gin.H{msg: err.Error()})}fmt.Println(转出成功)return nil})if err ! nil {c.JSON(http.StatusOK, gin.H{code: 1, msg: err.Error()})return}return})r.POST(/SagaBTransOutCom, func(c *gin.Context) {fmt.Println(转出失败 开始补偿)//userID : 3//err : SagaAdjustBalance(db, userID, 100)//if err ! nil {// fmt.Printf(转出补偿失败:%s\r\n, err.Error())// return//}fmt.Println(转出补偿成功)})r.GET(start, func(c *gin.Context) {req : gin.H{}dmtServer : http://127.0.0.1:36789/api/dtmsvrqsBusi : http://127.0.0.1:8089saga : dtmcli.NewSaga(dmtServer, shortuuid.New()).// 添加一个TransOut的子事务正向操作为url: qsBusi/TransOut 逆向操作为url: qsBusi/TransOutComAdd(qsBusi/SagaBTransOut, qsBusi/SagaBTransOutCom, req).// 添加一个TransIn的子事务正向操作为url: qsBusi/TransOut 逆向操作为url: qsBusi/TransInComAdd(qsBusi/SagaBTransIn, qsBusi/SagaBTransInCom, req)// 提交saga事务dtm会完成所有的子事务/回滚所有的子事务saga.WaitResult trueerr : saga.Submit()if err ! nil {c.JSON(500, gin.H{message: err.Error()})}c.JSON(200, gin.H{message: ok})})r.Run(:8089)
}
s\r\n, err.Error()) // return //} fmt.Println(“转出补偿成功”) })
r.GET(start, func(c *gin.Context) {req : gin.H{}dmtServer : http://127.0.0.1:36789/api/dtmsvrqsBusi : http://127.0.0.1:8089saga : dtmcli.NewSaga(dmtServer, shortuuid.New()).// 添加一个TransOut的子事务正向操作为url: qsBusi/TransOut 逆向操作为url: qsBusi/TransOutComAdd(qsBusi/SagaBTransOut, qsBusi/SagaBTransOutCom, req).// 添加一个TransIn的子事务正向操作为url: qsBusi/TransOut 逆向操作为url: qsBusi/TransInComAdd(qsBusi/SagaBTransIn, qsBusi/SagaBTransInCom, req)// 提交saga事务dtm会完成所有的子事务/回滚所有的子事务saga.WaitResult trueerr : saga.Submit()if err ! nil {c.JSON(500, gin.H{message: err.Error()})}c.JSON(200, gin.H{message: ok})
})r.Run(:8089)}