《RockerMQ源码分析》客户端是如何发送心跳到Broker的?含心跳包数据来源

网友投稿 417 2022-11-10

《RockerMQ源码分析》客户端是如何发送心跳到Broker的?含心跳包数据来源

《RockerMQ源码分析》客户端是如何发送心跳到Broker的?含心跳包数据来源

一、前言

RocketMQ所有的心跳机制: 1)Producer端:

Producer与NameSrv随机建立长连接,定期从NameSrv获取topic路由信息;Producer与Broker的Master结点建立长连接,用于发送消息;此外Producer与Master维持了一个心跳。

2)ConSumer端:

Conumser与NamseSrv随机建立长连接,定期从NameSrv获取topic路由信息;Consumer与Broker的Master和Slave结点建立长连接,用于订阅消息;此外Consumer与Master和slave维持一个心跳。

二、客户端发送心跳

(1)Producer和Consumer通过​​MQClientInstance​​​的​​sendHeartbeatToAllBrokerWithLock()​​方法实现发送心跳请求;

public void sendHeartbeatToAllBrokerWithLock() { if (this.lockHeartbeat.tryLock()) { try { // 发送心跳包 this.sendHeartbeatToAllBroker(); // 上传类过滤器源码 this.uploadFilterClassSource(); } catch (final Exception e) { log.error("sendHeartbeatToAllBroker exception", e); } finally { this.lockHeartbeat.unlock(); } } else { log.warn("lock heartBeat, but failed. [{}]", this.clientId); }}

我们看到​​sendHeartbeanToAllBrokerWithLock()​​​方法中在​​sendHeartbeatToAllBroker()​​之前加了锁,这是因为点啥嘞?

1、RocketMQ对底层进行通信的​​MQClientInstance​​​进行了复用,即在​​同一个jvm​​​里的不同的Consumer下面使用的都是同一个​​MQClientInstance​​​。 2、既然是复用的,那么就可能存在并发,因此这里进行了上锁操作。 3、所以这里是为了防止​​​心跳错乱​​。

(2)另外在MQClientInstance启动时会启动会调用startScheduledTask()方法,开始一堆定时任务,其中包括:定期默认每30s发送心跳信息到Broker。

MQClientInstance类源码:

public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: ....... /** * 1.定时2min拉取最新的nameServer信息 * 2.默认定时30秒拉取最新的broker和topic路由信息(可配置) * 3.默认定时30s向broker发送心跳包(可配置) * 4.默认定时5s持久化consumer的offset(可配置) * 5.定时1分钟,动态调整线程池线程数量 */ this.startScheduledTask(); ....... } }}

​​startScheduledTask()​​方法启动定时任务:

