RocketMQ源码分析pullMessage:Consumer是如何从broker拉取消息的?

网友投稿 933 2022-11-10

RocketMQ源码分析pullMessage:Consumer是如何从broker拉取消息的?

RocketMQ源码分析pullMessage:Consumer是如何从broker拉取消息的?

一、背景

在《​​RocketMQ源码解析:ProcessQueue的作用​​​》一文中我们介绍了consumer拉取消息时入参PullRequest中的主要操作类ProcessQueue,每个MessageQueue对应一个ProcessQueue,然后将两者封装成一个PullRequest。这边文章我变继续聊一下consumer是如何使用PullRequest拉取消息进行消费的? 留个坑:下文聊一聊consumer的rebalance,毕竟PullRequest的来源是它。

二、源码分析

以下所有分析相关的源码注释请见GitHub中的release-4.8.0分支:​​void rebalanceByTopic(final String topic, final boolean isOrder) { switch (messageModel) { case BROADCASTING: { Set mqSet = this-icSubscribeInfoTable.get(topic); if (mqSet != null) { boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder); if (changed) { this.messageQueueChanged(topic, mqSet, mqSet); log.info("messageQueueChanged {} {} {} {}", consumerGroup, topic, mqSet, mqSet); } } else { log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); } break; } case CLUSTERING: { // 获取topic下的所有queue元数据信息 Set mqSet = this-icSubscribeInfoTable.get(topic); // 获取topic下的所有consumer List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); if (null == mqSet) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); } } if (null == cidAll) { log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic); } if (mqSet != null && cidAll != null) { List mqAll = new ArrayList(); mqAll.addAll(mqSet); // 将consumer和MessageQueue排序 Collections.sort(mqAll); Collections.sort(cidAll); // 获取负载均衡策略 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List allocateResult = null; try { allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), e); return; } Set allocateResultSet = new HashSet(); if (allocateResult != null) { allocateResultSet.addAll(allocateResult); } // 真正执行消费的方法 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); if (changed) { log.info( "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet); this.messageQueueChanged(topic, mqSet, allocateResultSet); } } break; } default: break; }}

我们进入rebalanceByTopic()方法,发现其会根据消息的传播方式为BROADCASTING 或 CLUSTERING做不同的逻辑;但是我们可以发现广播模式的代码约等于集群模式代码的一部分,即简化版,所以我们重点看一下集群模式。

1)获取topic下的所有MessageQueue、Consumer。 2)获取负载均衡策略,组装消息分配结果。 3)调用updateProcessQueueTableInRebalance()方法构建PullRequest

private boolean updateProcessQueueTableInRebalance(final String topic, final Set mqSet, final boolean isOrder) { boolean changed = false; // 获取所有的消息 Iterator> it = this.processQueueTable.entrySet().iterator(); // 遍历每个MessageQueue和其对应的ProcessQueue while (it.hasNext()) { Entry next = it.next(); MessageQueue mq = next.getKey(); ProcessQueue pq = next.getValue(); if (mq.getTopic().equals(topic)) { // 当前消费者不需要消费该MessageQueue if (!mqSet.contains(mq)) { pq.setDropped(true); if (this.removeUnnecessaryMessageQueue(mq, pq)) { it.remove(); changed = true; log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq); } } else if (pq.isPullExpired()) { // 在负载均衡更新ProcessQueueTable时调用,如果拉取失效,ProcessQueue将被丢弃。 switch (this.consumeType()) { case CONSUME_ACTIVELY: break; case CONSUME_PASSIVELY: pq.setDropped(true); if (this.removeUnnecessaryMessageQueue(mq, pq)) { it.remove(); changed = true; log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it", consumerGroup, mq); } break; default: break; } } } } List pullRequestList = new ArrayList(); // 遍历所有的MessageQueue,然后构建对应的ProcessQueue给它 for (MessageQueue mq : mqSet) { if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); long nextOffset = this.computePullFromWhere(mq); if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } } // 负载均衡服务分发PullRequest请求。 this.dispatchPullRequest(pullRequestList); return changed;}

核心逻辑:如果processQueueTable不为空,则直接从内存中取;否则遍历所有的MessageQueue,给其构建对应的ProcessQueue,然后封装到PullRequest中。最后调用dispatchPullRequest()方法将PullRequeat对象通过PullMessageService放入到pullRequestQueue阻塞队列中。

这里循环调用了defaultMQPushConsumerImpl 的立即执行PullRequest方法,我们继续跟进入看看。

这里又将PullRequest交给了PullMessageService 服务,继续继续

这里终于将PullRequest对象放到了pullRequestQueue中。

咦,那pullMessageSerice是什么时候启动的?

其实它也是一个线程,在启动MQClientInstance的时候,也会将它启动,并且它的run方法中有个死循环,会一直运行内部逻辑。

2、pullMessage()方法解析

上面提到,在启动MQClientInstance的时候,会将PullMessageService服务启动,PullMessageService的工作职责是 从LinkedBlockQueue中循环取PullRequest对象,然后执行pullMessage方法,进而去请求broker获取消息。

注意:

1)一个应用程序(消费端)中一个消费组对应一个 DefaultMQPushConsumerImpl (同一个IP:端口)。 2)注意同一个JVM中只有一个MQClientInstance 。 3)每一个MQClientInstance中持有一个PullMessageServive实例。

