聊聊RabbitMQ发布确认高级问题

网友投稿 506 2022-11-10

聊聊RabbitMQ发布确认高级问题

聊聊RabbitMQ发布确认高级问题

目录1、发布确认高级1.1、发布确认SpringBoot版本1.1.1、确认机制方案1.1.2、代码架构图1.1.3、配置文件1.1.4、配置类1.1.5、回调接口1.1.6、生产者1.1.7、消费者1.1.8、测试结果1.2、回退消息1.2.1、Mandatory参数1.2.2、配置文件1.2.3、生产者代码1.2.4、回调接口代码1.2.5、测试结果1.3、备份交换机1.3.1、代码架构图1.3.2、配置类代码1.3.3、消费者代码1.3.4、测试结果

1、发布确认高级

1. 存在的问题

再生产环境中由于一些不明原因导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,会导致消息丢失。

1.1、发布确认SpringBoot版本

1.1.1、确认机制方案

当消息不能正常被接收的时候,我们需要将消息存放在缓存中。

1.1.2、代码架构图

1.1.3、配置文件

spring.rabbitmq.host=192.168.123.129

spring.rabbitmq.port=5672

spring.rabbitmq.username=admin

spring.rabbitmq.password=123

spring.rabbitmq.publisher-confirm-type=correlated

NONE:禁用发布确认模式,是默认值。CORRELATED:发布消息成功到交换机会触发回调方方法。CORRELATED:就是发布一个就确认一个。

1.1.4、配置类

import org.springframework.amqp.core.*;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class ConfirmConfig {

public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";

public static final String CONFIRM_QUEUE_NAME = "confirm_queue";

public static final String CONFIRM_ROUTING_KEY = "key1";

@Bean("confirmExchange")

public DirectExchange confirmExchange(){

return new DirectExchange(CONFIRM_EXCHANGE_NAME);

}

@Bean("confirmQueue")

public Queue confirmQueue(){

return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();

}

@Bean

public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange,

@Qualifier("confirmQueue") Queue confirmQueue){

return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);

}

}

1.1.5、回调接口

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**

* 回调接口

*/

@Component

@Slf4j

public class MyCallBack implements RabbitTemplate.ConfirmCallback {

@Autowired

RabbitTemplate rabbitTemplate;

@PostConstruct

public void init(){

rabbitTemplate.setConfirmCallback(this);

}

/**

* 交换机接受失败后进行回调

* 1. 保存消息的ID及相关消息

* 2. 是否接收成功

* 3. 接受失败的原因

* @param correlationData

* @param b

* @param s

*/

@Override

public void confirm(CorrelationData correlationData, boolean b, String s) {

String id = correlationData != null ? correlationData.getId() : "";

if(b == true){

log.info("交换机已经收到id为:{}的消息",id);

}else{

log.info("交换机还未收到id为:{}消息,由于原因:{}",id,s);

}

}

}

1.1.6、生产者

import com.xiao.springbootrabbitmq.utils.MyCallBack;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotatsPSThZjfion.GetMapping;

import org.springframework.web.bind.annotation.PathVariable;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;

@RestController

@RequestMapping("/confirm")

@Slf4j

public class Producer {

public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";

@Autowired

RabbitTemplate rabbitTemplate;

@GetMapping("/sendMessage/{message}")

public void sendMessage(@PathVariable String message){

CorrelationData correlationData1 = new CorrelationData("1");

String routingKey1 = "key1";

rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey1,message + routingKey1,correlationData1);

CorrelationData correlationData2 = new CorrelationData("2");

String routingKey2 = "key2";

rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message + routingKey2,correlationData2);

log.info("发送得内容是:{}",message);

}

}

1.1.7、消费者

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

@Slf4j

public class ConfirmConsumer {

public static final String CONFIRM_QUEUE_NAME = "confirm_queue";

@RabbitListener(queues = CONFIRM_QUEUE_NAME)

public void receiveMessage(Message message){

String msg = new String(message.getBody());

log.info("接收到队列" + CONFIRM_QUEUE_NAME + "消息:{}",msg);

}

}

1.1.8、测试结果

1. 第一种情况

ID为1的消息正常送达,ID为2的消息由于RoutingKey的错误,导致不能正常被消费,但是交换机还是正常收到了消息,所以此时由于交换机正常接收之后的原因丢失的消息不能正常被接收。

2. 第二种情况

我们再上一种情况下修改了ID为1的消息的交换机的名称,所以此时回调函数会进行回答由于什么原因导致交换机无法接收成功消息。

1.2、回退消息

1.2.1、Mandatory参数

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由(就是消息被交换机成功接收后,无法到达队列),那么消息会直接被丢弃,此时生产者是不知道消息被丢弃这个事件的。通过设置该参数可以在消息传递过程中不可达目的地时将消息返回给生产者。

