seata源码解析:seata-server启动时都做了哪些操作?

网友投稿 987 2022-10-07

seata源码解析:seata-server启动时都做了哪些操作?

seata源码解析:seata-server启动时都做了哪些操作?

seata-server的主要功能有哪些?

当我们启动一个seata-server的时候,只需要执行一下seata-server.sh脚本即可,这个脚本其实就是调用了io.seata.server.Server的main方法,我们来看一下这个main方法做了哪些操作?

public class Server { public static void main(String[] args) throws IOException { // get port first, use to logback.xml // 获取监听的端口 int port = PortHelper.getPort(args); System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port)); // create logger final Logger logger = LoggerFactory.getLogger(Server.class); if (ContainerHelper.isRunningInContainer()) { logger.info("The server is running in container."); } //initialize the parameter parser //Note that the parameter parser should always be the first line to execute. //Because, here we need to parse the parameters needed for startup. // 解析启动和配置文件中的各种参数 ParameterParser parameterParser = new ParameterParser(args); //initialize the metrics // 监控相关 MetricsManager.get().init(); System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode()); ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(), NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()), new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy()); NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads); //server port nettyRemotingServer.setListenPort(parameterParser.getPort()); // 将serverNode作为雪花算法中的workerId UUIDGenerator.init(parameterParser.getServerNode()); //log store mode : file, db, redis // SessionHolder负责事务日志的持久化存储 // 设置存储模式,有三种可选类型,file,db,redis SessionHolder.init(parameterParser.getStoreMode()); // 创建事务协调器 DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer); coordinator.init(); nettyRemotingServer.setHandler(coordinator); // register ShutdownHook ShutdownHook.getInstance().addDisposable(coordinator); ShutdownHook.getInstance().addDisposable(nettyRemotingServer); //127.0.0.1 and 0.0.0.0 are not valid here. if (NetUtil.isValidIp(parameterParser.getHost(), false)) { XID.setIpAddress(parameterParser.getHost()); } else { XID.setIpAddress(NetUtil.getLocalIp()); } XID.setPort(nettyRemotingServer.getListenPort()); try { // 启动nettyServer并阻塞在这里 nettyRemotingServer.init(); } catch (Throwable e) { logger.error("nettyServer init error:{}", e.getMessage(), e); System.exit(-1); } System.exit(0); }}

启动时的操作可以总结为如下

初始化id生成器,用来生成全局事务id和分支事务id。id生成器的底层用到了雪花算法设置存储模式,用来持久化存储事务,有三种模式db,redis,file创建事务协调器并初始化启动nettyRemotingServer

我们按照消息的处理流程分3篇文章来分析seata-server端的源码

启动nettyServer负责与 TM RM 进行通信事务协调器接收事务相关的消息然后调用事务管理器做相应的操作事务管理器通过SessionManager将事务状态进行持久化存储

启动NettyRemotingServer

public void init() { // registry processor // 注册消息处理器 registerProcessor(); if (initialized.compareAndSet(false, true)) { super.init(); }}

首先注册消息处理器,消息处理器顾名思义是用来处理消息的,根据消息的不同类型选择不同的消息处理器来处理消息,典型的策略模式,提高了程序的可扩展性

每种消息类型和其对应的处理器关系如下

这种映射关系其实是保存在map中的

// AbstractNettyRemotingprotected final HashMap> processorTable = new HashMap<>(32);

可以看到除了保存消息类型和其对应的处理器,还有一个ExecutorService(线程池),当消息的处理过程需要在IO线程上执行时,对应的线程池为空,当线程池不为空时说明消息的处理过程会在线程池中进行

// AbstractNettyRemotingServerpublic void init() { // 删除超时的消息 super.init(); serverBootstrap.start();}

serverBootstrap#start方法是一个标准的netty启动代码,设置参数,设置ChannelHandler,然后同步阻塞。我们只关心ChannelHandler的处理逻辑

NettyRemotingServer在启动的过程中设置了如下4个ChannelHandler

IdleStateHandler:处理心跳 ProtocolV1Decoder:消息解码器 ProtocolV1Encoder:消息编码器 AbstractNettyRemotingServer.ServerHandler:处理各种消息

@ChannelHandler.Sharableclass ServerHandler extends ChannelDuplexHandler { /** * 处理读取到的消息 */ @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { if (!(msg instanceof RpcMessage)) { return; } processMessage(ctx, (RpcMessage) msg); }}

可以看到ServerHandler类上有@ChannelHandler.Sharable注解,表明所有的连接都会共用这一个ChannelHandler,这样当消息处理的慢时,就会降低并发,所以有的消息处理过程需要放到线程池中进行

消息的处理过程用了策略模式,根据消息类型找到对应的消息处理器RemotingProcessor

// AbstractNettyRemotingprotected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { if (LOGGER.isDebugEnabled()) { LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody())); } Object body = rpcMessage.getBody(); if (body instanceof MessageTypeAware) { MessageTypeAware messageTypeAware = (MessageTypeAware) body; // 根据消息类型获取对应的处理器 final Pair pair = this.processorTable.get((int) messageTypeAware.getTypeCode()); if (pair != null) { // 对应的处理器设置了线程池,则放到线程池中执行 if (pair.getSecond() != null) { try { pair.getSecond().execute(() -> { try { pair.getFirst().process(ctx, rpcMessage); } catch (Throwable th) { LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th); } finally { MDC.clear(); } }); } catch (RejectedExecutionException e) { // 线程池拒绝策略之一,抛出RejectedExecutionException LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(), "thread pool is full, current max pool size is " + messageExecutor.getActiveCount()); if (allowDumpStack) { String name = ManagementFactory.getRuntimeMXBean().getName(); String pid = name.split("@")[0]; int idx = new Random().nextInt(100); try { Runtime.getRuntime().exec("jstack " + pid + " >d:/" + idx + ".log"); } catch (IOException exx) { LOGGER.error(exx.getMessage()); } allowDumpStack = false; } } } else { // 对应的处理器没有设置线程池,则直接执行 try { pair.getFirst().process(ctx, rpcMessage); } catch (Throwable th) { LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th); } } } else { LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode()); } } else { LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body); }}

好了NettyRemotingServer的启动过程我们就分析完了,下篇文章我们来看消息的处理过程

参考博客

seata server [1][2]比较详细 好的系列文章

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:JVM实战:JVM运行时数据区
下一篇:微信小程序上传图片实战案例解析
相关文章

 发表评论

暂时没有评论,来抢沙发吧~