seata源码解析:TM RM 客户端的初始化过程

网友投稿 623 2022-11-10

seata源码解析:TM RM 客户端的初始化过程

seata源码解析:TM RM 客户端的初始化过程

TM和RM初始化过程

上一篇文章说过,在Spring启动的过程中就会就会初始化TM和RM,建立与TC的长连接。TM,RM,TC都是用netty来处理网络连接的,初始化netty客户端和服务端的过程也非常类似。

本篇文章只分析TM的初始化过程,RM和TM复用了很多方法

// TmNettyRemotingClientpublic void init() { // registry processor // 注册消息处理器 registerProcessor(); if (initialized.compareAndSet(false, true)) { super.init(); }}

// AbstractNettyRemotingClientpublic void init() { // 不断连接seata server timerExecutor.scheduleAtFixedRate(new Runnable() { @Override public void run() { clientChannelManager.reconnect(getTransactionServiceGroup()); } }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS); // 是否允许批量发送请求 if (NettyClientConfig.isEnableClientBatchSendRequest()) { mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD, MAX_MERGE_SEND_THREAD, KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD)); mergeSendExecutorService.submit(new MergedSendRunnable()); } // 移除发送超时的消息 super.init(); clientBootstrap.start();}

clientBootstrap#start是netty启动的模版代码,注册消息处理器和处理消息的套路我在seata server启动的文章分析的比较详细,本篇文章就不深入分析了

建立和TC的连接

TM和RM每隔10s都要TC集群的每个地址建立长连接

// NettyClientChannelManager#reconnectvoid reconnect(String transactionServiceGroup) { List availList = null; try { // 获得事务分组对应的集群中每台机器地址 availList = getAvailServerList(transactionServiceGroup); } catch (Exception e) { LOGGER.error("Failed to get available servers: {}", e.getMessage(), e); return; } if (CollectionUtils.isEmpty(availList)) { RegistryService registryService = RegistryFactory.getInstance(); String clusterName = registryService.getServiceGroup(transactionServiceGroup); if (StringUtils.isBlank(clusterName)) { LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct", ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX, transactionServiceGroup); return; } if (!(registryService instanceof FileRegistryServiceImpl)) { LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName); } return; } // 遍历tc服务器地址 for (String serverAddress : availList) { try { // 建立与tc的连接 acquireChannel(serverAddress); } catch (Exception e) { LOGGER.error("{} can not connect to {} cause:{}",FrameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e); } }}

Channel acquireChannel(String serverAddress) { Channel channelToServer = channels.get(serverAddress); // 与当前serverAddress已经存在连接,直接返回 if (channelToServer != null) { channelToServer = getExistAliveChannel(channelToServer, serverAddress); if (channelToServer != null) { return channelToServer; } } if (LOGGER.isInfoEnabled()) { LOGGER.info("will connect to " + serverAddress); } // 与当前serverAddress不存在连接,新建连接 Object lockObj = CollectionUtils.computeIfAbsent(channelLocks, serverAddress, key -> new Object()); synchronized (lockObj) { return doConnect(serverAddress); }}

private Channel doConnect(String serverAddress) { Channel channelToServer = channels.get(serverAddress); // 当前地址已经存在连接 if (channelToServer != null && channelToServer.isActive()) { return channelToServer; } Channel channelFromPool; try { NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress); NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey); if (previousPoolKey != null && previousPoolKey.getMessage() instanceof RegisterRMRequest) { RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage(); ((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds()); } channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress)); channels.put(serverAddress, channelFromPool); } catch (Exception exx) { LOGGER.error("{} register RM failed.",FrameworkErrorCode.RegisterRM.getErrCode(), exx); throw new FrameworkException("can not register RM,err:" + exx.getMessage()); } return channelFromPool;}

TM和RM客户端在启动的时候会和集群中的的每台seata server建立长连接,但是在后续发送请求的时候,比如开启全局事务,注册分支事务只会和其中的一台机器通讯,TM或RM首先根据事务分组找到集群列表,然后根据负载均衡策略从列表中选出一台机器发起请求。具体代码可参见AbstractNettyRemotingClient#sendSyncRequest方法

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:面试官:生产环境发生问题,你一般怎么排查?(网络篇)
下一篇:面试官:说说常用的排序算法呗
相关文章

 发表评论

暂时没有评论,来抢沙发吧~