RocketMQ源码解析:消息拉取和消费流程

网友投稿 672 2022-09-06

RocketMQ源码解析:消息拉取和消费流程

RocketMQ源码解析:消息拉取和消费流程

消息消费总览

RocketMQ消息消费有两种模式,顺序消费和并发消费

顺序消费在工作中几乎没遇到过,所以就不分析这方面的源码了,单纯分析并发消费。并发消费实现MessageListenerConcurrently接口即可,顺序消费实现MessageListenerOrderly接口

public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); }}

RocketMQ消息的消费方式有两种,拉模式和推模式 其实RocketMQ的推模式也是基于拉模式来实现的,即Consumer端通过不断的拉取来实现推的效果,那拉取的间隔如何确定呢?

如果太短,不断发起拉取请求,会造成Broker端的压力比较大 如果太长,则消息不能及时被消费

为了解决这个问题,RocketMQ采用了长轮询的策略,即Consumer发送拉取请求到Broker端,如果Broker有数据则返回,Consumer端再次拉取。如果Broker端没有数据,不立即返回,而是等待一段时间(默认5s)。

如果在等待的这段时间,有要拉取的消息,则将消息返回,Consumer端再次拉取。如果等待超时,也会直接返回,不会将这个请求一直hold住,Consumer端再次拉取

那么Consumer端是多会拉取消息的?

在Consumer端所有的拉取请求都会包装成PullRequest对象,而这对象是由RebalanceService创建的

针对每个topic,RebalanceService会根据负载均衡策略,算出当前Consuemr应该消费的队列。因为Consumer的数量和队列是动态变化的,所以每隔一段时间就要重新算一下当前Consumer应该消费那些队列,如下图所示

我这里演示的是AllocateMessageQueueAveragely这种负载均衡策略的分配逻辑,即每个Consume依次平均分配队列,RocketMQ还提供了很多种负载均衡策略,有兴趣的可以看一下

一个队列只会被一个Consumer消费,当Consumer的数量比队列的数量还多的时候,则有Consumer会被闲置,不会消费消息。之所以这样做是考虑到如果一个队列可以让多个Consumer消费要考虑并发问题,效率还提升不了多少。

RebalanceService在Consumer端启动的时候会执行一次重平衡,后续每隔20s执行一次重平衡,这其实就是消息拉取的时机

当从Broker端拉到消息的时候,会回调PullCallback的实现类,消费消息。理解了大概流程,我们看源码

源码解析

发送拉取请求

当调用DefaultMQPushConsumerImpl#start方法时会启动RebalanceService,这个服务会每隔20s执行一次重平衡,因为头一次拉取消息并不需要再等20s,所以当头一次阻塞等待时,后面的方法会立即唤醒阻塞的线程,开始执行拉取,后续就是每隔20s执行一次重平衡。