private void startScheduledTask() { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { // 清理下线的broker MQClientInstance.this.cleanOfflineBroker(); // 向所有的broker发送心跳 MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); } catch (Exception e) { log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); } } }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);}

我们来看看sendHeartbeatToAllBroker()做了什么?

1、MQClientInstance#sendHeartbeatToAllBroker()

1.准备心跳信息​​HeartbeatData​​​,如果心跳信息为空,直接返回; 2. 遍历所有的Broker,​​​尝试向所有的Broker发送心跳包​​;注意:根据客户端的类型(Producer、Consumer)不同,发送到的Broker对象会又差别。 1、如果启动的是​​生产者​​,那么心跳保证消费者的相关信息为空,这时​​只会向Broker的Mater节点发送心跳​​;因为RocketMQ中主要Master的Broker才能处理写请求。 2、如果启动的是消费者,则会向所有的Broker发送心跳。

private void sendHeartbeatToAllBroker() { // 心跳包--包装类,主要是Producer和Consumer相关信息 final HeartbeatData heartbeatData = this.prepareHeartbeatData(); final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty(); final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty(); // 生产者和消费者数据都为空时 if (producerEmpty && consumerEmpty) { log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId); return; } // broker列表不为空时 // todo brokerAddrTable是什么时候初始化的? // 1)当topic的路由信息改变后,会往brokerAddrTable中添加数据 if (!this.brokerAddrTable.isEmpty()) { // 统计发送心跳的次数 long times = this.sendHeartbeatTimesTotal.getAndIncrement(); // 遍历broker列表 Iterator>> it = this.brokerAddrTable.entrySet().iterator(); while (it.hasNext()) { Entry> entry = it.next(); String brokerName = entry.getKey(); // 获取一个broker地址 HashMap oneTable = entry.getValue(); if (oneTable != null) { for (Map.Entry entry1 : oneTable.entrySet()) { Long id = entry1.getKey(); String addr = entry1.getValue(); if (addr != null) { // 消费数据为空 并且 broker不是Mater节点时,不发送心跳。 // 因为Producer只需要与Mater维护心跳即可 if (consumerEmpty) { // broker不是mater节点 if (id != MixAll.MASTER_ID) continue; } try { // 发送心跳 // todo MQClientAPIImpl是什么时候初始化的? // 1)实例化MQClientInstance时初始化mQClientAPIImpl int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000); if (!this.brokerVersionTable.containsKey(brokerName)) { this.brokerVersionTable.put(brokerName, new HashMap(4)); } this.brokerVersionTable.get(brokerName).put(addr, version); if (times % 20 == 0) { log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr); log.info(heartbeatData.toString()); } } catch (Exception e) { if (this.isBrokerInNameServer(addr)) { log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e); } else { log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName, id, addr, e); } } } } } } }}

RocketMQ中客户端和服务端的通信通过Netty实现,这里我们的客户端是Consumer/Producer、服务端是Broker。

我们先看一下RocketMQ是如何准备心跳包数据的?

2、心跳包HeartBeatData

心跳包内容包括:客户端id、生产者信息、消费者信息;

一般情况下生产者信息和消费者信息是互斥的,producerDataSet和consumerDataSet会有一个为空。但如果一个应用既是生产者,也是消费者,那么这种情况下producerDataSet和consumerDataSet都不为空。

public class HeartbeatData extends RemotingSerializable { // consumer 客户端ID private String clientID; /** * 生产者信息 * 1. groupName */ private Set producerDataSet = new HashSet(); /** * 消费者信息 * 1. groupName * 2. 消费类型:push/pull * 3. 消息传播方式:集群还是广播 * 4. 启动消费者时从哪开始消费 * 5. 订阅信息:过滤消息相关标签、SQL规则。 */ private Set consumerDataSet = new HashSet(); ......}

我们分别看一下生产者信息和消费者信息都包括什么?

1)生产者信息ProducerData

不能再简单了,就一个生产者组的名称。

public class ProducerData { /** * 生产组名称 */ private String groupName;}

2)消费者信息ConsumerData

ConsumerData消费者信息包括:

​​groupName​​​​消费类型​​:push/pull​​消息传播方式​​:集群还是广播​​启动消费者时从哪开始消费​​​​订阅信息SubscriptionData​​:过滤消息相关标签、SQL规则等。

public class ConsumerData { /** * 消费者名称 */ private String groupName; /** * 消费类型:push/pull */ private ConsumeType consumeType; /** * 消息传播方式:广播 / 集群消费 */ private MessageModel messageModel; /** * 从哪开始消费:从一开始偏移量、从最后偏移量、按时间戳消费 */ private ConsumeFromWhere consumeFromWhere; /** * 订阅数据:过滤消息相关标签、SQL规则等 */ private Set subscriptionDataSet = new HashSet();}

在我们日常写代码时,这些属性很常见、经常会配置到。​​SubscriptionData​​​是我们的消费者订阅信息,其内容如下:

public class SubscriptionData implements Comparable { // 表示订阅该topic下所有类型消息 public final static String SUB_ALL = "*"; // 是否开启类过滤模式,默认不开启 private boolean classFilterMode = false; // 订阅的topic private String topic; // 订阅表达式 private String subString; // 如果是tag过滤模式,这里是tag列表 private Set tagsSet = new HashSet(); // 如果是tag过滤模式,这里是tag对应的hashCode列表 private Set codeSet = new HashSet(); private long subVersion = System.currentTimeMillis(); // 表达式类型,有TAG和SQL两种,默认是Tag private String expressionType = ExpressionType.TAG; @JSONField(serialize = false) // 如果开启了类过滤模式,这里存放过滤类java代码 private String filterClassSource;}

这里面,我们平时最常用到的是​​topic​​​、​​subString​​。

// topic , 过滤器 * 表示根据SQL不过滤、TAG-A || TAG-B表示根据TAG过滤consumer.subscribe("saint-study-topic", "TAG-A || TAG-B");

​​创建订阅信息​​​的时候,​​subString​​​会被​​分割​​​成TAG-A 、TAG-B,然后保存至​​tagsSet​​​集合里,tag的​​hashcode​​​会保存到​​codeSet​​集合里。

那么心跳包数据是如何组装的?我接着来看。

3、MQClientInstance#prepareHeartbeatData()

private HeartbeatData prepareHeartbeatData() { HeartbeatData heartbeatData = new HeartbeatData(); // clientID heartbeatData.setClientID(this.clientId); // Consumer for (Map.Entry entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { ConsumerData consumerData = new ConsumerData(); consumerData.setGroupName(impl.groupName()); consumerData.setConsumeType(impl.consumeType()); consumerData.setMessageModel(impl.messageModel()); consumerData.setConsumeFromWhere(impl.consumeFromWhere()); consumerData.getSubscriptionDataSet().addAll(impl.subscriptions()); consumerData.setUnitMode(impl.isUnitMode()); heartbeatData.getConsumerDataSet().add(consumerData); } } // Producer for (Map.Entry entry : this.producerTable.entrySet()) { MQProducerInner impl = entry.getValue(); if (impl != null) { ProducerData producerData = new ProducerData(); producerData.setGroupName(entry.getKey()); heartbeatData.getProducerDataSet().add(producerData); } } return heartbeatData;}

1)以准备Consumer的心跳信息来看:

其遍历​​MQClientInstance​​​的属性​​consumerTable.entrySet()​​​,获取到​​MQConsumerInner​​信息,然后将其填充到consumerData中。DefaultMQPushConsumerImplement#start()方法–​​消费者启动​​​时,会调用MQClientInstance#​​registerConsumer()​​​方法,将消费者信息(含订阅信息)​​填充​​​到​​consumerTable​​。

咦,这里只是把DefaultMQPushConsumerImpl作为MQConsumerInner传入到了MQClientInstance#registerConsumer,​​订阅信息在哪里​​可以看到撒?

往心跳包HeartbeatData的consumerData属性中填充的是​​MQConsumerInner#subscriptions()​​​方法的​​返回值​​。

那我们就看一下​​DefaultMQPushConsumerImplement#subscriptions()​​方法:

原来是取的负载均衡服务​​RebalanceImpl​​​中的​​subscriptionInner​​​属性,那​​RebalanceImpl​​​的​​subscriptionInner​​属性又是怎么填充的?

既然是订阅信息,会不会和我们的subscribe()订阅操作有关呢?

我们看一下​​DefaultMQPushConsumerImpl#subscribe()​​方法:

果然是这样填充的消费者订阅者信息。

下面是组装​​SubscriptionData​​订阅信息的代码:

public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic, String subString) throws Exception { SubscriptionData subscriptionData = new SubscriptionData(); subscriptionData.setTopic(topic); subscriptionData.setSubString(subString); if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) { subscriptionData.setSubString(SubscriptionData.SUB_ALL); } else { String[] tags = subString.split("\\|\\|"); if (tags.length > 0) { for (String tag : tags) { if (tag.length() > 0) { String trimString = tag.trim(); if (trimString.length() > 0) { subscriptionData.getTagsSet().add(trimString); subscriptionData.getCodeSet().add(trimString.hashCode()); } } } } else { throw new Exception("subString split error"); } } return subscriptionData;}

2)以准备Producer的心跳信息来看:

