洞察纵观鸿蒙next版本,如何凭借FinClip加强小程序的跨平台管理,确保企业在数字化转型中的高效运营和数据安全?
2098
2022-10-03
详解SpringBoot整合RabbitMQ如何实现消息确认
目录简介生产者消息确认介绍流程配置ConfirmCallbackReturnCallback注册ConfirmCallback和ReturnCallback消费者消息确认介绍手动确认三种方式
简介
本文介绍SpringBoot整合RabbitMQ如何进行消息的确认。
生产者消息确认
介绍
发送消息确认:用来确认消息从 producer发送到 broker 然后broker 的 exchange 到 queue过程中,消息是否成功投递。
如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出;如果是镜像队列,所有镜像接受成功后发确认消息。
流程
如果消息没有到达exchange,则confirm回调,ack=false如果消息到达exchange,则confirm回调,ack=trueexchange到queue成功,则不回调returnexchange到queue失败,则回调return(需设置mandatory=true,否则不会回调,这样消息就丢了)
配置
application.yml
# 发送者开启 confirm 确认机制
spring.rabbitmq.publisher-confirms=true
# 发送者开启 return 确认机制
spring.rabbitmq.publisher-returns=true
ConfirmCallback
ConfirmCallback:消息只要被 RabbitMQ broker 接收到就会触发confirm方法。
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("confirm==>发送到broker失败\r\n" +
"correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
correlationData, ack, cause);
} else {
log.info("confirm==>发送到broker成功\r\n" +
"correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
correlationData, ack, cause);
}
}
}
correlationData:对象内部有id (消息的唯一性)和Message。
(若ack为false,则Message不为null,可将Message数据 重新投递;若ack是true,则correlationData为null)
ack:消息投递到exchange 的状态,true表示成功。
cause:表示投递失败的原因。 (若ack为false,则cause不为null;若ack是true,则cause为null)
给每一条信息添加一个dataId,放在CorrelationData,这样在RabbitConfirmCallback返回失败时可以知道哪个消息失败。
public void send(String dataId, String exchangeName, String rountingKey, String message){
CorrelationData correlationData = new CorrelationData();
correlationData.setId(dataId);
rabbitTemplate.convertAndSend(exchangeName, rountingKey, message, correlationData);
}
public String receive(String queueName){
return String.valueOf(rabbitTemplate.receiveAndConvert(queueName));
}
2.1版本开始,CorrelationData对象具有ListenableFuture,可用于获取结果,而不是在rabbitTemplate上使用ConfirmCallback。
CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnCallback
ReturnCallback:如果消息未能投递到目标 queue 里将触发returnedMessage方法。
若向 queue 投递消息未成功,可记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。
注意:需要rabbitTemplate.setMandatory(true);
当mandatory设置为true时,若exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者。当mandatory设置为false时,出现上述情况broker会直接将消息丢弃。
代码:
@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText,
String exchange, String routingKey) {
log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" +
"replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}",
message, replyCode, replyText, exchange, routingKey);
}
}
message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。
注册ConfirmCallback和ReturnCallback
整合后的写法
package com.example.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Slf4j
@Configuration
public class RabbitCallbackConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
return rabbitTemplate;
}
// 下边这样写也可以
// @Autowired
// private RabbitTemplate rabbitTemplate;
// @PostConstruct
// public void init() {
// rabbitTemplate.setMandatory(true);
// rabbitTemplate.setReturnCallback(this);
// rabbitTemplate.setConfirmCallback(this);
// }
@Override
public void confirhttp://m(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("confirm==>发送到broker失败\r\n" +
"correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
correlationData, ack, cause);
} else {
log.info("confirm==>发送到broker成功\r\n" +
"correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
correlationData, ack, cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText,
String exchange, String routingKey) {
log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" +
"replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}",
message, replyCode, replyText, exchange, routingKey);
}
}
消费者消息确认
介绍
确认方式简介详述auto(默认)根据消息消费的情况,智能判定若消费者抛出异常,则mq不会收到确认消息,mq会一直此消息发出去若消费者没有抛出异常,则mq会收到确认消息,mq不会再次将此消息发出去。若消费者在消费时所在服务挂了,mq不会再次将此消息发出去。nonemq发出消息后直接确认消息 manual消费端手动确认消息消费者调用 ack、nack、reject 几种方法进行确认,可以在业务失败后进行一些操作,如果消息未被 ACK 则消息还会存在于MQ,mq会一直将此消息发出去。如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限。
只要消息没有被消费者确认(包括没有自动确认),会导致消息一直被失败消费,死循环导致消耗大量资源。正确的处理方式是:发生异常,将消息记录到db,再通过补偿机制来补偿消息,或者记录消息的重复次数,进行重试,超过几次后再放到db中。
消息确认三种方式配置方法
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual
手动确认三种方式
basicAck,basicNack,basicReject
basicAck
含义
表示成功确认,使用此回执方法后,消息会被RabbitMQ broker 删除。
函数原型
void basicAck(long deliveryTag, boolean multiple)
deliveryTag
消息投递序号每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。
multiple
是否批量确认值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
示例: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。
实例
@RabbitHandler
public void process(String content, Channel channel, Message message){
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
basicNack
含义
表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。
函数原型
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
deliveryTag:表示消息投递序号。multiple:是否批量确认。requeue:值为 true 消息将重新入队列。
basicReject
含义
拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。
函数原型
void basicReject(long deliveryTag, boolean requeue)
deliveryTag:表示消息投递序号。requeue:值为 true 消息将重新入队列。
以上就是详解SpringBoot整合RabbitMQ如何实现消息确认的详细内容,更多关于SpringBoot RabbitMQ消息确认的资料请关注我们其它相关文章!
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~