微前端架构如何改变企业的开发模式与效率提升
672
2022-09-06
RocketMQ源码解析:消息拉取和消费流程
消息消费总览
RocketMQ消息消费有两种模式,顺序消费和并发消费
顺序消费在工作中几乎没遇到过,所以就不分析这方面的源码了,单纯分析并发消费。并发消费实现MessageListenerConcurrently接口即可,顺序消费实现MessageListenerOrderly接口
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List
RocketMQ消息的消费方式有两种,拉模式和推模式 其实RocketMQ的推模式也是基于拉模式来实现的,即Consumer端通过不断的拉取来实现推的效果,那拉取的间隔如何确定呢?
如果太短,不断发起拉取请求,会造成Broker端的压力比较大 如果太长,则消息不能及时被消费
为了解决这个问题,RocketMQ采用了长轮询的策略,即Consumer发送拉取请求到Broker端,如果Broker有数据则返回,Consumer端再次拉取。如果Broker端没有数据,不立即返回,而是等待一段时间(默认5s)。
如果在等待的这段时间,有要拉取的消息,则将消息返回,Consumer端再次拉取。如果等待超时,也会直接返回,不会将这个请求一直hold住,Consumer端再次拉取
那么Consumer端是多会拉取消息的?
在Consumer端所有的拉取请求都会包装成PullRequest对象,而这对象是由RebalanceService创建的
针对每个topic,RebalanceService会根据负载均衡策略,算出当前Consuemr应该消费的队列。因为Consumer的数量和队列是动态变化的,所以每隔一段时间就要重新算一下当前Consumer应该消费那些队列,如下图所示
我这里演示的是AllocateMessageQueueAveragely这种负载均衡策略的分配逻辑,即每个Consume依次平均分配队列,RocketMQ还提供了很多种负载均衡策略,有兴趣的可以看一下
一个队列只会被一个Consumer消费,当Consumer的数量比队列的数量还多的时候,则有Consumer会被闲置,不会消费消息。之所以这样做是考虑到如果一个队列可以让多个Consumer消费要考虑并发问题,效率还提升不了多少。
RebalanceService在Consumer端启动的时候会执行一次重平衡,后续每隔20s执行一次重平衡,这其实就是消息拉取的时机
当从Broker端拉到消息的时候,会回调PullCallback的实现类,消费消息。理解了大概流程,我们看源码
源码解析
发送拉取请求
当调用DefaultMQPushConsumerImpl#start方法时会启动RebalanceService,这个服务会每隔20s执行一次重平衡,因为头一次拉取消息并不需要再等20s,所以当头一次阻塞等待时,后面的方法会立即唤醒阻塞的线程,开始执行拉取,后续就是每隔20s执行一次重平衡。
public class RebalanceService extends ServiceThread { @Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { // 每隔20s进行一次重平衡 this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end"); }}
对每个topic都会执行重平衡,最后调用到的方法如下
RebalanceImpl#rebalanceByTopic
当消费模式为BROADCASTING时会消费所有的队列
当消费模式为CLUSTERING时,会使用负载均衡策略算出当前Consumer应该消费的队列
首先获取到topic下所有的messageQueue和消费者id(每个消费者启动的时候会分配一个唯一的id),然后对messageQueue和消费者id进行排序,保证视图的一致性
以AllocateMessageQueueAveragely为例,整个分配思路和分页查询有点类似,消费者a消费第一页的messageQueue,消费者b消费第二页的messageQueue,所以要先对messageQueue和消费者id进行排序
给新分配的MessageQueue创建对应的PullRequest,放到阻塞队列中开始进行拉取
如果之前的MessageQueue被分配给别的消费者消费了,则将MessageQueue对应的ProcessQueue的dropped属性设置为true,此时ProcessQueue对应的PullRequest不会进行拉取,已经拉取到的消息有可能会被消费,但是不会提交消费进度,此时就会造成消息重复消费(后面会详细阐述这一过程)(PullRequest会将拉取到的消息放到ProcessQueue中,而ProcessQueue用TreeMap来存储数据)
PullMessageService会不断从阻塞队中获取PullRequest,然后执行拉取
// PullMessageServicepublic void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest); } catch (InterruptedException ignored) { } catch (Exception e) { log.error("Pull Message Service Run Method exception", e); } } log.info(this.getServiceName() + " service end");}
消息拉取的具体逻辑为DefaultMQPushConsumerImpl#pullMessage方法,所以我们看一下这个方法具体的拉取逻辑
processQueue.isDropped 表明 PullRequest 对应的 ConsumeQueue 交给别的消费者消费了,所以不用执行拉取任务了
接着就是通过ProcessQueue进行流控(防止Consumer拉取的消息过多,但是消费很慢)
为什么可以通过ProcessQueue可以进行流控呢?
因为当PullRequest拉取到消息后,会将消息存在ProcessQueue中,而ProcessQueue则是用TreeMap来存储消息的,所以可以通过消息的总条数,总大小,以及偏移量进行流控
消息拉取只有异步这一种方式,最终会调用到MQClientAPIImpl#pullMessageAsync方法,
当拉取到消息时会调用pullCallback实现类的的onSuccess方法,如果发生异常则调用onException方法
消费消息
从上一级我们知道,消息会交给PullCallback的实现类来进行消费,PullCallback的实现类有2个,一个是DefaultMQPullConsumerImpl的匿名内部类,一个是DefaultMQPushConsumerImpl的匿名内部类,正好对应两种消息消费模式,我们只分析DefaultMQPushConsumerImpl中PullCallback的处理逻辑
当拉到消息时,会通过tag对消息进行过滤。然后调用ConsumeMessageConcurrentlyService#submitConsumeRequest进行消息消费。最后往阻塞队列放PullRequest持续进行拉取
将每次要消费的消息封装成ConsumeRequest对象,然后放到线程池中进行消费
ConsumeRequest进行消费的代码比较多,我挑出一部分重要的代码来分析哈
// ConsumeRequestpublic void run() { // dropped = true // 重平衡分配给别的消费者了,停止对该消息队列的消费 if (this.processQueue.isDropped()) { log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue); return; } ConsumeConcurrentlyStatus status = null; boolean hasException = false; try { // 调用消息-消费消息 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { hasException = true; } // 不管是异常还是返回null,都需要重试消息 if (null == status) { log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue); status = ConsumeConcurrentlyStatus.RECONSUME_LATER; } // 这块地方会造成消息的重复消费 // 队列没有被丢弃,则处理消费结果 if (!processQueue.isDropped()) { ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); }}
首先如果processQueue被dropped了,说明重平衡的时候队列交给别的消费者来进行消费了,消息就不用再消费了然后将消息传入MessageListenerConcurrently接口的实现类中,进行消费消息。在消费的过程中如果返回发生异常或者返回null,则将消费结果改为ConsumeConcurrentlyStatus.RECONSUME_LATER(当消息的消费状态为ConsumeConcurrentlyStatus.RECONSUME_LATER时会发送重试消息)最后调用processConsumeResult方法来处理消费结果
在这里说个挺有意思的问题,当我们使用RocketMQ的时候经常被告知要保证消息消费的幂等性,因为消息可能会被重复投递,在什么情况下会造成消息的重复投递呢?
其实在重平衡的时候就会造成消息的重复投递,看上面的代码,虽然在刚开始的时候判断了processQueue是否被丢弃,但是有可能在这个语句执行完后,processQueue被丢弃了,但是消息会被消费哈,在代码的最后判断到processQueue被丢弃了,processConsumeResult方法不会被执行(processConsumeResult方法里面包含了消费进度提交的逻辑),消费进度不会提交,就会造成其他消费者再次消费队列中的消息
if (!processQueue.isDropped()) { ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);}
当消费完毕后,提交消费进度,并且根据消费结果来判断是否需要重试
// ConsumeMessageConcurrentlyService#processConsumeResultpublic void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest) { // 通过ackIndex控制消息是否要进行重试 int ackIndex = context.getAckIndex(); if (consumeRequest.getMsgs().isEmpty()) return; switch (status) { case CONSUME_SUCCESS: // 消费成功 if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok); this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); break; case RECONSUME_LATER: // 需要重试消息 ackIndex = -1; this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), consumeRequest.getMsgs().size()); break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: // 广播模式不会重试消息,失败就丢弃了 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString()); } break; case CLUSTERING: List
当消费状态为CONSUME_SUCCESS表示消息消费成功,当消费状态为RECONSUME_LATER表示消息消费失败,需要重试当消费模式为BROADCASTING不会进行消息重试,只有当消费模式为CLUSTERING时,才会进行消息重试最后将消息从ProcessQueue中删除,并且更新队列的消费进度
参考博客
负载均衡过程 [1]消息重平衡的过程 [2]https://cloud.tencent.com/developer/article/1521811
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~