柯桥网站建设书生商友,电商主页设计,做网站需要多钱,微网站无锡我们上篇简单梳理了下TM、RM的一些流程#xff08;离现在过得挺久的了#xff0c;这篇我们这篇来梳理下TC的内容。
TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态#xff0c;驱动全局事务提交或回滚。
TM (Transaction Manager) - 事务管理器
定…我们上篇简单梳理了下TM、RM的一些流程离现在过得挺久的了这篇我们这篇来梳理下TC的内容。
TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态驱动全局事务提交或回滚。
TM (Transaction Manager) - 事务管理器
定义全局事务的范围开始全局事务、提交或回滚全局事务。
RM (Resource Manager) - 资源管理器
管理分支事务处理的资源与TC交谈以注册分支事务和报告分支事务的状态并驱动分支事务提交或回滚。
一、TC的定义
TC (Transaction Coordinator) - 事务协调者维护全局和分支事务的状态驱动全局事务提交或回滚
这个是官网给出的定义就是说其实全局事务的协调。具体来说例如当某个TM(事务管理器)需要开启事务的时候就需要一个协调者来驱动全局事务或提交或回滚这个概念我们开始的时候很容易与TM混淆。我们看到这两个一个是开始一个是驱动具体通过demo来说。
1、一些基本逻辑信息
用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持
仓储服务(Storage)对给定的商品扣除仓储数量。订单服务(Order)根据采购需求创建订单。帐户服务(Account)从用户帐户中扣除余额。业务服务(Business)发起整个业务逻辑。 这个官网给出的例子就是Business服务发起本次业务逻辑也就是通过GlobalTransactional来开始全局事务然后后面的也就是TM的一些逻辑处理这个我们上篇有说过就是去调TC表示自己要开始全局事务然后TC就会生成全局事务的XID,之后其调其他的分支事务Stock、Order的话就带上这个全局事务XID。然后这里TC就是一个协调者的角色其他的TM、RM都想TC注册。例如如果后面如果GlobalTransactional方法正常执行的话就事务提交其就向TC发送消息然后TC去协调分支事务的提交。
下面我们就来具体看下在TC中这些注册、提交、回滚等的操作。
二、TC的一些逻辑处理
1、前置内容
首先在TC这边关于对应请求的处理入口都是RemotingProcessor接口的具体实现 其的子类 2、RM通道注册
RM(资源管理器)注册。我们上面有介绍过Seata会代理服务关于数据库的操作。然后其数据源初始化的时候就会向TC注册。 但我们需要注意这个目前还只是channel的通道连接不是具体的RM资源注册具体的RM事务资源注册是在全局事务开始然后操作数据库的时候。
对应到TC这边就是RegRmProcessor在处理这个主要是保存Channel通道与Netty相关的内容目前还没具体研究但影响不大。先不深入这个。 3、TM通道注册 这个与前面的RM是类似的。
4、ServerOnRequestProcessor
然后这个是一个核心的处理类其用来处理各种类型的信息。 我们可以看到像分支注册、全局事务的开始、回滚都是这个Process处理的。
public class ServerOnRequestProcessor implements RemotingProcessor {private static final Logger LOGGER LoggerFactory.getLogger(ServerOnRequestProcessor.class);private RemotingServer remotingServer;private TransactionMessageHandler transactionMessageHandler;public ServerOnRequestProcessor(RemotingServer remotingServer, TransactionMessageHandler transactionMessageHandler) {this.remotingServer remotingServer;this.transactionMessageHandler transactionMessageHandler;}Overridepublic void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {if (ChannelManager.isRegistered(ctx.channel())) {onRequestMessage(ctx, rpcMessage);} else {........if (LOGGER.isInfoEnabled()) {LOGGER.info(String.format(close a unhandled connection! [%s], ctx.channel().toString()));}}}private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {Object message rpcMessage.getBody();RpcContext rpcContext ChannelManager.getContextFromIdentified(ctx.channel());...........if (message instanceof MergedWarpMessage) {AbstractResultMessage[] results new AbstractResultMessage[((MergedWarpMessage) message).msgs.size()];for (int i 0; i results.length; i) {final AbstractMessage subMessage ((MergedWarpMessage) message).msgs.get(i);results[i] transactionMessageHandler.onRequest(subMessage, rpcContext);}MergeResultMessage resultMessage new MergeResultMessage();resultMessage.setMsgs(results);remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);} else {// the single send request messagefinal AbstractMessage msg (AbstractMessage) message;AbstractResultMessage result transactionMessageHandler.onRequest(msg, rpcContext);remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);}}} 然后处理的核心就是DefaultCoordinator上面的AbstractRMHandler是RM的处理 5、DefaultCoordinator
这里的主要用来处理TC的逻辑就是TCInboundHandler接口 通过这些Request我们也能找到其主要是用来处理全局事务的开启、分支事务的注册、全局事务的提交、回滚、状态这些。下面我们就来主要看下看下GlobalBeginRequest、BranchRegisterRequest、GlobalCommitRequest的处理。
1、GlobalBeginRequest的处理 通过这里我们可以找到在GlobalBegin开启全局事务的主要处理就是生成全局事务Xid返回。 public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName,int timeout) {GlobalSession session new GlobalSession(applicationId, txServiceGroup, txName, timeout);return session;
}public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout) {this.transactionId UUIDGenerator.generateUUID();this.status GlobalStatus.Begin;this.applicationId applicationId;this.transactionServiceGroup transactionServiceGroup;this.transactionName transactionName;this.timeout timeout;this.xid XID.generateXID(transactionId);}这里是全局事务的session然后再通过session.begin来进行对应处理例如将全局事务session保存到数据库 我们可以看到这里有多种处理方式、例如保存到文件、redis这些。目前我们是保存到数据库中处理的就是DataBaseSessionManager
public class DataBaseSessionManager extends AbstractSessionManagerimplements Initialize {............Overridepublic void addGlobalSession(GlobalSession session) throws TransactionException {if (StringUtils.isBlank(taskName)) {boolean ret transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);if (!ret) {throw new StoreException(addGlobalSession failed.);}} else {boolean ret transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);if (!ret) {throw new StoreException(addGlobalSession failed.);}}} 这里之后就是DataBaseTransactionStoreManager处理入库逻辑 2、BranchRegisterRequest分支事务注册的处理
再之后我们来看下分支事务的注册处理逻辑。分支事务的注册就是每个RM对应开始操作DB的时候这个RM相关的内容上篇有提到过向TC发起分支事务注册代码逻辑就是放在seata代理的jdbc的ConnectionProxy 然后TC这边就会处理这个请求可以看到这里就是生成并返回分支事务ID 同时会将当前分支事务保存到数据库中 3、GlobalCommitRequest全局事务提交的处理
我们上篇有提到再TM发起全局事务后如果各个分支都执行成功后其会向TC发起全局事务的提交。
对于我们官方给到的demo。我们看下关于事务相关的表的数据。 然后各个分支都执行完成后由TM来提交全局事务 在TM提交后我们来看下TC的处理 
这里我们主要需要注意SessionHelper.forEach(globalSession.getSortedBranches(), branchSession对于每个branchSession的循环调用也就是每个分支事务的session在这里就会循环然后调用每个分支进行分支事务的本地提交也就是RM本地提交的处理。 然后RM的处理主要就是RMHandlerAT这个类在处理因为我们当前是AT模式 然后ResourceManager就是DataSourceManager 这里主要是添加到异步队列中。 然后主要处理就是删除数据库的undo记录就我们全局事务成功了不需要使用undo来回滚本地事务了。