RocketMQ源码解析:NameServer是如何存路由信息的?

网友投稿 1012 2022-10-07

RocketMQ源码解析:NameServer是如何存路由信息的?

RocketMQ源码解析:NameServer是如何存路由信息的?

NameServer的作用有哪些?

NameServer其实就是一个注册中心。在分布式系统中为了避免单点故障,所有的应用都是以集群的形式提供服务的,RocketMQ也不例外。在RocketMQ中NameServer的具体作用如下

Broker启动的时候会往所有的NameServer注册自己的信息,由此可以看出NameServer是一个CP系统,即放弃系统的一致性,保证可用性

NameServer的路由关系都保存在RouteInfoManager中的4个map中,路由注册,路由删除和路由发现基本都是操作这4个map

// topic -> broker信息private final HashMap> topicQueueTable;// brokerName -> 每个broker具体的ip地址private final HashMap brokerAddrTable;// private final HashMap> clusterAddrTable;// brokerAddr -> 具体的心跳信息private final HashMap brokerLiveTable;

假如说我们搭建了如下双主双从的集群,集群名字为rocketmq-cluster

序号

ip

架构模式

1

192.168.25.131

Master1

2

192.168.25.132

Master2

2

192.168.25.133

Slave1

2

192.168.25.134

Slave2

相关配置如下 master1

# 所属集群名字brokerClusterName=rocketmq-cluster# broker名字brokerName=broker-a# 0 表示 Master, 大于0 表示 SlavebrokerId=0

master2

brokerClusterName=rocketmq-clusterbrokerName=broker-bbrokerId=0

slave1

brokerClusterName=rocketmq-clusterbrokerName=broker-abrokerId=1

slave2

brokerClusterName=rocketmq-clusterbrokerName=broker-bbrokerId=1

假如说我们在rocketmq-cluster集群的broker-a和broker-b上创建一个topic,名字为myTopic,读写队列都默认为4个,消息的分布情况如下图所示

上面4个Map对应的值为

topicQueueTable

{ "myTopic": [ { "brokerName": "broker-a", "readQueueNums": 4, "writeQueueNums": 4, "perm": 6, "topicSynFlag": 0 }, { "brokerName": "broker-b", "readQueueNums": 4, "writeQueueNums": 4, "perm": 6, "topicSynFlag": 0 } ]}

brokerAddrTable

{ "broker-a": { "cluster": "rocketmq-cluster", "brokerName": "broker-a", "brokerAddrs": { "0": "192.168.25.131:10000", "1": "192.168.25.133:10000" } }, "broker-b": { "cluster": "rocketmq-cluster", "brokerName": "broker-a", "brokerAddrs": { "0": "192.168.25.132:10000", "1": "192.168.25.134:10000" } }}

clusterAddrTable

{ "rocketmq-cluster": [ "broker-a", "broker-a" ]}

brokerLiveTable

{ "192.168.25.131:10000": { "lastUpdateTimestamp": 1618750125000, "haServerAddr": "" }, "192.168.25.132:10000": { "lastUpdateTimestamp": 1618750126000, "haServerAddr": "" }, "192.168.25.133:10000": { "lastUpdateTimestamp": 1618750129000, "haServerAddr": "" }, "192.168.25.134:10000": { "lastUpdateTimestamp": 1618750127000, "haServerAddr": "" }}

源码分析

NameServer启动流程

当运行NamesrvStartup#main方法时,就能启动NameServer

