微前端架构如何改变企业的开发模式与效率提升
467
2022-11-10
《RockerMQ源码分析》客户端是如何发送心跳到Broker的?含心跳包数据来源
一、前言
RocketMQ所有的心跳机制: 1)Producer端:
Producer与NameSrv随机建立长连接,定期从NameSrv获取topic路由信息;Producer与Broker的Master结点建立长连接,用于发送消息;此外Producer与Master维持了一个心跳。
2)ConSumer端:
Conumser与NamseSrv随机建立长连接,定期从NameSrv获取topic路由信息;Consumer与Broker的Master和Slave结点建立长连接,用于订阅消息;此外Consumer与Master和slave维持一个心跳。
二、客户端发送心跳
(1)Producer和Consumer通过MQClientInstance的sendHeartbeatToAllBrokerWithLock()方法实现发送心跳请求;
public void sendHeartbeatToAllBrokerWithLock() { if (this.lockHeartbeat.tryLock()) { try { // 发送心跳包 this.sendHeartbeatToAllBroker(); // 上传类过滤器源码 this.uploadFilterClassSource(); } catch (final Exception e) { log.error("sendHeartbeatToAllBroker exception", e); } finally { this.lockHeartbeat.unlock(); } } else { log.warn("lock heartBeat, but failed. [{}]", this.clientId); }}
我们看到sendHeartbeanToAllBrokerWithLock()方法中在sendHeartbeatToAllBroker()之前加了锁,这是因为点啥嘞?
1、RocketMQ对底层进行通信的MQClientInstance进行了复用,即在同一个jvm里的不同的Consumer下面使用的都是同一个MQClientInstance。 2、既然是复用的,那么就可能存在并发,因此这里进行了上锁操作。 3、所以这里是为了防止心跳错乱。
(2)另外在MQClientInstance启动时会启动会调用startScheduledTask()方法,开始一堆定时任务,其中包括:定期默认每30s发送心跳信息到Broker。
MQClientInstance类源码:
public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: ....... /** * 1.定时2min拉取最新的nameServer信息 * 2.默认定时30秒拉取最新的broker和topic路由信息(可配置) * 3.默认定时30s向broker发送心跳包(可配置) * 4.默认定时5s持久化consumer的offset(可配置) * 5.定时1分钟,动态调整线程池线程数量 */ this.startScheduledTask(); ....... } }}
startScheduledTask()方法启动定时任务:
private void startScheduledTask() { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { // 清理下线的broker MQClientInstance.this.cleanOfflineBroker(); // 向所有的broker发送心跳 MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); } catch (Exception e) { log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); } } }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);}
我们来看看sendHeartbeatToAllBroker()做了什么?
1、MQClientInstance#sendHeartbeatToAllBroker()
1.准备心跳信息HeartbeatData,如果心跳信息为空,直接返回; 2. 遍历所有的Broker,尝试向所有的Broker发送心跳包;注意:根据客户端的类型(Producer、Consumer)不同,发送到的Broker对象会又差别。 1、如果启动的是生产者,那么心跳保证消费者的相关信息为空,这时只会向Broker的Mater节点发送心跳;因为RocketMQ中主要Master的Broker才能处理写请求。 2、如果启动的是消费者,则会向所有的Broker发送心跳。
private void sendHeartbeatToAllBroker() { // 心跳包--包装类,主要是Producer和Consumer相关信息 final HeartbeatData heartbeatData = this.prepareHeartbeatData(); final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty(); final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty(); // 生产者和消费者数据都为空时 if (producerEmpty && consumerEmpty) { log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId); return; } // broker列表不为空时 // todo brokerAddrTable是什么时候初始化的? // 1)当topic的路由信息改变后,会往brokerAddrTable中添加数据 if (!this.brokerAddrTable.isEmpty()) { // 统计发送心跳的次数 long times = this.sendHeartbeatTimesTotal.getAndIncrement(); // 遍历broker列表 Iterator
RocketMQ中客户端和服务端的通信通过Netty实现,这里我们的客户端是Consumer/Producer、服务端是Broker。
我们先看一下RocketMQ是如何准备心跳包数据的?
2、心跳包HeartBeatData
心跳包内容包括:客户端id、生产者信息、消费者信息;
一般情况下生产者信息和消费者信息是互斥的,producerDataSet和consumerDataSet会有一个为空。但如果一个应用既是生产者,也是消费者,那么这种情况下producerDataSet和consumerDataSet都不为空。
public class HeartbeatData extends RemotingSerializable { // consumer 客户端ID private String clientID; /** * 生产者信息 * 1. groupName */ private Set
我们分别看一下生产者信息和消费者信息都包括什么?
1)生产者信息ProducerData
不能再简单了,就一个生产者组的名称。
public class ProducerData { /** * 生产组名称 */ private String groupName;}
2)消费者信息ConsumerData
ConsumerData消费者信息包括:
groupName消费类型:push/pull消息传播方式:集群还是广播启动消费者时从哪开始消费订阅信息SubscriptionData:过滤消息相关标签、SQL规则等。
public class ConsumerData { /** * 消费者名称 */ private String groupName; /** * 消费类型:push/pull */ private ConsumeType consumeType; /** * 消息传播方式:广播 / 集群消费 */ private MessageModel messageModel; /** * 从哪开始消费:从一开始偏移量、从最后偏移量、按时间戳消费 */ private ConsumeFromWhere consumeFromWhere; /** * 订阅数据:过滤消息相关标签、SQL规则等 */ private Set
在我们日常写代码时,这些属性很常见、经常会配置到。SubscriptionData是我们的消费者订阅信息,其内容如下:
public class SubscriptionData implements Comparable
这里面,我们平时最常用到的是topic、subString。
// topic , 过滤器 * 表示根据SQL不过滤、TAG-A || TAG-B表示根据TAG过滤consumer.subscribe("saint-study-topic", "TAG-A || TAG-B");
创建订阅信息的时候,subString会被分割成TAG-A 、TAG-B,然后保存至tagsSet集合里,tag的hashcode会保存到codeSet集合里。
那么心跳包数据是如何组装的?我接着来看。
3、MQClientInstance#prepareHeartbeatData()
private HeartbeatData prepareHeartbeatData() { HeartbeatData heartbeatData = new HeartbeatData(); // clientID heartbeatData.setClientID(this.clientId); // Consumer for (Map.Entry
1)以准备Consumer的心跳信息来看:
其遍历MQClientInstance的属性consumerTable.entrySet(),获取到MQConsumerInner信息,然后将其填充到consumerData中。DefaultMQPushConsumerImplement#start()方法–消费者启动时,会调用MQClientInstance#registerConsumer()方法,将消费者信息(含订阅信息)填充到consumerTable。
咦,这里只是把DefaultMQPushConsumerImpl作为MQConsumerInner传入到了MQClientInstance#registerConsumer,订阅信息在哪里可以看到撒?
往心跳包HeartbeatData的consumerData属性中填充的是MQConsumerInner#subscriptions()方法的返回值。
那我们就看一下DefaultMQPushConsumerImplement#subscriptions()方法:
原来是取的负载均衡服务RebalanceImpl中的subscriptionInner属性,那RebalanceImpl的subscriptionInner属性又是怎么填充的?
既然是订阅信息,会不会和我们的subscribe()订阅操作有关呢?
我们看一下DefaultMQPushConsumerImpl#subscribe()方法:
果然是这样填充的消费者订阅者信息。
下面是组装SubscriptionData订阅信息的代码:
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic, String subString) throws Exception { SubscriptionData subscriptionData = new SubscriptionData(); subscriptionData.setTopic(topic); subscriptionData.setSubString(subString); if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) { subscriptionData.setSubString(SubscriptionData.SUB_ALL); } else { String[] tags = subString.split("\\|\\|"); if (tags.length > 0) { for (String tag : tags) { if (tag.length() > 0) { String trimString = tag.trim(); if (trimString.length() > 0) { subscriptionData.getTagsSet().add(trimString); subscriptionData.getCodeSet().add(trimString.hashCode()); } } } } else { throw new Exception("subString split error"); } } return subscriptionData;}
2)以准备Producer的心跳信息来看:
这里和Consumer的心跳信息来源类似。
其遍历MQClientInstance的属性producerTable.entrySet(),获取到MQProducerInner信息,然后将其填充到producerData中。
DefaultMQProducerImpl#start()方法 生产者启动时,会调用MQClientInstance#registerProducer()方法,将生产者信息填充到producerTable中。
我们接着看一下MQClientInstance#sendHearbeat()是如何发送心跳的?
4、MQClientInstance#sendHearbeat()
1、封装请求,包括:请求编码为HEART_BEAT、编程语言为Java、心跳包HeartBeatData; 2、调用远程服务类NettyRemotingClient通过Netty发送心跳。
@Overridepublic RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { long beginStartTime = System.currentTimeMillis(); final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { // 发送心跳前,执行的钩子函数 doBeforeRpcHooks(addr, request); long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { throw new RemotingTimeoutException("invokeSync call timeout"); } // 发送心跳 RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); // 发送心跳后,执行的钩子函数 doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response); return response; } catch (RemotingSendRequestException e) { log.warn("invokeSync: send request exception, so close the channel[{}]", addr); this.closeChannel(addr, channel); throw e; } catch (RemotingTimeoutException e) { if (nettyClientConfig.isClientCloseSocketIfTimeout()) { this.closeChannel(addr, channel); log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr); } log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); }}
对于RocketMQ是如何通过Netty通信的,不是本文的重点,后续专文分析。
三、总结
以上所有分析相关的源码注释请见GitHub中的release-4.8.0分支:https://github.com/Saint9768/rocketmq/tree/rocketmq-all-4.8.0
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~