RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)

网友投稿 521 2022-11-10

RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)

RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)

一、原理

以集群消费为例,集群消息在同一个消费组中只能有一个消费者可以消费到这条消息;假如最开始我们有一个叫TopicA的主体,TopicA中有8个MessageQueue;有个消费组ConsumerGroupA;在最开始的时候我们只启动一个consumer1消费者实例,即consumer1这一个消费实例将消费这8个queue,如下图:

然后我们在启动一个consumer2消费者实例,其会向Broker注册;这时broker发现ConsumerGroupA中新增了一个消费者实例,其会通知consumer1:嗨,哥们,你们这个消费组的实例发生了变化,你重新负载均衡一下吧。

这时consumer2在注册的过程中,也会进行负载均衡;它首先会从broker获取ConsumerGroupA下的所有的消费者实例,发现有两个;接着获取订阅的TopicA下的MessageQueue信息集合,一看有8个,然后其会根据负载均衡算法(默认是平均分配)算出可以分几个MessageQueue;8个queue、2个consumer实例,一人4个queue刚刚好。

这时,我们先看一下consumer1它是怎么做的,原本有8个queue,包括它queue4、queue5、queue6、queue7 ,现在不能有了,它会将本地维护的MessageQueue对应的ProcessQueue清理掉,ProcessQueue我们在前面有介绍过:​​RocketMQ源码解析:ProcessQueue的作用​​​。再看consumer2实例,其新加了4个queue,他会遍历这4个queue,获取每个queue的元数据信息(包括:开始消费的offset、队列ID queueId、broker民称 brokerName),然后为每个MessageQueue生成一个对应的ProcessQueue;最后将MessageQueue和ProcessQueue封装成PullRequest拉取消息请求,并放入到PullMessageService的pullRequestQueue阻塞队列中;然后由PullMessageService服务执行后续的拉取消息动作。关于PullMessageService拉取消息的介绍,我们前面也有聊过:​​RocketMQ源码分析pullMessage:Consumer是如何从broker拉取消息的?​​。

rebalance负载均衡就这!!感觉就是同一个消费组中消费者实例如何分配MessageQueue,下面我们来看看RocketMQ源码中是怎么实现的。

二、源码分析

以下所有分析相关的源码注释请见GitHub中的release-4.8.0分支:​​— 真正负载均衡的地方

private void rebalanceByTopic(final String topic, final boolean isOrder) { switch (messageModel) { case BROADCASTING: { Set mqSet = this-icSubscribeInfoTable.get(topic); if (mqSet != null) { boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder); if (changed) { this.messageQueueChanged(topic, mqSet, mqSet); log.info("messageQueueChanged {} {} {} {}", consumerGroup, topic, mqSet, mqSet); } } else { log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); } break; } case CLUSTERING: { // 获取topic下的所有queue元数据信息 Set mqSet = this-icSubscribeInfoTable.get(topic); // 获取topic下的所有consumer List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); if (null == mqSet) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); } } if (null == cidAll) { log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic); } if (mqSet != null && cidAll != null) { List mqAll = new ArrayList(); mqAll.addAll(mqSet); // 将consumer和MessageQueue排序 Collections.sort(mqAll); Collections.sort(cidAll); // 获取负载均衡策略 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List allocateResult = null; try { allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), e); return; } Set allocateResultSet = new HashSet(); if (allocateResult != null) { allocateResultSet.addAll(allocateResult); } // 真正执行消费的方法 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); if (changed) { log.info( "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet); this.messageQueueChanged(topic, mqSet, allocateResultSet); } } break; } default: break; }}

我们进入rebalanceByTopic()方法,发现其会根据消息的传播方式为BROADCASTING 或 CLUSTERING做不同的逻辑;对于广播模式,约等于不需要做负载均衡,当前消费者会直接消费所有的队列。

1、我们重点看一下集群消息的负载均衡是如何做的?

我们把集群消息的负载均衡分为五步来看,下面我们一步一步的介绍:

1)第一步

从Topic订阅表中​​获取​​​当前topic下的​​所有MessageQueue​​​(MessageQueue的信息是定时从NameServer中拉取到的),然后根据Topic从broker中获取当前​​ConsumerGroup下的所有消费者实例​​。

2)第二步

对​​MessageQueue集合和ConsumerID集合进行排序​​,以保证每个Consumer实例对应的MessageQueue集合顺序是一致的,并且这一步对于下一步的负载均衡算法及其重要。

3)第三步

使用负载均衡算法进行重新负载,默认使用AllocateMessageQueueAveragely 平均分配算法;我们接着来看一下AllocateMessageQueueStrategy#allocate()方法是如何对Queue进行负载的。