public class NamesrvStartup { public static void main(String[] args) { main0(args); } public static NamesrvController main0(String[] args) { try { NamesrvController controller = createNamesrvController(args); start(controller); String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf("%s%n", tip); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; }}

这个启动流程比较简单,就不追源码了,简单画图总结一下

启动NettyRemotingServer的时候,这几个关键的ChannelHandler需要注意一下

NettyEncoder:编码器,将RemotingCommand转为字节 NettyDecoder:解码器,将字节转为RemotingCommand, NettyServerHandler:用来处理接收到的请求

我们所有发出去的请求都会构建成RemotingCommand对象然后转为byte发送出去,同理接收到的所有请求都会转为RemotingCommand对象,这样方便我们在程序内部进行处理。即RemotingCommand是一个协议对象

协议格式如下所示

rocketmq remoting模块的一个继承关系图如下所示

RemotingService提供了一个远程服务最基本的方法,开启和关闭

RemotingServer在RemotingService的基础上又抽象出了一个服务提供者需要提供的方法 RemotingClient在RemotingService的基础上又抽象出了一个服务调用者需要提供的方法

NettyRemotingServer基于netty实现服务提供者 NettyRemotingClient基于netty实现服务调用者

NettyRemotingAbstract则是将NettyRemotingServer和NettyRemotingClient一些公共的功能抽象到出来

至于注册消息处理器是网络请求处理的常规套路,一个请求交给一个处理器来处理,每个处理器又绑定一个线程池

NettyRemotingServer处理请求的过程是一个典型的策略模式,针对不同的请求,用不同的NettyRequestProcessor来处理。

几种常见的NettyRequestProcessor如下,在后面的章节中,我们会详细分析这些类的实现

NettyRequestProcessor

作用

PullMessageProcessor

broker端处理消息拉取请求

SendMessageProcessor

broker端处理消息发送请求

QueryMessageProcessor

broker端处理消息查询请求

DefaultRequestProcessor

nameserver端处理所有类型的请求

NameServer端只注册了一个消息处理器DefaultRequestProcessor,所以所有的消息都会交给这个处理器来处理,如注册broker信息,获取topic的路由信息。之所以用一个NettyRequestProcessor来处理,是因为每种请求的实现逻辑并不复杂,没必要再拆分到 不同的NettyRequestProcessor中。

// DefaultRequestProcessorpublic RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { switch (request.getCode()) { case RequestCode.PUT_KV_CONFIG: return this.putKVConfig(ctx, request); case RequestCode.GET_KV_CONFIG: return this.getKVConfig(ctx, request); case RequestCode.DELETE_KV_CONFIG: return this.deleteKVConfig(ctx, request); case RequestCode.QUERY_DATA_VERSION: return queryBrokerTopicConfig(ctx, request); case RequestCode.REGISTER_BROKER: // 注册broker信息 Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { return this.registerBroker(ctx, request); } case RequestCode.UNREGISTER_BROKER: return this.unregisterBroker(ctx, request); case RequestCode.GET_ROUTEINFO_BY_TOPIC: // 获取路由信息 return this.getRouteInfoByTopic(ctx, request); // 省略部分代码 default: break; } return null;}

路由注册

路由注册,路由删除,路由发现的过程都比较简单,都是操作上面说过的那5个map

borker在启动后会每隔30s向nameserver发送一次注册请求

// org.apache.rocketmq.broker.BrokerController#startthis.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { // 每隔30s向nameserver发送一次注册请求 BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } }}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

根据请求的类型,我们可以发现最终执行到RouteInfoManager#registerBroker方法,然后将信息存在这4个map中

private final HashMap> clusterAddrTable;private final HashMap brokerAddrTable;private final HashMap/* Filter Server */> filterServerTable;private final HashMap brokerLiveTable;

路由删除

而NameServer在启动后每隔10s扫描brokerLiveTable,将当前时间和上次心跳时间lastUpdatetime进行比较,如果超过120s,则认为broker不可用,移除路由表中与该broker相关的所有信息

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); }}, 5, 10, TimeUnit.SECONDS);

private final HashMap brokerLiveTable;private final HashMap brokerAddrTable;private final HashMap> clusterAddrTable;

路由发现

DefaultRequestProcessor#getRouteInfoByTopic

返回TopicRouteData对象,用如下3个map构建TopicRouteData对象

// topic -> broker信息private final HashMap> topicQueueTable;// brokerName -> 每个broker具体的ip地址private final HashMap brokerAddrTable;// brokerAddr -> Filter Server列表,用于类模式消息过滤private final HashMap/* Filter Server */> filterServerTable;

public class TopicRouteData extends RemotingSerializable { private String orderTopicConf; private List queueDatas; private List brokerDatas; private HashMap/* Filter Server */> filterServerTable;}

当进行消息发送和消费时都会用到TopicRouteData

参考博客

优秀系列 [1]https://kunzhao.org/docs/rocketmq/rocketmq-message-receive-flow/

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:微信小程序上传图片实战案例解析
下一篇:如何使用微信小程序做出图片上传(上传照片到微信小程序怎么操作)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~