微前端架构如何改变企业的开发模式与效率提升
555
2022-11-10
RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)
一、原理
以集群消费为例,集群消息在同一个消费组中只能有一个消费者可以消费到这条消息;假如最开始我们有一个叫TopicA的主体,TopicA中有8个MessageQueue;有个消费组ConsumerGroupA;在最开始的时候我们只启动一个consumer1消费者实例,即consumer1这一个消费实例将消费这8个queue,如下图:
然后我们在启动一个consumer2消费者实例,其会向Broker注册;这时broker发现ConsumerGroupA中新增了一个消费者实例,其会通知consumer1:嗨,哥们,你们这个消费组的实例发生了变化,你重新负载均衡一下吧。
这时consumer2在注册的过程中,也会进行负载均衡;它首先会从broker获取ConsumerGroupA下的所有的消费者实例,发现有两个;接着获取订阅的TopicA下的MessageQueue信息集合,一看有8个,然后其会根据负载均衡算法(默认是平均分配)算出可以分几个MessageQueue;8个queue、2个consumer实例,一人4个queue刚刚好。
这时,我们先看一下consumer1它是怎么做的,原本有8个queue,包括它queue4、queue5、queue6、queue7 ,现在不能有了,它会将本地维护的MessageQueue对应的ProcessQueue清理掉,ProcessQueue我们在前面有介绍过:RocketMQ源码解析:ProcessQueue的作用。再看consumer2实例,其新加了4个queue,他会遍历这4个queue,获取每个queue的元数据信息(包括:开始消费的offset、队列ID queueId、broker民称 brokerName),然后为每个MessageQueue生成一个对应的ProcessQueue;最后将MessageQueue和ProcessQueue封装成PullRequest拉取消息请求,并放入到PullMessageService的pullRequestQueue阻塞队列中;然后由PullMessageService服务执行后续的拉取消息动作。关于PullMessageService拉取消息的介绍,我们前面也有聊过:RocketMQ源码分析pullMessage:Consumer是如何从broker拉取消息的?。
rebalance负载均衡就这!!感觉就是同一个消费组中消费者实例如何分配MessageQueue,下面我们来看看RocketMQ源码中是怎么实现的。
二、源码分析
以下所有分析相关的源码注释请见GitHub中的release-4.8.0分支:— 真正负载均衡的地方
private void rebalanceByTopic(final String topic, final boolean isOrder) { switch (messageModel) { case BROADCASTING: { Set
我们进入rebalanceByTopic()方法,发现其会根据消息的传播方式为BROADCASTING 或 CLUSTERING做不同的逻辑;对于广播模式,约等于不需要做负载均衡,当前消费者会直接消费所有的队列。
1、我们重点看一下集群消息的负载均衡是如何做的?
我们把集群消息的负载均衡分为五步来看,下面我们一步一步的介绍:
1)第一步
从Topic订阅表中获取当前topic下的所有MessageQueue(MessageQueue的信息是定时从NameServer中拉取到的),然后根据Topic从broker中获取当前ConsumerGroup下的所有消费者实例。
2)第二步
对MessageQueue集合和ConsumerID集合进行排序,以保证每个Consumer实例对应的MessageQueue集合顺序是一致的,并且这一步对于下一步的负载均衡算法及其重要。
3)第三步
使用负载均衡算法进行重新负载,默认使用AllocateMessageQueueAveragely 平均分配算法;我们接着来看一下AllocateMessageQueueStrategy#allocate()方法是如何对Queue进行负载的。
就像我们写的代码一样,前面常规一堆校验。我们直接看下平均分配这个负载均衡算法的实现:
(1)获取当前客户端ID(currentCID)在消费者集合(cidAll)中的索引位置; (2)计算MessageQueue能不能 没“零头”的平均分配到每个Consumer上,如果不能就把“零头”记下来; (3)接着计算当前消费者能分到几个MessageQueue去消费,计算规则如下:首先队列个数 是否小于等于 消费者个数;如果小于,则一个consumer最多只能分配到一个queue;否者判断queue是否有“零头”;如果有零头 并且 当前client在消费者集合中的索引位置 小于 “零头”, 当前consumer可以多分到一个queue;否者就不管“零头”,平均分配就得了。这里的索引位置就体现出了第二步时对消费者集合排序的重要性。 (4)计算当前consumer可以从MessageQueue集合中取MessageQueue的startIndex。 (5)结合上面计算的averageSize算出真正要取的queue个数。
我们以实际数据来推演一下:
比如我们有8个queue、一个ConsumerGroup下有3个consumer实例(消费者0、消费者1、消费者2),并且当前consumer客户端在consumer集合的0索引处,即是消费者0。
来,我们细品,如果不对MessageQueue集合和consumer集合进行排序的话,每个Consumer取到的MessageQueue个数不会变,但是大概念会重复,因为大家都是随便取的,所以排序很重要。
4)第四步
更新当前Topic下的所有MessageQueue和对应的ProcessQueue信息,比如MessageQueue的新增、删除等操作。这里我们分三块(移除不再需要关注的MessageQueue/ProcessQueue、新增MessageQueue/ProcessQueue、分配PullRequest拉取消息请求)来看。
(1)遍历ProcessQueueTable这个Map,如果负载均衡后分配的queue中不在包含之前的queue,则就将之前的queue移除;如果之前的queue过了120s没有拉取d到消息了,也会将它销毁。
我们来看看是怎么销毁queue的:
先是设置ProcessQueue的状态为dropped;然后移除没必要的queue,即将这个queue的消费offset上报给broker;另外如果是顺序消费的话,向broker发送释放锁请求。
(2)处理需要新增的MessageQueue,级联创建对应的ProcessQueue。
遍历负载均衡后传递过来的queue集合,当processQueueTable中不包含MessageQueue时执行下列逻辑:
顺序消费时,如果MessageQueue没有被锁定成功,跳过当前MessageQueue,不进行消费操作。清空MessageQueue的消费进度信息,计算下个要消费消息的offset:如果是CONSUME_FROM_LAST_OFFSET,则从Broker中获取该queue的最大offset信息,即从最后开始消费。如果是CONSUME_FROM_FIRST_OFFSET,则先去Broker中获取最后消费到了哪个offset,如果还没消费则从0开始,否则从获取到的offset开始消费。如果是CONSUME_FROM_TIMESTAMP,则先从broker中获取最后消费到的offset,如果还没消费,则去broker上找我们当前时间对应的一个offset。组装ProcessQueue,然后将组装好的ProcessQueue放入到processQueueTable中,并将其封装到PullRequest中放入到pullRequestList。
(3)分配PullRequest拉取消息请求,将pullRequestList加入到PullMessageService的pullRequestQueue中,以供执行拉取消息操作。最后调用RebalanceImpl#dispatchPullRequest()方法 将PullRequeat对象放入到PullMessageService拉取消息服务的pullRequestQueue阻塞队列中。
5)第五步
如果第四步中MessageQueue或ProcessQueue发生了变化,会执行消费队列分配发生变更后的逻辑。
这里会首先给订阅信息设置一个版本号,然后计算一些拉取消息的阈值,包括:Topic级别的拉取阈值、拉取消息大小阈值;
如果配置了topic级别的拉取阈值,计算出来每个queue的拉取阈值,即topic的限制阈值 / queue的数量,然后设置到DefaultMQPushConsumer的pullThresholdForQueue 参数中。同理计算Topic级别拉取消息大小的阈值。
最后向broker 发送心跳包,通知一下Broker。
三、总结
rabalance其实就是解决一些MessageQueue应该如何分给一些消费者的问题。负载均衡操作还牵扯Consumer拉取消息的请求源。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~