可以做科学模拟实验的网站,做的ASP网站手机,衡水做网站的地方,加强网络舆情监测原创/朱季谦
曾经在SpringCloudAlibaba的Seata分布式事务搭建过程中#xff0c;跨节点通过openfeign调用不同服务时#xff0c;发现全局事务XID在当前节点也就是TM处#xff0c;是正常能通过RootContext.getXID()获取到分布式全局事务XID的#xff0c;但在下游节点就出现获…原创/朱季谦
曾经在SpringCloudAlibaba的Seata分布式事务搭建过程中跨节点通过openfeign调用不同服务时发现全局事务XID在当前节点也就是TM处是正常能通过RootContext.getXID()获取到分布式全局事务XID的但在下游节点就出现获取为NULL的情况导致全局事务失效出现异常时无法正常回滚。
当时看了一遍源码才知道问题所在故而把这个过程了解到的分布式事务XID是如何跨节点传输的原理记录下来。
本文默认是使用Seata的AT模式。
在那一次的搭建过程中我设置了三个节点分别是订单节点order商品库存节点product账户余额节点account模拟购买下单逻辑在分布式环境下生成一份订单时通过openfeign远程扣减库存最后同样通过openfeign去扣减账户当然实际场景远不止这些这里只是简单模拟这个过程。
正常情况下其中有一步出错整个全局分布式事务就会进行回滚。 这三个节点在Seata AT模式下流程图是这样的order充当TM/RM角色product和充当RM角色按照在Linux服务器上的Seats Service就充当TC角色。 首先是最初调用订单节点order业务逻辑——
Override
Transactional
GlobalTransactional(name zjq-create-order,rollbackFor Exception.class)
public RestResponse createOrder(Orders order) {log.info(当前的XID RootContext.getXID());log.info(------开始新建订单);//1、新建订单orderMapper.insert(order);//2、扣减库存productService.decrease(order.getProductId(),order.getCount());//3、扣减账户accountService.decrease(order.getUserId(),order.getMoney());......
}在Seataorder充当了TM角色负责生成一个全局事务注册到TCTC会返回一个全局事务ID给TM。
在该全局事务流程里每一个分支模块理应都能获取到这一个共同的全局事务ID在该全局事务ID统筹下完成分支事务的提交或者回滚。
通过RootContext.getXID()获取到一个全局事务ID为192.168.1.152:8091:458311058765479936 创建订单成功后就会执行扣减库存操作productService.decrease(order.getProductId(),order.getCount())。
在该代码案例里productService.decrease()内部是通过openfeign远程去调用的——
FeignClient(contextId remoteProductService,value zjq-product,fallbackFactory RemoteProductServiceFallbackFactory.class)
public interface RemoteProductService {PostMapping(value /product/decrease)RestResponse decrease(RequestParam(productId)Long productId, RequestParam(count) Integer count);
}最终decrease的服务层伪代码大概如下——
Transactional(propagation Propagation.REQUIRES_NEW)
public int decrease(Long productId, Integer count) {log.info(当前的XID RootContext.getXID());log.info(----------开始查询商品是否存在);log.info(----------开始扣减库存); ......
}然而到这一步发现了一个问题这里获取的全局事务ID为null—— 这说明了一个问题TM开启了一个全局事务后已经从TC那里获取到了一个全局事务ID但远程传送给product这个RM资源管理器后没有传送成功同理另一个分支事务account模块的同样获取到的全局事务ID为null。
基于这样一个现象我就开始尝试研究了一下全局事务是如何在Openfeign跨节点环境进行传输和获取的主要分为TM节点的全局事务ID发送和远程RM节点的接收。
一、TM节点的全局事务ID发送
通过debug代码去阅读在调用 productService.decrease(order.getProductId(),order.getCount())时内部做了反射调用执行了一系列方案调用调用核心过程如下—— 本文只需要关注在整个HTTP调用过程全局事务ID是如何放进来的这个调用链涉及的类及方法在后续学习中再进一步研究。
最终在SeataFeignClient的execute方法里可以看到以下源码——
public Response execute(Request request, Request.Options options) throws IOException {Request modifiedRequest this.getModifyRequest(request);return this.delegate.execute(modifiedRequest, options);
}其中在Request modifiedRequest this.getModifyRequest(request)这行代码里对请求头做了一些补充操作。
private Request getModifyRequest(Request request) {String xid RootContext.getXID();if (StringUtils.isEmpty(xid)) {return request;} else {MapString, CollectionString headers new HashMap(16);headers.putAll(request.headers());ListString seataXid new ArrayList();seataXid.add(xid);headers.put(TX_XID, seataXid);return Request.create(request.method(), request.url(), headers, request.body(), request.charset());}
}debug到这里可以看到这里将一个全局事务ID存储到了headers里—— 这个headers其实是HTTP组装的请求头可以看到这里是将全局事务ID放到了HTTP请求头里传送给了远程机器。
Request(HttpMethod method, String url, MapString, CollectionString headers, Body body, RequestTemplate requestTemplate) {this.httpMethod (HttpMethod)Util.checkNotNull(method, httpMethod of %s, new Object[]{method.name()});this.url (String)Util.checkNotNull(url, url, new Object[0]);this.headers (Map)Util.checkNotNull(headers, headers of %s %s, new Object[]{method, url});this.body body;this.requestTemplate requestTemplate;
}通过debug可以发现在HTTP组装过程中已经将全局事务ID放到了请求头里说明在HTTP发生成功后是会携带全局事务到远程product模块的但是为何product模块打印RootContext.getXID()得到的是null呢
二、跨节点分支事务获取全局事务ID
HTTP请求传送到远程product模块后在调用具体的Controller前会流转到MVC进行拦截转发在这过程当中涉及到seata分布式事务时理应会有这样一个叫TransactionPropagationInterceptor的拦截器用来处理分布式事务的传播有两个方法分别是preHandle()和afterCompletion()暂时只需要关注preHandle方法即可
preHandle()
在处理远程请求之前被调用在该方法中通过RootContext.getXID()获取到当前线程上下文中的全局事务ID和通过request.getHeader(TX_XID)获取HTTP请求头中的事务ID。这里的请求头里的事务ID正是前面发送HTTP时放到请求头里的。
若RootContext.getXID()获取到当前线程上下文中的全局事务ID为空并且HTTP请求头的事务ID不为空就会将该HTTP请求头里的事务ID绑定到该线程上下文当中用于确保全局事务的传播和关联。
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {String xid RootContext.getXID();String rpcXid request.getHeader(TX_XID);if (LOGGER.isDebugEnabled()) {LOGGER.debug(xid in RootContext[{}] xid in HttpContext[{}], xid, rpcXid);}if (StringUtils.isBlank(xid) StringUtils.isNotBlank(rpcXid)) {RootContext.bind(rpcXid);if (LOGGER.isDebugEnabled()) {LOGGER.debug(bind[{}] to RootContext, rpcXid);}}return true;
}进入到bind方法当中可以看到这里是HTTP请求头里的事务ID缓存到了 CONTEXT_HOLDER.put(TX_XID, xid)它本质其实是一个ThreadLocal可以存储线程隔离的变量。
public static void bind(Nonnull String xid) {if (StringUtils.isBlank(xid)) {if (LOGGER.isDebugEnabled()) {LOGGER.debug(xid is blank, switch to unbind operation!);}unbind();} else {MDC.put(X-TX-XID, xid);if (LOGGER.isDebugEnabled()) {LOGGER.debug(bind {}, xid);}CONTEXT_HOLDER.put(TX_XID, xid);}}缓存成功后下一次通过 RootContext.getXID()就能获取到该线程缓存的全局事务ID了 RootContext.getXID()本质就是——
public static String getXID() {return (String)CONTEXT_HOLDER.get(TX_XID);
}在本次搭建seata环境中发现该TransactionPropagationInterceptor过滤器当中的preHandle方法一直没有执行这就造成全局事务当中远程跨环境的分支事务节点一直无法获取到全局事务ID。
于是我尝试手动将该TransactionPropagationInterceptor拦截器加入到Spring MVC流程中——
Configuration
public class WebMvcInterceptorsConfig extends WebMvcConfigurationSupport {Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(new TransactionPropagationInterceptor());}Beanpublic ServerCodecConfigurer serverCodecConfigurer() {return ServerCodecConfigurer.create();}}重新运行后这次拦截器TransactionPropagationInterceptor终于生效里可以debug到了preHandle方法里将HTTP请求头的全局事务ID取出然后通过RootContext.bind(rpcXid)缓存到线程上下文当中—— 这时product节点终于能拿到从TM远程传送过来的全局事务ID了—— 最后总结一下全局事务ID在SpringCloudAlibaba Seata在Openfeign跨节点环境里的传送方式是将该全局事务ID放入到HTTP请求头当中远程传送给分支事务节点各分支事务节点会在TransactionPropagationInterceptor拦截器当中取出HTTP请求头大全局事务ID通过RootContext.bind(rpcXid)将全局事务ID缓存到线程上下文里这样分支事务就可以在其执行过程当中获取到全局事务ID啦。