所以:同一个应用程序中,如果存在多个消费组,那么多个DefaultMQPushConsumerImpl 的消息拉取,都需要依靠一个PullMessageServive。

简版流程图

PullMessageService的run方法中会间接调用DefaultMQPushConsumerImpl的pullMessage()方法,接下来我们便看一下它内部都做了什么?(1)首先获取pullRequest的处理队列ProcessQueue,然后更新ProcessQueue的最后一次消息拉取时间:

(2)校验消费者服务的状态,如果不是RUNNING状态,则将PullRequest延时3s放入到pullRequestQueue中。

(3)流量控制,从消息个数(阈值1000个)和消息总大小(阈值100MB)两个维度考虑

// 流量控制,两个维度// 1) 消息数量达到阈值(默认1000)if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { // 延迟50ms再将PullRequest加入到pullRequestQueue中。 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return;}// 2)消息体总大小达到阈值(默认100MB)if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { // 延迟50ms再将PullRequest加入到pullRequestQueue中。 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return;}

非顺序消费情况下,还存在一个流控规则,即一批消息的offset间隔大于2000,延时执行–> 将PullRequest,在50毫秒后,放入pullRequestQueue阻塞队列中,然后再尝试拉取。

顺序消费情况下,也存在延时执行拉取消息请求的可能。即当ProcessQueue的读写锁被其他线程占用时。延时3s将PullRequest放入pullRequestQueue阻塞队列中。

(4)从内存中获取主题订阅信息,如果为空,延时3s执行拉取请求。

(5)组装异步拉取消息时的callBack逻辑到PullCallBack中。

我们先把代码贴出来,代码分析放在消息拉取成功之后。

// ** 调用MQClientInstance异步拉取消息后的回调处理逻辑PullCallback pullCallback = new PullCallback() { // 拉取消息成功时 @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { // 将拉取到的消息放到PullResult中 pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { // 拉取到消息时 case FOUND: // 获取拉取消息之前processQueue中offset最大的消息的下一个offset,后面只是用它做一个判断,没什么实际作用。 long prevRequestOffset = pullRequest.getNextOffset(); // 将拉取回来的offset最大的消息的下一个offset赋值给pullRequest的nextOffset pullRequest.setNextOffset(pullResult.getNextBeginOffset()); // 拉取消息的总耗时 long pullRT = System.currentTimeMillis() - beginTimestamp; // 汇总所有拉取消息操作的总耗时 DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT); long firstMsgOffset = Long.MAX_VALUE; // 如果拉取到的消息个数为0 if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { // 将请求PullRequest重新放入到pullRequestQueue中 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } else { // 获取第一条消息的offset firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); // 汇总拉取到的所有消息的长度 DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); // 将拉取到的消息放入到ProcessQueue中 boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); // 消费消息服务开始干活 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } } if (pullResult.getNextBeginOffset() < prevRequestOffset || firstMsgOffset < prevRequestOffset) { log.warn( "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", pullResult.getNextBeginOffset(), firstMsgOffset, prevRequestOffset); } break; case NO_NEW_MSG: case NO_MATCHED_MSG: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case OFFSET_ILLEGAL: log.warn("the pull request offset illegal, {} {}", pullRequest.toString(), pullResult.toString()); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); pullRequest.getProcessQueue().setDropped(true); DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() { @Override public void run() { try { DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false); DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue()); DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue()); log.warn("fix the pull request offset, {}", pullRequest); } catch (Throwable e) { log.error("executeTaskLater Exception", e); } } }, 10000); break; default: break; } } } @Override public void onException(Throwable e) { if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("execute the pull request exception", e); } DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); }};

