精通RocketMQ系列:万字深度剖析RocketMQ Consumer start启动流程源码

网友投稿 843 2022-09-03

精通RocketMQ系列:万字深度剖析RocketMQ Consumer start启动流程源码

精通RocketMQ系列:万字深度剖析RocketMQ Consumer start启动流程源码

一、概述

RocketMQ的消息消费包含两种模式:推push和拉pull。对于拉模式官方已经不推荐使用,所以我们主要介绍推模式。​​​特别说明:本文的源码基于RocketMQ4.8​​。

二、Push模式启动流程

1、consumer代码片段

package com.example.demo.rocketmq;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.util.List;/** * @author Saint */public class Consumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("study-consumer"); consumer.setNamesrvAddr("127.0.0.1:9876"); // topic , 过滤器 * 表示不过滤 consumer.subscribe("saint-study-topic", "*"); consumer.setConsumeTimeout(20L); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 消息传播模式 consumer.setMessageModel(MessageModel.CLUSTERING); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } // ack机制 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer start。。。。。。"); }}

2、确定启动流程入口

在consumer.start()行我们F7进入方法发现它所有的逻辑都是​​DefaultMQPushConsumerImpl​​类中的start()方法中做的,从这里我们可以确定入口就是DefaultMQPushConsumerImpl#start()。

到这肯定有很多小机灵鬼要问了,这个traceDispatcher是干嘛用的?满脸黑人问号。从注释中我们看出,它是用来异步传输数据的,默认情况下它是null,也就是说正常我们使用不到它,所以不需要专门花费过多精力去看它。

3、启动流程逻辑

接着上面,我们继续F7步入方法,可以看到此时consumer服务的状态处于CREATE_JUST,然后我们继续深入剖析一把start()方法的内部,拔开它的底裤。

秉持了广大网友的习惯,我们先把源码和相应注释贴出来,方便大家先了解一下。

其实大家看RocketMQ相对新点的版本会发现,注释就像是珍稀动物一下,那是真的少。可想而知在开源之前大部分都是中文注释,开源了中文注释指定不能留,外国人看不懂。所以自己啃吧,好在RocketMQ的代码逻辑很贴近中国人的思维,没那么多设计模式;相对比较好理解。

1)DefaultMQPushConsumerImpl#start()方法

流程图:

public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()); // 0、将消费者服务状态预设置为 "启动失败" this.serviceState = ServiceState.START_FAILED; // 1、校验一堆配置,例如:consumerGroup配置规则、消息传播方式不能为null(默认为集群消费--CLUSTERING)、并发消费线程数量。 this.checkConfig(); // 2、copy订阅关系,监听重投队列%RETRY%TOPIC。 this.copySubscription(); // 3、如果消息传播方式是集群模式,将消费者实例的name 修改为PID if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { this.defaultMQPushConsumer.changeInstanceNameToPID(); } // 4、初始化MQ客户端连接工厂,此处的MQClientManager使用了饿汉式单例模式 this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); // 5、 消息重新负载消费 // 指定消费组 this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); // 指定消息传播方式 this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); // 队列分配算法,指定如何将消息队列分配给每个使用者客户端。 this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); // 指定MQClient工厂 this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); // 6、指定Pull模型请求包装器 this.pullAPIWrapper = new PullAPIWrapper( mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); // 注册消息过滤钩子 this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); // 7、指定消费进度(偏移量) if (this.defaultMQPushConsumer.getOffsetStore() != null) { this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); } else { switch (this.defaultMQPushConsumer.getMessageModel()) { // 广播模式offset保存在本地 case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; // 集群模式offset保存在服务器 case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break; } this.defaultMQPushConsumer.setOffsetStore(this.offsetStore); } this.offsetStore.load(); // 8、创建消费服务 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { // 顺序消费 this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { // 并行消费 this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } // 启动消费服务--定时任务 this.consumeMessageService.start(); // 向broker注册自己(consumer) boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown()); throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } mQClientFactory.start(); log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup()); // 将consumer的状态修改为 "运行中" this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } // 从nameServer中获取监听的topic路由信息,若变更则修改。 this.updateTopicSubscribeInfoWhenSubscriptionChanged(); // 检查消费者是否注册到broker中 this.mQClientFactory.checkClientInBroker(); // 向所有broker发送心跳信息、并上传FilterClass的源文件 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); // 唤醒ReBalance服务线程 this.mQClientFactory.rebalanceImmediately(); }

