SpringBoot+RabbitMQ实现消息可靠传输详解

网友投稿 804 2022-10-04

SpringBoot+RabbitMQ实现消息可靠传输详解

SpringBoot+RabbitMQ实现消息可靠传输详解

目录环境配置消息丢失分析生产阶段生产端模拟消息丢失RabbitMQ消费端

环境配置

SpringBoot 整合 RabbitMQ 实现消息的发送。

1.添加 maven 依赖

org.springframework.boot

spring-boot-starter

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter-amqp

2.添加 application.yml 配置文件

spring:

rabbitmq:

host: 192.168.3.19

port: 5672

username: admin

password: xxxx

3.配置交换机、队列以及绑定

@Bean

public DirectExchange myExchange() {

DirectExchange directExchange = new DirectExchange("myExchange");

return directExchange;

}

@Bean

public Queue myQueue() {

Queue queue = new Queue("myQueue");

return queue;

}

@Bean

public Binding binding() {

return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey");

}

4.生产发送消息

@Autowired

private RabbitTemplate rabbitTemplate;

@GetMapping("/send")

public String send(String message) {

rabbitTemplate.convertAndSend("myExchange","myRoutingKey",message);

System.out.println("【发送消息】" + message)

return "【send message】" + message;

}

5.消费者接收消息

@RabbitListener(queuesToDeclare = @Queue("myQueue"))

public void process(String msg, Channel channel, Message message) {

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

Date date = new Date();

String time = sdf.format(date);

System.out.println("【接收信息】" + msg + " 当前时间" + time);

6.调用生产端发送消息 hello,控制台输出:

【发送消息】hello【接收信息】hello 当前时间2022-05-12 10:21:14

说明消息已经被成功接收。

消息丢失分析

一条消息的从生产到消费,消息丢失可能发生在以下几个阶段:

生产端丢失: 生产者无法传输到 RabbitMQ存储端丢失: RabbitMQ 存储自身挂了消费端丢失:存储由于网络问题,无法发送到消费端,或者消费挂了,无法发送正常消费

RabbitMQ 从生产端、储存端、消费端都对可靠性传输做很好的支持。

生产阶段

生产阶段通过请求确认机制,来确保消息的可靠传输。当发送消息到 RabbitMQ 服务器 之后,RabbitMQ 收到消息之后,给发送返回一个请求确认,表示RabbitMQ 服务器已成功的接收到了消息。

配置application.yml

spring:

rabbitmq:

# 消息确认机制 生产者 -> 交换机

publisher-confirms: true

# 消息返回机制 交换机 -> 队列

publisher-returns: true

配置

@Configuration

@Slf4j

public class RabbitConfig {

@Autowired

private CpXzLGTbMuonnectionFactory connectionFactory;

@Bean

public RabbitTemplate rabbitTemplate() {

RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

log.info("【correlationData】:" + correlationData);

log.info("【ack】" + ack);

log.info("【cause】" + cause);

if (ack) {

log.info("【发送成功】");

} else {

log.info("【发送失败】correlationData:" + correlationData + " cause:" + cause);

}

}

});

rabbitTemplate.setMandatory(true);

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

@Override

public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

log.warn("【消息发送失败】");

log.info("【message】" + message);

log.info("【replyCode】" + replyCode);

}

});

return rabbitTemplate;

}

}

消息从 生产者 到 交换机, 有confirmCallback 确认模式。发送消息成功后消息会调用方法confirm(CorrelationData correlationData, boolean ack, String cause),根据 ack 判断消息是否发送成功。

消息从 交换机 到 队列,有returnCallback 退回模式。

发送消息 product message 控制台输出如下:

【发送消息】product message【接收信息】product message 当前时间2022-05-12 11:27:56【correlationData】:null【ack】true【cause】null【发送成功】

生产端模拟消息丢失

这里有两个方案:

发送消息后立马关闭 broke,后者把网络关闭,但是broker关闭之后控制台一直就会报错,发送消息也报500错误。发送不存在的交换机:

// myExchange 修改成 myExchangexxxxx

rabbitTemplate.convertAndSend("myExchangexxxxx","myRoutingKey",message);

结果:

【correlationData】:null【ack】false【cause】channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange 'myExchangexxxxx' in vhost '/', class-id=60, method-id=40)【发送失败】

当发送失败可以对消息进行重试

交换机正确,发送不存在的队列:

交换机接收到消息,返回成功通知,控制台输出:

【correlationData】:CorrelationData [id=7d468b47-b422-4523-b2a2-06b14aef073c]【ack】true【cause】null【发送成功】

交换机没有找到队列,返回失败信息:

【消息发送失败】【message】product message【replyCode】312

RabbitMQ

开启队列持久化,创建的队列和交换机默认配置是持久化的。首先把队列和交换机设置正确,修改消费监听的队列,使得消息存放在队列里。

修改队列的持久化,修改成非持久化:

@Bean

public Queue myQueue() {

Queue queue = new Queue("myQueue",false);

return queue;

}

发送消息之后,消息存放在队列中,然后重启 RabbitMQ,消息不存在了。设置队列持久化:

@Bean

public Queue myQueue() {

Queue queue = new Queue("myQueue",true);

return queue;

}

重启之后,队列的消息还存在。

消费端

消费端默认开始 ack 自动确认模式,当队列消息被消费者接收,不管有没有被消费端消息,都自动删除队列中的消息。所以为了确保消费端能成功消费消息,将自动模式改成手动确认模式:

修改application.yml 文件

spring:

rabbitmq:

# 手动消息确认

listener:

simple:

acknowledge-mode: manual

消费接收消息之后需要手动确认:

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

@RabbitListener(queuesToDeclare = @Queue("myQueue"))

public void process(String msg, Channel channel, Message message) {

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

Date date = new Date();

String time = sdf.format(date);

System.out.println("【接收信息】" + msg + " 当前时间" + time);

System.out.println(message.getMessageProperties().getDeliveryTag());

try {

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

} catch (IOException e) {

e.printStackTrace();

}

}

如果不添加:

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

发送两条消息

消息被接收后,没有确认,重新放到队列中:

重启项目,之后,队列的消息会发送到消费者,但是没有 ack 确认,还是继续会放回队列中。

加上 channel.basicAck 之后pXzLGTbMu,再重启项目

队列消息就被删除了

basicAck 方法最后一个参数 multiple 表示是删除之前的队列。

multiple 设置成 true,把后面的队列都清理掉了

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:小程序开发流程是什么?(怎么开发小程序微信小程序开发流程)
下一篇:Windows 下编译 OpenSSL
相关文章

 发表评论

暂时没有评论,来抢沙发吧~