(6)如果消息传播方式是集群消费模式,则从内存中获取MessageQueue的CommitLog的最新偏移量

(7)从FilterServer中获取类过滤器

(8)构建拉取消息系统的标识:是否支持commitOffset、suspend、subExpression、classFilter

(9)pullAPIWrapper真正执行消息拉取,消息拉取失败则延时3s重试。

拉取消息的核心逻辑,我们进一步看一下拉取消息核心逻辑,pullAPIWrapper#pullKernelImpl()方法。

3、拉取消息核心逻辑,pullAPIWrapper#pullKernelImpl()方法

1)入参

/** * 消息消费队列的元数据信息 */final MessageQueue mq,/** * 消息订阅子模式,subscribe(topicName, "模式") */final String subExpression,final String expressionType,/** * 版本 */final long subVersion,/** * 开始拉取的消息偏移量,pullRequest.getNextOffset() */final long offset,/** * 拉取的最大消息个数,defaultMQPushConsumer.getPullBatchSize() */final int maxNums,/** * 系统标识,FLAG_COMMIT_OFFSET_SUSPEND */final int sysFlag,/** * 内存中当前消息队列commitLog日志中当前的最新偏移量 */final long commitOffset,/** * 允许broker暂停的时间,单位:毫秒,默认15s. */final long brokerSuspendMaxTimeMillis,/** * 拉取消息的超时时间 */final long timeoutMillis,/** * 拉取方式,同步、异步、只发送不关注返回 */final CommunicationMode communicationMode,/** * 回调逻辑 */final PullCallback

2)根据MQ的Broker信息获取Broker信息,并封装成FindBrokerResult。

3)封装网络请求体,调用MQClientInstance拉取消息。

4)处理返回结果

(1)MQClientAPIImpl#pullMessage()方法如下:

拉取消息,会根据拉取模式,是同步还是异步,调用MQClientAPIImpl回调或直接处理。由于前面传过来的CommunicationMode为ASYNC,即异步。所以我们着重看一下MQClientAPIImpl中异步拉取消息的逻辑。(2)调用MQClientAPIImpl异步拉取消息,将返回的内容封装成pullResult;放入PullCallBack中。

(3)我们看一下PullCallBack回调的处理逻辑

(3.1)首先对拉取的消息进行一系列的处理,主要包括:1、对返回体解码成一条条消息 ;2、执行消息过滤(可自定义过滤钩子函数);3、消息属性设置

对消息的处理体现在PullAPIWrapper#processPullResult()方法中:

由于consumer订阅的时候可以关注Tags,所以在消息到达时,会按照订阅的tags进行过滤,用户监听消息的时候就只会收到自己关注的tag。(3.2)拉取到消息时,首先将其放入到处理队列ProcessQueue中;然后消费消息服务consumeMessageService开始干活。

第一步:将拉取到的消息放入到ProcessQueue中,即将消息加入到ProcessQueue的msgTreeMap红黑树容器中。

第二步:消息消费服务开始干活,并且会再将PullRequest请求根据pullInternal决定延时pullInterval毫秒还是立即放入pullRequestQueue中,​​使Consumer可以一直拉取消息​​。

(3.3)针对是否为顺序消费,消息消费服务consumeMessageService有不同的实现。1)非顺序消费时,​​ConsumeMessageConcurrentlyService​​采用线程池的机制进行并发消费。

​​注意:如果此次拉取的消息条数大于ConsumeMessageBatchMaxSize(默认1条),则分批消费。​​来我们看一下这个线程池是怎么配置的?

我们再看放入线程池中执行的线程ConsumeRequest,主要看一下它的run()方法。注意这里的msgs,顺序消费时是没有的。