我们刚启用一个consumer的时候,consumer客户端的状态是CREATE_JUST,在Switch case逻辑中,当serverState是CREATE_JUST时,会执行以下逻辑:

(1)将消费者服务状态预设置为 “启动失败”。这个操作相信很多看过JUC的源码的大佬都会记得,JUC坐着道格.李老爷子的编程习惯:先预置状态,后续逻辑成功直接提交,否者就回滚。

(2)然后我们真正进入RocketMQ的启动流程,​​第一步​​是很常规的校验操作,校验一堆配置,比如:consumerGroup配置规则、消息传播方式不能为null(默认为集群消费–CLUSTERING)、并发消费线程数量等。感兴趣的老哥可以自己跟进去,你会发现这个逻辑太像我们平时写的代码了。

(3)​​第二步​​​:复制copy订阅关系,监听重投队列%RETRY%TOPIC。这一步对于我们整体consumer的启动流程来讲意义不大,所以​​不要专进入​​​、​​不要专进入​​​、​​不要专进入​​。重要的事说三遍。

(4)​​第三步​​:判断消息传播方式是否为集群模式,是就将消费者实例的name 修改为PID。

(5)​​第四步​​​:初始化MQ客户端连接工厂–MQClientManagerFactory,进而初始化MQClient,此处的MQClientManager使用了饿汉式单例模式。MQClientInstance封装了 RocketMQ 网络处理 API,是消息生产者、消息消费者与 NameServer、Broker 打交道的网络通道。另外:​​同一个 JVM 中的不同消费者和不同生产者在启动时获取到的 MQClientInstance 实例都是同一个​​

(6)​​第五步​​:指定消息重新重新负载的相关配置,比如:消费组、消息传播方式、队列负载策略、MQClient工厂等。

(7)​​第六步​​:指定创建Pull模型请求包装器(PullAPIWrapper),它是拉取Broker消息的API操作包装器。

(8)​​第七步​​:指定消息消费进度OffsetStore对象,初始化消息消费进度。集群模式下消息消费进度offset保存在broker、广播模式下消息消费进度offset保存在client消费者端,即本地文件中。如果是广播模式,紧接着会从本磁盘中加载消费进度文件。

从这里我们可以看到本地的文件的命名规则为:RocketMQ运行目录 / MQClientInstance的ID / groupName / offsets.json

(9)​​第八步​​​:根据是否为顺序消费创建​​ConsumeMessageOrderlyService​​​实现或​​ConsumeMessageConcurrentlyService​​​实现的不同​​ConsumeMessageService​​对象并开启消费消息服务----这是个定时任务。consumeMessageService主要负责消息消费,内部维护一个线程池,可以通过参数配置最大和最小核心线程数、注意的它的阻塞队列是无界的。

(10)​​第九步​​​:consumer向broker注册自己,注册失败则将consumer服务实例的状态回滚到CREATE_JUST,并​​将已经启动的消费消息的定时任务取消​​。否则将consumer的状态修改为RUNNING,并启动MQClientInstance。

嚯,启动MQClientInstance都干了什么呢?

卧槽,居然有注释。我们看一下它的意思:

public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // 如果nameserver地址为空,会去`+ WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP`获取, // WS_DOMAIN_NAME由配置参数rocketmq.namesrv.domain设置,WS_DOMAIN_SUBG由配置参数rocketmq.namesrv.domain.subgroup设置 if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); } // 开启请求和响应通道,即远程通信服务,生产者和消费者客户端处理消息发送和消费的API。 this.mQClientAPIImpl.start(); /** * 1.定时2min拉取最新的nameServer信息 * 2.默认定时30秒拉取最新的broker和topic路由信息(可配置) * 3.默认定时30s向broker发送心跳包(可配置) * 4.默认定时5s持久化consumer的offset(可配置) * 5.定时1分钟,动态调整线程池线程数量 */ this.startScheduledTask(); // 启动消息拉取服务 this.pullMessageService.start(); // 启动负载均衡服务 this.rebalanceService.start(); // 启动producer消息推送服务 this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } }}

