seata源码解析:全局事务id是如何传递的?

网友投稿 1133 2022-09-02

seata源码解析:全局事务id是如何传递的?

seata源码解析:全局事务id是如何传递的?

通过Dubbo方式进行调用

想在Dubbo应用之间进行参数传递,其实非常简单。通过Dubbo提供的隐式传递功能即可实现。使用方式如下所示。

// 1. A服务设置参数RpcContext.getContext().setAttachment("name", "小明");// 2. A服务通过RPC调用B服务// 3. B服务可以获取到A服务设置的参数RpcContext.getContext().getAttachment("name");

所以我们可以把全局事务id放在RpcContext(rpc)中,然后在下游取出来。但是seata是从RootContext取出事务id的,我们如何先把事务id从RpcContext取出来然后再放到RootContext中呢?其实可以用到Dubbo Fillter

我们先看一下Dubbo Filter在哪个环节起作用,当我们调用远程方法的时候,实际上是通过代理对象调用的,将调用信息通过网络发送到服务端,服务端也是通过代理对象来接收请求的,然后根据请求调用服务端的方法,并返回结果。

而Dubbo Filter的作用则可以让用户在发送请求之前或者执行本地方法之前执行用户自定义的逻辑,增加可扩展性

所以我们可以在Consumer端通过Filter将事务id放在RpcContext中,在Provider端将事务id从RpcContext中取出来然后放入到RootContext中

@Activate(group = {DubboConstants.PROVIDER, DubboConstants.CONSUMER}, order = 100)public class ApacheDubboTransactionPropagationFilter implements Filter { private static final Logger LOGGER = LoggerFactory.getLogger(ApacheDubboTransactionPropagationFilter.class); @Override public Result invoke(Invoker invoker, Invocation invocation) throws RpcException { String xid = RootContext.getXID(); BranchType branchType = RootContext.getBranchType(); String rpcXid = getRpcXid(); String rpcBranchType = RpcContext.getContext().getAttachment(RootContext.KEY_BRANCH_TYPE); if (LOGGER.isDebugEnabled()) { LOGGER.debug("xid in RootContext[{}] xid in RpcContext[{}]", xid, rpcXid); } boolean bind = false; // 事务id为空,说明是在客户端调用,将事务id设置进去 if (xid != null) { RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid); RpcContext.getContext().setAttachment(RootContext.KEY_BRANCH_TYPE, branchType.name()); } else { // rpcXid不为空,说明这是一个服务端的方法,将上游传递下来的事务id绑定到上下文 if (rpcXid != null) { // 将事务id绑定到上下文中 RootContext.bind(rpcXid); if (StringUtils.equals(BranchType.TCC.name(), rpcBranchType)) { RootContext.bindBranchType(BranchType.TCC); } bind = true; if (LOGGER.isDebugEnabled()) { LOGGER.debug("bind xid [{}] branchType [{}] to RootContext", rpcXid, rpcBranchType); } } } try { return invoker.invoke(invocation); } finally { if (bind) { BranchType previousBranchType = RootContext.getBranchType(); // 将事务id解绑 String unbindXid = RootContext.unbind(); if (BranchType.TCC == previousBranchType) { RootContext.unbindBranchType(); } if (LOGGER.isDebugEnabled()) { LOGGER.debug("unbind xid [{}] branchType [{}] from RootContext", unbindXid, previousBranchType); } if (!rpcXid.equalsIgnoreCase(unbindXid)) { LOGGER.warn("xid in change during RPC from {} to {},branchType from {} to {}", rpcXid, unbindXid, rpcBranchType != null ? rpcBranchType : "AT", previousBranchType); if (unbindXid != null) { RootContext.bind(unbindXid); LOGGER.warn("bind xid [{}] back to RootContext", unbindXid); if (BranchType.TCC == previousBranchType) { RootContext.bindBranchType(BranchType.TCC); LOGGER.warn("bind branchType [{}] back to RootContext", previousBranchType); } } } } } } /** * get rpc xid * @return */ private String getRpcXid() { String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID); if (rpcXid == null) { rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID.toLowerCase()); } return rpcXid; }}

通过Http方式进行调用

当通过feign调用时,客户端在请求head中设置xid服务端从请求head中获取到xid并设置到RootContext中

客户端在head中设置xid

// 这个类在spring-cloud-starter-alibaba-seata包中public class SeataFeignClient implements Client { SeataFeignClient(BeanFactory beanFactory, Client delegate) { this.delegate = delegate; this.beanFactory = beanFactory; } @Override public Response execute(Request request, Request.Options options) throws IOException { Request modifiedRequest = getModifyRequest(request); return this.delegate.execute(modifiedRequest, options); } private Request getModifyRequest(Request request) { String xid = RootContext.getXID(); if (StringUtils.isEmpty(xid)) { return request; } Map> headers = new HashMap<>(MAP_SIZE); headers.putAll(request.headers()); List seataXid = new ArrayList<>(); seataXid.add(xid); // 往head中设置xid headers.put(RootContext.KEY_XID, seataXid); return Request.create(request.method(), request.url(), headers, request.body(), request.charset()); }}

服务端在-中从请求head中获取到xid并设置到RootContext中

服务端在HandlerInterceptor#preHandle方法中设置xid,调用完毕再清除xid

public class TransactionPropagationInterceptor extends HandlerInterceptorAdapter { private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationInterceptor.class); @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) { String xid = RootContext.getXID(); String rpcXid = request.getHeader(RootContext.KEY_XID); if (LOGGER.isDebugEnabled()) { LOGGER.debug("xid in RootContext[{}] xid in HttpContext[{}]", xid, rpcXid); } if (xid == null && rpcXid != null) { RootContext.bind(rpcXid); if (LOGGER.isDebugEnabled()) { LOGGER.debug("bind[{}] to RootContext", rpcXid); } } return true; } @Override public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) { if (RootContext.inGlobalTransaction()) { XidResource.cleanXid(request.getHeader(RootContext.KEY_XID)); } }}

我在刚学seata的过程中,就遇到过分布式事务不回滚的情况,此时,你就需要查看一下XID是否正常传递。当XID不能正常传递,就不会认为方法处于分布式事务中,自然就不会注册分支事务,当全局事务失败时就不会回滚

参考博客

[1]

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Redis源码解析:Redis有哪些慢操作?
下一篇:学PHP必知PHP岗位面试题(php面试试题)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~