1.2.2、配置文件

spring.rabbitmq.publisher-returns=true

需要在配置文件种开启返回回调

1.2.3、生产者代码

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PathVariable;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;

@RestController

@RequestMapping("/confirm")

@Slf4j

public class Producer {

public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";

@Autowired

RabbitTemplate rabbitTemplate;

@GetMapping("/sendMesshttp://age/{message}")

public void sendMessage(@PathVariable String message){

CorrelationData correlationData1 = new CorrelationData("1");

String routingKey1 = "key1";

rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey1,message + routingKey1,correlationData1);

log.info("发送得内容是:{}",message + routingKey1);

CorrelationData correlationData2 = new CorrelationData("2");

String routingKey2 = "key2";

rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message + routingKey2,correlationData2);

log.info("发送得内容是:{}",message + routingKey2);

}

}

1.2.4、回调接口代码

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.ReturnedMessage;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**

* 回调接口

*/

@Component

@Slf4j

public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {

@Autowired

RabbitTemplate rabbitTemplate;

@PostConstruct

public void init(){

rabbitTemplate.setConfirmCallback(this);

rabbitTemplate.setReturnsCallback(this);

}

/**

* 交换机接受失败后进行回调

* 1. 保存消息的ID及相关消息

* 2. 是否接收成功

* 3. 接受失败的原因

* @param correlationData

* @param b

* @param s

*/

@Override

public void confirm(CorrelationData correlationData, boolean b, String s) {

String id = correlationData != null ? correlationData.getId() : "";

if(b == true){

log.info("交换机已经收到id为:{}的消http://息",id);

}else{

log.info("交换机还未收到id为:{}消息,由于原因:{}",id,s);

}

}

@Override

public void returnedMessage(ReturnedMessage returnedMessage) {

Message message = returnedMessage.getMessage();

String exchange = returnedMessage.getExchange();

String routingKey = returnedMessage.getRoutingKey();

String replyText = returnedMessage.getReplyText();

log.error("消息{},被交换机{}退回,回退原因:{},路由Key:{}",new String(message.getBody()),exchange,replyText,routingKey);

}

}

1.2.5、测试结果

其他类的代码与上一小节案例相同

ID为2的消息由于RoutingKey不可路由,但是还是被回调函数处理了。

1.3、备份交换机

1.3.1、代码架构图

这里我们新增了备份交换机、备份队列、报警队列。它们绑定关系如图所示。如果确认交换机成功接收的消息无法路由到相应的队列,就会被确认交换机发送给备份交换机。

1.3.2、配置类代码

import org.springframework.amqp.core.*;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class ConfirmConfig {

public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";

public static final String CONFIRM_QUEUE_NAME = "confirm_queue";

public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";

public static final String BACKUP_QUEUE_NAME = "backup_queue";

public static final String WARNING_QUEUE_NAME = "warning_queue";

public static final String CONFIRM_ROUTING_KEY = "key1";

@Bean("confirmExchange")

public DirectExchange confirmExchange(){

return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)

.withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build();

}

@Bean("confirmQueue")

public Queue confirmQueue(){

return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();

}

@Bean("backupExchange")

public FanoutExchange backupExchange(){

return new FanoutExchange(BACKUP_EXCHANGE_NAME);

}

@Bean("backupQueue")

public Queue backupQueue(){

return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();

}

@Bean("warningQueue")

public Queue warningQueue(){

return QueueBuilder.durable(WARNING_QUEUE_NAME).build();

}

@Bean

public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange,

@Qualifier("confirmQueue") Queue confirmQueue){

return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);

}

@Bean

public Binding queueBindingExchange1(@Qualifier("backupExchange") FanoutExchange backupExchange,

@QsPSThZjfualifier("backupQueue") Queue backupQueue){

return BindingBuilder.bind(backupQueue).to(backupExchange);

}

@Bean

public Binding queueBindingExchange2(@Qualifier("backupExchange") FanoutExchange backupExchange,

@Qualifier("warningQueue") Queue warningQueue){

return BindingBuilder.bind(warningQueue).to(backupExchange);

}

}

1.3.3、消费者代码

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

@Slf4j

public class WarningConsumer {

public static final String WARNING_QUEUE_NAME = "warning_queue";

@RabbitListener(queues = WARNING_QUEUE_NAME)

public void receiveMessage(Message message){

String msg = new String(message.getBody());

log.info("报警发现不可路由的消息内容为:{}",msg);

}

}

1.3.4、测试结果

mandatory参数与备份交换机可以一起使用的时候,如果两者同时开启,备份交换机优先级高。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Linux命令的组成规则及其全拼单词
下一篇:编写一个函数rev用于将一个整数前后倒置。
相关文章

 发表评论

暂时没有评论,来抢沙发吧~