RocketMQ源码解析:消息消费失败后的重试逻辑

网友投稿 1383 2022-10-07

RocketMQ源码解析:消息消费失败后的重试逻辑

RocketMQ源码解析:消息消费失败后的重试逻辑

消息消费失败重新投递的流程

我们接着消息消费的逻辑分析,当消费完成后会调用ConsumeMessageConcurrentlyService#processConsumeResult的方法处理消费的结果,如果消费失败,则会将消息再次发送到这条消息原来存储的broker

为什么要将消息发送到原来存储的broker呢? 因为重试的消息只有消息的基本信息,没有具体的消息体,需要重新从commitLog中获取原来的消息获得消息体

// MQClientAPIImpl#consumerSendMessageBackConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);// 只发送消息的基本信息,等发送到broker重新从commitLog文件取消息requestHeader.setGroup(consumerGroup);requestHeader.setOriginTopic(msg.getTopic());requestHeader.setOffset(msg.getCommitLogOffset());requestHeader.setDelayLevel(delayLevel);requestHeader.setOriginMsgId(msg.getMsgId());requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);// 同步发送重试消息RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);

当消息到达broker端的时候,会根据偏移量从commitLog中找到原来的消息,然后构建新的消息,消息体和原来的消息体一样,消息的topic变更为%RETRY% + consumerGroup,并且将消息的delayLevel+1,如果是第一次重试,则直接将delayLevel设置为3,此时这条消息的存储和消费逻辑就会按照定时消息来处理,定时消息的处理逻辑在后一节分析

源码解析

Consumer端处理消费结果

// ConsumeMessageConcurrentlyService#processConsumeResultpublic void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest) { // 通过ackIndex控制消息是否要进行重试 int ackIndex = context.getAckIndex(); if (consumeRequest.getMsgs().isEmpty()) return; switch (status) { case CONSUME_SUCCESS: // 消费成功 if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok); this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); break; case RECONSUME_LATER: // 需要重试消息 ackIndex = -1; this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), consumeRequest.getMsgs().size()); break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: // 广播模式不会重试消息,失败就丢弃了 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString()); } break; case CLUSTERING: List msgBackFailed = new ArrayList(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); // 发送重试消息 boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } // 重发消息失败,5s后再重新消费 if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); } break; default: break; } // 不管消息消费成功与否,将消费过的消息从 ProcessQueue 中删除 long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { // 不管消息消费成功与否,都会更新消费进度,将消费进度先暂存在本地,后台定时任务会定时将消费进度同步到broker中 this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); }}

从这段代码中可以看到当客户端的消费状态为RECONSUME_LATER时,会发送重试消息,当消费者的消费状态为如下三种时会进行重试

返回ConsumeConcurrentlyStatus.RECONSUME_LATER返回null主动或被动抛出异常

Consumer端消费重试消息

Consumer端在启动的时候会订阅普通的topic和重试的topic

// DefaultMQPushConsumerImpl#copySubscriptionswitch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: break; case CLUSTERING: // 订阅重试主题 final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); break; default: break;}

Broker端存储消息

Broker端存储消息的流程在SendMessageProcessor#asyncConsumerSendMsgBack,挑重要的源码分析一下

// 超过最大消费次数16次,或者 delayLevel < 0// 则将消息投递到死信队列if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) { newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP; topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, DLQ_NUMS_PER_GROUP, PermName.PERM_WRITE | PermName.PERM_READ, 0); if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("topic[" + newTopic + "] not exist"); return CompletableFuture.completedFuture(response); } msgExt.setDelayTimeLevel(0);} else { if (0 == delayLevel) { delayLevel = 3 + msgExt.getReconsumeTimes(); } msgExt.setDelayTimeLevel(delayLevel);}

当消息超过最大消费次数16次,或者 delayLevel < 0时,会将消息投递到死信队列中,死信队列的topic名为%DLQ% + consumerGroup,否则将topic名字改为%RETRY% + consumerGroup,放入重试队列

String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());

重试消息设置了delayLevel后,这个消息就变成了延时消息,到达投递时间后,Consumer就又能消费到这条消息了(虽然消息的topic此时已经变为%RETRY% + consumerGroup了,但是Consumer在启动的时候已经订阅这个topic了哈)

参考博客

好文 [2][3]https://baiyp.ren/RocketMQ%E6%B6%88%E8%B4%B9%E8%80%85%E4%BF%9D%E9%9A%9C.html

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:最简单的微信小程序Demo(最简单的微信小程序怎么做)
下一篇:基于mybatis
相关文章

 发表评论

暂时没有评论,来抢沙发吧~