开启MQClientAPIImpl远程通信服务,生产者和消费者客户端处理消息发送和消费的API。开启各种各样的定时任务,比如定时拉取最新的nameServer、broker、topic信息,向broker发送心跳,持久化offset和调整线程池数量等。开启从Broker拉取消息服务,供消费端消息消费。开启消费者和消费队列关于消息消费的负载均衡服务。

这地方展开了说会很多,我们后面专门聊一下broker相关的内容。

可能大家会困惑这个serviceState的状态不是修改为了START_FAILED吗?这边不就抛异常直接退出了!!!!注意这里的serviceState是MQClientInstance自己的,而不是上文说的DefaultMQPushConsumerImpl中的那个serviceState。

后面无论consumer的状态是什么都会执行:

(11)​​第十步​​:从namesrv获取topic路由信息,若变更则修改。

(12)​​第十一步​​:向broker端校验客户端,检查client是否注册到broker。

(13)​​第十二步​​:向所有broker发送心跳信息、并上传FilterClass的源文件给FilterServer。

(14)​​第十二步​​:唤醒ReBalance服务线程,立即负载队列。

对于(11)—(14)步的详细介绍我们放在下一篇​​消费者subscribe流程​​中介绍。

三、Pull模式启动流程

Pull模式的启动流程主要体现在DefaultMQPullConsumerImpl类中,下面我们贴出其start()方法:

public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; this.checkConfig(); this.copySubscription(); if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) { this.defaultMQPullConsumer.changeInstanceNameToPID(); } this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook); this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel()); this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); this.pullAPIWrapper = new PullAPIWrapper( mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode()); this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); if (this.defaultMQPullConsumer.getOffsetStore() != null) { this.offsetStore = this.defaultMQPullConsumer.getOffsetStore(); } else { switch (this.defaultMQPullConsumer.getMessageModel()) { case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup()); break; case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup()); break; default: break; } this.defaultMQPullConsumer.setOffsetStore(this.offsetStore); } this.offsetStore.load(); boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } mQClientFactory.start(); log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PullConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; }}

从代码上来看,几乎和DefaultMQPushConsumerImpl类的start()方法一样,区别在于DefaultMQPullConsumerImpl类的start()方法中在switch case逻辑后多了如下片段:

// 从nameServer中获取监听的topic路由信息,若变更则修改。this.updateTopicSubscribeInfoWhenSubscriptionChanged();// 检查消费者是否注册到broker中this.mQClientFactory.checkClientInBroker();// 向所有broker发送心跳信息、并上传FilterClass的源文件this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();// 唤醒ReBalance服务线程this.mQClientFactory.rebalanceImmediately();

四、总结

1、几个关键类的作用

​​DefaultMQPushConsumerImpl​​​是供客户端进行消息消费的,它创建了​​ConsumeMessageService​​​消息消费服务、消息进度保存对象​​OffsetStore​​​,消息消费的-对象​​MessageListener​​。

​​MQClientInstance​​​开启了请求和响应通道、即远程通信服务;开启了消息拉取服务​​PullMessageService​​​、从Broker拉取消息;负载均衡服务​​RebalanceService​​,给consumer和消息队列做负载均衡。

另外:​​DefaultMQPushConsumerImpl​​​和​​MQClientInstance​​都是部署在客户端的;像从Broker拉取消息,消息队列的负载均衡都是在客户端完成的。

2、consumer启动流程关键点

主要就是检查配置参数; 获取MQClientInstance; 给重新负载服务设置消费组、消息传播模式、负载策略等属性, 创建pullAPIWrapper采用长轮询的方式拉取消息; 根据消息传播方式加载offsetStore; 根据是否为顺序消费选择对应的ConsumerMessageService消费服务并启动; 启动MQClientInstance、给broker心跳、唤醒ReBalance服务线程–立即负载队列。

至此国庆嗨皮完回归学习状态。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:php cookie工作原理与实例详解(php是什么语言)
下一篇:高效PHP Redis缓存技术,可参考下步骤(高效液相色谱法)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~