public class RebalanceService extends ServiceThread { @Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { // 每隔20s进行一次重平衡 this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end"); }}

对每个topic都会执行重平衡,最后调用到的方法如下

RebalanceImpl#rebalanceByTopic

当消费模式为BROADCASTING时会消费所有的队列

当消费模式为CLUSTERING时,会使用负载均衡策略算出当前Consumer应该消费的队列

首先获取到topic下所有的messageQueue和消费者id(每个消费者启动的时候会分配一个唯一的id),然后对messageQueue和消费者id进行排序,保证视图的一致性

以AllocateMessageQueueAveragely为例,整个分配思路和分页查询有点类似,消费者a消费第一页的messageQueue,消费者b消费第二页的messageQueue,所以要先对messageQueue和消费者id进行排序

给新分配的MessageQueue创建对应的PullRequest,放到阻塞队列中开始进行拉取

如果之前的MessageQueue被分配给别的消费者消费了,则将MessageQueue对应的ProcessQueue的dropped属性设置为true,此时ProcessQueue对应的PullRequest不会进行拉取,已经拉取到的消息有可能会被消费,但是不会提交消费进度,此时就会造成消息重复消费(后面会详细阐述这一过程)(PullRequest会将拉取到的消息放到ProcessQueue中,而ProcessQueue用TreeMap来存储数据)

PullMessageService会不断从阻塞队中获取PullRequest,然后执行拉取

// PullMessageServicepublic void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end");}

消息拉取的具体逻辑为DefaultMQPushConsumerImpl#pullMessage方法,所以我们看一下这个方法具体的拉取逻辑

processQueue.isDropped 表明 PullRequest 对应的 ConsumeQueue 交给别的消费者消费了,所以不用执行拉取任务了

接着就是通过ProcessQueue进行流控(防止Consumer拉取的消息过多,但是消费很慢)

为什么可以通过ProcessQueue可以进行流控呢?

因为当PullRequest拉取到消息后,会将消息存在ProcessQueue中,而ProcessQueue则是用TreeMap来存储消息的,所以可以通过消息的总条数,总大小,以及偏移量进行流控

消息拉取只有异步这一种方式,最终会调用到MQClientAPIImpl#pullMessageAsync方法,

当拉取到消息时会调用pullCallback实现类的的onSuccess方法,如果发生异常则调用onException方法

消费消息

从上一级我们知道,消息会交给PullCallback的实现类来进行消费,PullCallback的实现类有2个,一个是DefaultMQPullConsumerImpl的匿名内部类,一个是DefaultMQPushConsumerImpl的匿名内部类,正好对应两种消息消费模式,我们只分析DefaultMQPushConsumerImpl中PullCallback的处理逻辑

当拉到消息时,会通过tag对消息进行过滤。然后调用ConsumeMessageConcurrentlyService#submitConsumeRequest进行消息消费。最后往阻塞队列放PullRequest持续进行拉取

将每次要消费的消息封装成ConsumeRequest对象,然后放到线程池中进行消费

ConsumeRequest进行消费的代码比较多,我挑出一部分重要的代码来分析哈

// ConsumeRequestpublic void run() { // dropped = true // 重平衡分配给别的消费者了,停止对该消息队列的消费 if (this.processQueue.isDropped()) { log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue); return; } ConsumeConcurrentlyStatus status = null; boolean hasException = false; try { // 调用消息-消费消息 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { hasException = true; } // 不管是异常还是返回null,都需要重试消息 if (null == status) { log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue); status = ConsumeConcurrentlyStatus.RECONSUME_LATER; } // 这块地方会造成消息的重复消费 // 队列没有被丢弃,则处理消费结果 if (!processQueue.isDropped()) { ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); }}

首先如果processQueue被dropped了,说明重平衡的时候队列交给别的消费者来进行消费了,消息就不用再消费了然后将消息传入MessageListenerConcurrently接口的实现类中,进行消费消息。在消费的过程中如果返回发生异常或者返回null,则将消费结果改为ConsumeConcurrentlyStatus.RECONSUME_LATER(当消息的消费状态为ConsumeConcurrentlyStatus.RECONSUME_LATER时会发送重试消息)最后调用processConsumeResult方法来处理消费结果

在这里说个挺有意思的问题,当我们使用RocketMQ的时候经常被告知要保证消息消费的幂等性,因为消息可能会被重复投递,在什么情况下会造成消息的重复投递呢?

其实在重平衡的时候就会造成消息的重复投递,看上面的代码,虽然在刚开始的时候判断了processQueue是否被丢弃,但是有可能在这个语句执行完后,processQueue被丢弃了,但是消息会被消费哈,在代码的最后判断到processQueue被丢弃了,processConsumeResult方法不会被执行(processConsumeResult方法里面包含了消费进度提交的逻辑),消费进度不会提交,就会造成其他消费者再次消费队列中的消息

if (!processQueue.isDropped()) { ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);}

当消费完毕后,提交消费进度,并且根据消费结果来判断是否需要重试

// ConsumeMessageConcurrentlyService#processConsumeResultpublic void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest) { // 通过ackIndex控制消息是否要进行重试 int ackIndex = context.getAckIndex(); if (consumeRequest.getMsgs().isEmpty()) return; switch (status) { case CONSUME_SUCCESS: // 消费成功 if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok); this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); break; case RECONSUME_LATER: // 需要重试消息 ackIndex = -1; this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), consumeRequest.getMsgs().size()); break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: // 广播模式不会重试消息,失败就丢弃了 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString()); } break; case CLUSTERING: List msgBackFailed = new ArrayList(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); // 发送重试消息 boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } // 重发消息失败,5s后再重新消费 if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); } break; default: break; } // 不管消息消费成功与否,将消费过的消息从 ProcessQueue 中删除 long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { // 不管消息消费成功与否,都会更新消费进度,将消费进度先暂存在本地,后台定时任务会定时将消费进度同步到broker中 this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); }}

当消费状态为CONSUME_SUCCESS表示消息消费成功,当消费状态为RECONSUME_LATER表示消息消费失败,需要重试当消费模式为BROADCASTING不会进行消息重试,只有当消费模式为CLUSTERING时,才会进行消息重试最后将消息从ProcessQueue中删除,并且更新队列的消费进度

参考博客

负载均衡过程 [1]消息重平衡的过程 [2]https://cloud.tencent.com/developer/article/1521811

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:模板2
下一篇:mysql 使用set names 解决乱码问题的原理(mysql怎么导入sql文件)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~