就像我们写的代码一样,前面常规一堆校验。我们直接看下平均分配这个负载均衡算法的实现:

(1)获取当前客户端ID(currentCID)在消费者集合(cidAll)中的索引位置; (2)计算MessageQueue能不能 没“零头”的平均分配到每个Consumer上,如果不能就把“零头”记下来; (3)接着计算当前消费者能分到几个MessageQueue去消费,计算规则如下:首先队列个数 是否小于等于 消费者个数;如果小于,则一个consumer最多只能分配到一个queue;否者判断queue是否有“零头”;如果有零头 并且 当前client在消费者集合中的索引位置 小于 “零头”, 当前consumer可以多分到一个queue;否者就不管“零头”,平均分配就得了。这里的索引位置就体现出了第二步时对消费者集合排序的重要性。 (4)计算当前consumer可以从MessageQueue集合中取MessageQueue的startIndex。 (5)结合上面计算的averageSize算出真正要取的queue个数。

我们以实际数据来推演一下:

比如我们有8个queue、一个ConsumerGroup下有3个consumer实例(消费者0、消费者1、消费者2),并且当前consumer客户端在consumer集合的0索引处,即是消费者0。

来,我们细品,如果不对MessageQueue集合和consumer集合进行排序的话,每个Consumer取到的MessageQueue个数不会变,但是大概念会重复,因为大家都是随便取的,所以排序很重要。

4)第四步

更新当前Topic下的所有MessageQueue和对应的ProcessQueue信息,比如MessageQueue的新增、删除等操作。这里我们分三块(移除不再需要关注的MessageQueue/ProcessQueue、新增MessageQueue/ProcessQueue、分配PullRequest拉取消息请求)来看。

(1)遍历ProcessQueueTable这个Map,如果负载均衡后分配的queue中不在包含之前的queue,则就将之前的queue移除;如果之前的queue过了120s没有拉取d到消息了,也会将它销毁。

我们来看看是怎么销毁queue的:

先是设置ProcessQueue的状态为dropped;然后移除没必要的queue,即将这个queue的消费offset上报给broker;另外如果是顺序消费的话,向broker发送释放锁请求。

(2)处理需要新增的MessageQueue,级联创建对应的ProcessQueue。

遍历负载均衡后传递过来的queue集合,当processQueueTable中不包含MessageQueue时执行下列逻辑:

顺序消费时,如果MessageQueue没有被锁定成功,跳过当前MessageQueue,不进行消费操作。清空MessageQueue的消费进度信息,计算下个要消费消息的offset:如果是​​CONSUME_FROM_LAST_OFFSET​​,则从Broker中获取该queue的最大offset信息,即从最后开始消费。如果是​​CONSUME_FROM_FIRST_OFFSET​​,则先去Broker中获取最后消费到了哪个offset,如果还没消费则从0开始,否则从获取到的offset开始消费。如果是​​CONSUME_FROM_TIMESTAMP​​,则先从broker中获取最后消费到的offset,如果还没消费,则去broker上找我们当前时间对应的一个offset。组装​​ProcessQueue​​​,然后将组装好的ProcessQueue放入到​​processQueueTable​​​中,并将其封装到​​PullRequest​​中放入到pullRequestList。

(3)分配PullRequest拉取消息请求,将pullRequestList加入到PullMessageService的pullRequestQueue中,以供执行拉取消息操作。最后调用RebalanceImpl#dispatchPullRequest()方法 将PullRequeat对象放入到​​PullMessageService​​​拉取消息服务的​​pullRequestQueue​​阻塞队列中。

5)第五步

如果第四步中MessageQueue或ProcessQueue发生了变化,会执行消费队列分配发生变更后的逻辑。

这里会首先给订阅信息设置一个版本号,然后计算一些拉取消息的阈值,包括:Topic级别的拉取阈值、拉取消息大小阈值;

如果配置了topic级别的拉取阈值,计算出来每个queue的拉取阈值,即topic的限制阈值 / queue的数量,然后设置到DefaultMQPushConsumer的pullThresholdForQueue 参数中。同理计算Topic级别拉取消息大小的阈值。

最后向broker 发送心跳包,通知一下Broker。

三、总结

rabalance其实就是解决一些MessageQueue应该如何分给一些消费者的问题。负载均衡操作还牵扯Consumer拉取消息的请求源。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Maven环境安装配置和新建项目介绍
下一篇:《JUC》CyclicBarrier原理/源码解析
相关文章

 发表评论

暂时没有评论,来抢沙发吧~