run part1:注册执行消费前的钩子函数,并设置消息的重试topic。

也就是我们业务系统定义的消费-,负责具体消息的消费。例如:

run part2:开始消费消息,并返回该批消息消费结果。

run part3:根据是否出现异常等,判断处理结果。

run part4:执行消息消费钩子方法,并根据消息消费结果(成功或失败)处理消费进度。

非顺序消费(并非消费)的主要思路: 1、将待消费的消息存入ProcessQueue中存储,并执行消息消费之前钩子函数 2、修改待消费消息的主题(设置为消费组的重试主题) 3、分页消费(每次传给业务消费-的最大数量为配置的 sconsumeMessageBatchMaxSize 4、执行消费后钩子函数,并根据业务方返回的消息消费结果(成功,重试)【ACK】确认信息,然后更新消息进度,从ProceeQueue中删除相应的消息

2)顺序消费时,​​ConsumeMessageOrderlyService​​也是采用线程池的机制进行消费,这里注意了,顺序消费时,我们要将线程池的核心线程和最大线程数设置为1。

我们重点看一下​​ConsumeMessageOrderlyService​​的ConsumeRequest。

与非顺序消费的区别是顺序消费的ConsumeRequest只针对ProcessQueue和messageQueue,而不是针对消息。其获取消息的逻辑是直接从ProcessQueue中取,一次取consumeMessageBatchMaxSize个(默认一个)。run()方法中消息消费的逻辑与非顺序消费差不多,但其关键点在于消息的消费/获取的顺序性,所以就不可避免的​​引入锁机制​​。

注意这里的加锁范围是针对ProcessQueue,所以RocketMQ的顺序消费是针对MessageQueue,即RocketMQ无法做到多MessageQueue的全局顺序消费;如果要用RocketMQ做topic级别的全局顺序消费,那该主题只能允许有一个队列。

补充

1、pullRequestQueue(LinkedBlockQueue)中的PullRequest对象是在什么时候放入的?

除了RebalanceService启动时为Topic下所有的MessageQueue分别构建一个ProcessQueue,然后将两者封装成一个PullRequest,放入到消息拉取服务PullMessageService的pullRequestQueue阻塞队列中;还有如下情况:

DefaultMQPushConsumerImpl#pullMessage()方法中, 0)消费者服务状态不是RUNNING,延时3s将pullRequest放入pullRequestQueue中。 1)消费服务被暂停,延时1s将pullRequest放入到pullRequestQueue中。 2)当拉取的消息达到阈值(消息数量达到1000,消息总大小达到100MB)时,延时50ms将PullRequest放入到PullRequestQueue中。 3)非顺序消费时,如果拉取到的一批消息之间的offset大于2000,延时50ms将PullRequest放入到pullRequestQueue中。 4)顺序消费时,如果当前线程没有获取到ProcessQueue的读写锁,延时3s将PullRequest放入到pullRequestQueue中。 5)如果从内存中获取不到订阅信息了,也是延时3s将PullRequest放入到pullRequestQueue中。 6)每次拉取消息成功之后; 6.1)如果没消息返回,则立即将pullRequest放入到PullRequestQueue中; 6.2)如果有消息返回,并且拉取时间间隔pullInterval大于0,则延时pullInterval ms之后将pullRequest放入到PullRequestQueue中。否者立即放入。 7)如果返回的pullStatus是NO_MATCHED_MSG,即没有匹配的消息,也会立即将pullRequest加入到pullRequestQueue中。 8)调用pullAPIWrapper拉取消息失败后,延时3s将pullRequest加入到pullRequestQueue中。

2、如何自定义消息过滤Hook?

1、实现FilterMessageHook接口的filterMessage()方法

2、通过调用DefaultMQPushConsumerImpl#registerFilterMessageHook()方法实现。

在消费者启动的时候(DefaultMQPushConsumerImpl#start()),会将所有的消息过滤钩子通过PullAPIWrapper#registerFilterMessageHook()方法注册。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:SpringBoot源码分析之bootstrap.properties文件加载的原理
下一篇:MySQL:图解MVCC到底能不能解决幻读问题?
相关文章

 发表评论

暂时没有评论,来抢沙发吧~