springboot中rabbitmq实现消息可靠性机制详解

网友投稿 1192 2022-12-05

springboot中rabbitmq实现消息可靠性机制详解

springboot中rabbitmq实现消息可靠性机制详解

1. 生产者模块通过publisher confirm机制实现消息可靠性

1.1 生产者模块导入rabbitmq相关依赖

org.springframework.boot

spring-boot-starter-amqp

com.fasterxml.jackson.core

jackson-databind

1.2 配置文件中进行mq的相关配置

spring.rabbitmq.host=10.128.240.183

spring.rabbitmq.port=5672

spring.rabbitmq.virtual-host=/

spring.rabbitmq.publisher-confirm-type=correlated

spring.rabbitmq.publisher-returns=true

spring.rabbitmq.template.mandatory=true

publish-confirm-type:开启publisher-confirm,有以下可选值

simple:同步等待confirm结果,直到超时

correlated:异步回调,定义ConfirmCallback。mq返回结果时会回调这个ConfirmCallback

publish-returns:开启publish-return功能。可以定义ReturnCallback

template.mandatory: 定义消息路由失败的策略

true:调用ReturnCallback

false:直接丢弃消息

1.3 定义ReturnCallback(消息投递到队列失败触发此回调)

每个RabbitTemplate只能配置一个ReturnCallback。

当消息投递失败,就会调用生产者的returnCallback中定义的处理逻辑

可以在容器启动时就配置这个回调

@Slf4j

@Configuration

public class CommonConfig implements ApplicationContextAware {

@Override

public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

// 获取RabbitTemplate对象

RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);

// 配置ReturnCallback

rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {

// 判断是否是延迟消息

Integer receivedDelay = message.getMessageProperties().getReceivedDelay();

if (receivedDelay != null && receivedDelay > 0) {

// 是一个延迟消息,忽略这个错误提示

return;

}

// 记录日志

log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",

replyCode, replyText, exchange, routingKey, message.toString());

// 如果有需要的话,重发消息

});

}

}

1.4 定义ConfirmCallback(消息到达交换机触发此回调)

可以为redisTemplate指定一个统一的确认回调

@Slf4j

@Configuration

public class CommonConfig implements ApplicationContextAware {

@Override

public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

// 获取RabbitTemplate对象

RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);

// 配置ReturnCallback

rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {

// 判断是否是延迟消息

Integer receivedDelay = message.getMessageProperties().getReceivedDelay();

if (receivedDelay != null && receivedDelay > 0) {

// 是一个延迟消息,忽略这个错误提示

return;

}

// 记录日志

log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",

replyCode, replyText, exchange, routingKey, message.toString());

// 如果有需要的话,重发消息

});

// 设置统一的confirm回调。只要消息到达broker就ack=true

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

@Override

public void confirm(CorrelationData correlationData, boolean b, String s) {

System.out.println("这是统一的回调");

System.out.println("correlationData:" + correlationData);

System.out.println("ack:" + b);

System.out.println("cause:" + s);

}

});

}

}

也可以为特定的消息定制回调

@Autowired

private RabbitTemplate rabbitTemplate;

@Test

public void testmq() throws InterruptedException {

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

correlationData.getFuture().addCallback(result->{

if (result.isAck()) {

// ACK

log.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId());

} else {

// NACK

log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());

// 重发消息

}

},ex->{

// 记录日志

log.error("消息发送失败!", ex);

// 重发消息

});

rabbitTemplate.convertAndSend("example.direct","blue","hello,world",correlationData);

}

2. 消费者模块开启消息确认

2.1 添加配置

# 手动ack消息,不使用默认的消费端确认

spring.rabbitmq.listener.simple.acknowledge-mode=manual

none:关闭ack,消息投递时不可靠的,可能丢失

auto:类似事务机制,出现异常时返回nack,消息回滚到mq,没有异常,返回

ackmanual:我们自己指定什么时候返回ack

2.2 manual模式在-中自定义返回ack

@RabbitListener(queues = "order.release.order.queue")

@Service

public class OrderCloseListener {

@Autowired

private OrderService orderService;

@RabbitHandler

private void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {

System.out.println("收到过期的订单信息,准备关闭订单" + orderEntity.getOrderSn());

try {

orderService.closeOrder(orderEntity);

// 第二个参数为false则表示仅确认此条消息。如果为true则表示对收到的多条消息同时确认

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

} catch (Exception e) {

// 第二个参数为ture表示将这个消息重新加入队列

channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

}

}

}

3. 消费者模块开启消息失败重试机制

3.1 配置文件添加配置,开启本地重试

spring:

rabbitmq:

listener:

simple:

retry:

enabled: true # 开启消费者失败重试

initial-interval: 1000 # 初识的失败等待时长为1秒

multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval

max-attempts: 3 # 最大重试次数

stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

开启本地重试,如果消息处理过程总抛出异常,不会requeue到队列,而是在消费者本地重试

重试达到最大次数后,spring会返回ack,消息会被丢弃

http://

4.  消费者模块添加失败策略(用于开启失败本地重试功能后)

当开启本地重试后,重试最大次数后消息直接丢弃。

三种策略,都继承于MessageRecovery接口

RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

4.2 定义处理失败消息的交换机和队列 没有会自动创建相应的队列、交换机与绑定关系,有了就啥也不做

@Bean

public DirectExchange errorMessageExchange(){

return new DirectExchange("error.direct");

}

@Bean

public Queue errorQueue(){

return new Queue("error.queue", true);

}

// 路由键为key

@Bean

public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){

return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");

}

4.3 向容器中添加一个失败策略组件

@Bean

public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){

// error为路由键

return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");

}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:SpringCloud之@FeignClient()注解的使用方式
下一篇:关于springboot配置druid数据源不生效问题(踩坑记)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~