这里和Consumer的心跳信息来源类似。

其遍历​​MQClientInstance​​​的属性​​producerTable.entrySet()​​​,获取到​​MQProducerInner​​信息,然后将其填充到producerData中。

DefaultMQProducerImpl#start()方法 ​​生产者启动​​​时,会调用​​MQClientInstance#registerProducer()​​​方法,将生产者信息​​填充到producerTable​​中。

我们接着看一下​​MQClientInstance#sendHearbeat()​​是如何发送心跳的?

4、MQClientInstance#sendHearbeat()

1、封装请求,包括:请求编码为HEART_BEAT、编程语言为Java、心跳包HeartBeatData; 2、调用远程服务类​​​NettyRemotingClient​​通过Netty发送心跳。

@Overridepublic RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { long beginStartTime = System.currentTimeMillis(); final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { // 发送心跳前,执行的钩子函数 doBeforeRpcHooks(addr, request); long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { throw new RemotingTimeoutException("invokeSync call timeout"); } // 发送心跳 RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); // 发送心跳后,执行的钩子函数 doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response); return response; } catch (RemotingSendRequestException e) { log.warn("invokeSync: send request exception, so close the channel[{}]", addr); this.closeChannel(addr, channel); throw e; } catch (RemotingTimeoutException e) { if (nettyClientConfig.isClientCloseSocketIfTimeout()) { this.closeChannel(addr, channel); log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr); } log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); }}

对于RocketMQ是如何通过Netty通信的,不是本文的重点,后续专文分析。

三、总结

以上所有分析相关的源码注释请见GitHub中的release-4.8.0分支:​​https://github.com/Saint9768/rocketmq/tree/rocketmq-all-4.8.0​​

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:详解eclipse项目中的.classpath文件原理
下一篇:SpringBoot源码分析之bootstrap.properties文件加载的原理
相关文章

 发表评论

暂时没有评论,来抢沙发吧~