微信开发中 ACCESS TOKEN 过期失效的解决方案详解
623
2022-11-10
seata源码解析:TM RM 客户端的初始化过程
TM和RM初始化过程
上一篇文章说过,在Spring启动的过程中就会就会初始化TM和RM,建立与TC的长连接。TM,RM,TC都是用netty来处理网络连接的,初始化netty客户端和服务端的过程也非常类似。
本篇文章只分析TM的初始化过程,RM和TM复用了很多方法
// TmNettyRemotingClientpublic void init() { // registry processor // 注册消息处理器 registerProcessor(); if (initialized.compareAndSet(false, true)) { super.init(); }}
// AbstractNettyRemotingClientpublic void init() { // 不断连接seata server timerExecutor.scheduleAtFixedRate(new Runnable() { @Override public void run() { clientChannelManager.reconnect(getTransactionServiceGroup()); } }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS); // 是否允许批量发送请求 if (NettyClientConfig.isEnableClientBatchSendRequest()) { mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD, MAX_MERGE_SEND_THREAD, KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD)); mergeSendExecutorService.submit(new MergedSendRunnable()); } // 移除发送超时的消息 super.init(); clientBootstrap.start();}
clientBootstrap#start是netty启动的模版代码,注册消息处理器和处理消息的套路我在seata server启动的文章分析的比较详细,本篇文章就不深入分析了
建立和TC的连接
TM和RM每隔10s都要TC集群的每个地址建立长连接
// NettyClientChannelManager#reconnectvoid reconnect(String transactionServiceGroup) { List
Channel acquireChannel(String serverAddress) { Channel channelToServer = channels.get(serverAddress); // 与当前serverAddress已经存在连接,直接返回 if (channelToServer != null) { channelToServer = getExistAliveChannel(channelToServer, serverAddress); if (channelToServer != null) { return channelToServer; } } if (LOGGER.isInfoEnabled()) { LOGGER.info("will connect to " + serverAddress); } // 与当前serverAddress不存在连接,新建连接 Object lockObj = CollectionUtils.computeIfAbsent(channelLocks, serverAddress, key -> new Object()); synchronized (lockObj) { return doConnect(serverAddress); }}
private Channel doConnect(String serverAddress) { Channel channelToServer = channels.get(serverAddress); // 当前地址已经存在连接 if (channelToServer != null && channelToServer.isActive()) { return channelToServer; } Channel channelFromPool; try { NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress); NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey); if (previousPoolKey != null && previousPoolKey.getMessage() instanceof RegisterRMRequest) { RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage(); ((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds()); } channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress)); channels.put(serverAddress, channelFromPool); } catch (Exception exx) { LOGGER.error("{} register RM failed.",FrameworkErrorCode.RegisterRM.getErrCode(), exx); throw new FrameworkException("can not register RM,err:" + exx.getMessage()); } return channelFromPool;}
TM和RM客户端在启动的时候会和集群中的的每台seata server建立长连接,但是在后续发送请求的时候,比如开启全局事务,注册分支事务只会和其中的一台机器通讯,TM或RM首先根据事务分组找到集群列表,然后根据负载均衡策略从列表中选出一台机器发起请求。具体代码可参见AbstractNettyRemotingClient#sendSyncRequest方法
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~