《RocketMQ源码分析》Broker是如何处理心跳的?

网友投稿 1491 2022-09-03

《RocketMQ源码分析》Broker是如何处理心跳的?

《RocketMQ源码分析》Broker是如何处理心跳的?

一、Broker接收请求

Broker作为服务端接收请求的流程如下图:

接收到请求之后,我们着重看一下​​NettyRemotingAbstract#processRequestCommand()​​方法,看一下它是怎么处理客户端的请求的。

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { // 根据Command的Code从processorTable中获取相应的事件处理器和线程池 final Pair matched = this.processorTable.get(cmd.getCode()); // 找不到事件处理器,则使用默认的处理器 final Pair pair = null == matched ? this.defaultRequestProcessor : matched; final int opaque = cmd.getOpaque(); if (pair != null) { Runnable run = new Runnable() { @Override public void run() { try { doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); final RemotingResponseCallback callback = new RemotingResponseCallback() { @Override public void callback(RemotingCommand response) { doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); if (!cmd.isOnewayRPC()) { if (response != null) { response.setOpaque(opaque); response.markResponseType(); try { ctx.writeAndFlush(response); } catch (Throwable e) { log.error("process request over, but response failed", e); log.error(cmd.toString()); log.error(response.toString()); } } else { } } } }; if (pair.getObject1() instanceof AsyncNettyRequestProcessor) { // 从 pair 中拿到 Processor事件处理器 AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor) pair.getObject1(); // 处理请求 processor.asyncProcessRequest(ctx, cmd, callback); } else { NettyRequestProcessor processor = pair.getObject1(); RemotingCommand response = processor.processRequest(ctx, cmd); callback.callback(response); } } catch (Throwable e) { log.error("process request exception", e); log.error(cmd.toString()); if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, RemotingHelper.exceptionSimpleDesc(e)); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } }; if (pair.getObject1().rejectRequest()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECTREQUEST]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); return; } try { final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd); pair.getObject2().submit(requestTask); } catch (RejectedExecutionException e) { if ((System.currentTimeMillis() % 10000) == 0) { log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + ", too many requests and system thread pool busy, RejectedExecutionException " + pair.getObject2().toString() + " request code: " + cmd.getCode()); } if (!cmd.isOnewayRPC()) { final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY, "[OVERLOAD]system busy, start flow control for a while"); response.setOpaque(opaque); ctx.writeAndFlush(response); } } } else { String error = " request type " + cmd.getCode() + " not supported"; final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); response.setOpaque(opaque); ctx.writeAndFlush(response); log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error); }}

因为​​ClientManageProcessor​​​是​​AsyncNettyRequestProcessor​​的子类,所以会走asyncProcessRequest()方法异步处理请求。

在​​AsyncNettyRequestProcessor#asyncProcessRequest()​​​中会调用子类的​​processRequest()​​方法实现:

Broker处理心跳也就在​​ClientManageProcessor#processRequest()​​中:

二、Broker处理心跳

上面提到了Broker处理心跳是在​​ClientManageProcessor#heartBeat()​​​方法中处理的,下面我们来看一下​​heartBeat()​​方法:

public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) { RemotingCommand response = RemotingCommand.createResponseCommand(null); HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); ClientChannelInfo clientChannelInfo = new ClientChannelInfo( ctx.channel(), heartbeatData.getClientID(), request.getLanguage(), request.getVersion() ); // 处理心跳包中的消费者信息 for (ConsumerData data : heartbeatData.getConsumerDataSet()) { // 获取Broker端的消费组订阅信息 SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig( data.getGroupName()); boolean isNotifyConsumerIdsChangedEnable = true; // 如果Broker端的消费组订阅信息不为空,说明当前可能要修改消费者订阅信息 if (null != subscriptionGroupConfig) { // 是否通知到所有的消费者 订阅信息变更 isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable(); int topicSysFlag = 0; if (data.isUnitMode()) { topicSysFlag = TopicSysFlag.buildSysFlag(false, true); } // 消费失败后的,消息消费重试队列、名为:%RETRY%groupName String newTopic = MixAll.getRetryTopic(data.getGroupName()); // 创建消息消费重试队列 this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod( newTopic, subscriptionGroupConfig.getRetryQueueNums(), PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag); } // 注册消费者订阅信息到Broker中,并判断订阅信息是否变更 boolean changed = this.brokerController.getConsumerManager().registerConsumer( data.getGroupName(), clientChannelInfo, data.getConsumeType(), data.getMessageModel(), data.getConsumeFromWhere(), data.getSubscriptionDataSet(), isNotifyConsumerIdsChangedEnable ); // 消费者信息发生变更,则打印日志记录 if (changed) { log.info("registerConsumer info changed {} {}", data.toString(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()) ); } } // 处理心跳包中的生产者信息 for (ProducerData data : heartbeatData.getProducerDataSet()) { // 直接注册producer,并把Producer的ClientChannelInfo保存下来、用于后面与Producer通信 this.brokerController.getProducerManager().registerProducer(data.getGroupName(), clientChannelInfo); } response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response;}

对于Producer的心跳信息处理非常简单,直接注册Producer、并把Producer的ClientChannelInfo保存下来、用于后面与Producer通信;

对于Consumer的心跳信息处理稍微复杂一点:

1、先判断Broker端中是否存在消费者的订阅信息:1)如果存在,创建一个用于消息​​重试消费的topic​​(​​%RETRY%​​groupName)2、注册消费者订阅信息到Broker中; 3、判断消费者订阅信息是否变更、消费者的clientChannel(所在机器IP+Port)是否变更;只要有一个变更,就会通知当前消费组下的所有消费者实例 消费组中有信息变更。

最后我们看一下​​ConusumerManager#registerConsumer()​​注册消费者信息的源码:

public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, final Set subList, boolean isNotifyConsumerIdsChangedEnable) { ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); // 消费者初次上报订阅信息时 if (null == consumerGroupInfo) { ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere); ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp); consumerGroupInfo = prev != null ? prev : tmp; } // 消费者的clientChannel(机器)是否变更 boolean r1 = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere); // 消费者的订阅信息是否变更 boolean r2 = consumerGroupInfo.updateSubscription(subList); // 若消费者的机器变更 或 订阅信息变更,则通知所有的消费者实例 if (r1 || r2) { if (isNotifyConsumerIdsChangedEnable) { this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); } } this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList); // 消费者信息发生变更,则返回true return r1 || r2;}

源码注释出处

以上所有分析相关的源码注释请见GitHub中的release-4.8.0分支:​​https://github.com/Saint9768/rocketmq/tree/rocketmq-all-4.8.0​​

你们的每一个赞和关注都是博主创作的动力

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:MySQL 面试高频一百问(mysql安装教程)
下一篇:《SpringBoot系列一》:yaml配置文件各种数据类型使用姿势(含@EnableConfigurationProperties、@ConfigurationProperties)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~