视频软件App开发引领数字内容创作与分享的新时代
785
2022-11-10
《RocketMQ源码分析》部分Consumer下线后,倒推Broker如何做到准实时通知剩余Consumer?
一、承上启下
我们接着上一篇RocketMQ Broker如何感知Consumer的异常宕机、正常下线?继续聊Broker感知到到Consumer下线后,如何通知剩余的Consumer?
二、正文
1、Consumer接收通知
背景:这里在14:37:07这个时间点,同一消费组中的两个消费者挂了一个。
/{user.home}/logs/rocketmqlogs/rocketmq_client.log文件是RocketMQ客户端的日志所在,在其中我们可以看到receive broker's notification这一行日志,字面意思是获取到Broker的通知; 这里我可以看到从一个Consumer下线到其他Consumer感知到大约需要7s左右,这也是为什么说Broker是准实时通知剩余Consumer。
同样的套路,全局搜索receive broker's notification,找到ClientRemotingProcessor#notifyConsumerIdsChanged()方法;
往上走,进入到ClientRemotingProcessor#processRequest()方法中,其用于接收Broker发送的netty调用;当RequestCode为NOTIFY_CONSUMER_IDS_CHANGED时调用notifyConsumerIdsChanged()方法通知Consumer Group的Consumer信息发生变更。
2、Broker发送通知
全局搜索NOTIFY_CONSUMER_IDS_CHANGED,找一下哪里构建了RequestCode为NOTIFY_CONSUMER_IDS_CHANGED的请求(RemotingCommand)?如图只有一处:Broker2Client#notifyConsumerIdsChanged()中;
继续向上找,找到DefaultConsumerIdsChangeListener#handle()方法中,当event为CHANGE时,循环调用每个Consumer,通知Consumer Group的Consumers发生变更;
再看一下DefaultConsumerIdsChangeListener是哪里初始化的?
我们再看ConsumerManager类如何使用到它的?在ConsumerManager#doChannelCloseEvent()中使用到了它。
RocketMQ Broker 和Consumer/Producer之间采用Netty通信,无论Consumer下线后,Broker通过Netty的通信机制(ChannelInboundHandlerAdapter#channelInactive())可以实时感知到;
当Consumer下线后,会进入到ConsumerManager#doChannelCloseEvent()中,可以参考:RocketMQ Broker如何感知Consumer的异常宕机、正常下线?
1)ConsumerGroupInfo来源
在Consumer启动后,它就会通过定时任务不断地向RocketMQ集群中的所有Broker实例发送心跳包(其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息)。Broker端在收到Consumer的心跳消息后,会将它维护在ConsumerManager的本地缓存变量—consumerTable;同时并将封装后的客户端网络通道信息保存在本地缓存变量—channelInfoTable中,为之后做Consumer端的负载均衡提供可以依据的元数据信息。参考:《RocketMQ源码分析》Broker是如何处理心跳
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~