SpringBoot整合RabbitMQ消息队列的完整步骤

网友投稿 521 2023-01-23

SpringBoot整合RabbitMQ消息队列的完整步骤

SpringBoot整合RabbitMQ消息队列的完整步骤

SpringBoot整合RabbitMQ

主要实现RabbitMQ以下三种消息队列:

简单消息队列(演示direct模式)

基于RabbitMQ特性的延时消息队列

基于RabbitMQ相关插件的延时消息队列

公共资源

1. 引入pom依赖

org.springframework.boot

spring-boot-starter-amqp

2. 配置yml文件

基于上篇《RabbitMQ安装与配置》实现的情况下,进行基础配置。

spring:

rabbitmq:

host: 121.5.168.31

port: 5672 # 默认可省略

virtual-host: /*** # 虚拟主机

username: *** # 用户名

password: *** # 用户密码

# 开启投递成功回调 P -> Exchange

publisher-confirm-type: correlated

# 开启投递消息到队列失败回调 Exchange -> Queue

publisher-returns: true

# 开启手动ACK确认模式 Queue -> C

listener:

simple:

acknowledge-mode: manual # 代表手动ACK确认

# 一些基本参数的设置

concurrency: 3

prefetch: 15

retry:

enabled: true

max-attempts: 5

max-concurrency: 10

3. 公共Constants类

/**

* @author Mr.Horse

* @version 1.0

* @description: {description}

* @date 2021/4/23 15:28

*/

public class Constants {

/**

* 第一个配置Queue,Exchange,Key(非注解方式)

*/

public final static String HORSE_SIMPLE_QUEUE = "HORSE_SIMPLE_QUEUE";

public final static String HORSE_SIMPLE_EXCHANGE = "HORSE_SIMPLE_EXCHANGE";

public final static String HORSE_SIMPLE_KEY = "HORSE_SIMPLE_KEY";

/**

* 第二个配置Queue,Exchange,Key(注解方式)

*/

public final static String HORSE_ANNOTATION_QUEUE = "HORSE_ANNOTATION_QUEUE";

public final static String HORSE_ANNOTATION_EXCHANGE = "HORSE_ANNOTATION_EXCHANGE";

public final static String HORSE_ANNOTATION_KEY = "HORSE_ANNOTATION_KEY";

//************************************延时消息队列配置信息**************************

/**

* 延时队列信息配置

*/

public final static String HORSE_DELAY_EXCHANGE = "HORSE_DELAY_EXCHANGE";

public final static String HORSE_DELAY_QUEUE = "HORSE_DELAY_QUEUE";

public final static String HORSE_DELAY_KEY = "HORSE_DELAY_KEY";

/**

* 死信队列

*/

public final static String HORSE_DEAD_EXCHANGE = "HORSE_DEAD_EXCHANGE";

public final static String HORSE_DEAD_QUEUE = "HORSE_DEAD_QUEUE";

public final static String HORSE_DEAD_KEY = "HORSE_DEAD_KEY";

//**************************************延时消息队列配置信息(插件版)******************************

/**

* 新延时队列信息配置

*/

public final static String HORSE_PLUGIN_EXCHANGE = "HORSE_PLUGIN_EXCHANGE";

public final static String HORSE_PLUGIN_QUEUE = "HORSE_PLUGIN_QUEUE";

public final static String HORSE_PLUGIN_KEY = "HORSE_PLUGIN_KEY";

}

简单消息队列(direct模式)

4. RabbitTemplate模板配置

主要定义消息投递Exchange成功回调函数和消息从Exchange投递到消息队列失败的回调函数。

package com-sun.rabbit;

import com.sun.org.apache.xpath.internal.operations.Bool;

import com-sun.constants.Constants;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.amqp.core.*;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.amqp.support.converter.Jackson2jsonMessageConverter;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

* @author Mr.Horse

* @version 1.0

* @description: {description}

* @date 2021/4/23 14:17

*/

@Configuration

public class RabbitConfig {

private static Logger logger = LoggerFactory.getLogger(RabbitConfig.class);

@Autowired

private CachingConnectionFactory connectionFactory;

/**

* @return

*/

@Bean

public RabbitTemplate rabbitTemplate() {

RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

// 触发setReturnCallback回调必须设置mandatory=true,否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调

rabbitTemplate.setMandatory(Boolean.TRUE);

// 设置序列化机制

rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

// 消息由投递到Exchange中时触发的回调

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) ->

logger.info("消息发送到Exchange情况反馈:唯一标识:correlationData={},消息确认:ack={},原因:cause={}",

correlationData, ack, cause)

);

// 消息由Exchange发送到Queue时失败触发的回调

rabbitTemplate.setReturnsCallback((returnedMessage) -> {

// 如果是插件形式实现的延时队列,则直接返回

// 原因: 因为发送方确实没有投递到队列上,只是在交换器上暂存,等过期时间到了 才会发往队列,从而实现延时队列的操作

if (Constants.HORSE_PLUGIN_EXCHANGE.equals(returnedMessage.getExchange())) {

return;

}

logger.warn("消息由Exchange发送到Queue时失败:message={},replyCode={},replyText={},exchange={},rountingKey={}",

returnedMessage.getMessage(), returnedMessage.getReplyText(), returnedMessage.getReplyText(),

returnedMessage.getExchange(), returnedMessage.getRoutingKey());

});

return rabbitTemplate;

}

//*******************************************直接配置绑定关系*****************************************

/**

* 声明队列

*

* @return

*/

@Bean

public Queue horseQueue() {

return new Queue(Constants.HORSE_SIMPLE_QUEUE, Boolean.TRUE);

}

/**

* 声明指定模式交换机

*

* @return

*/

@Bean

public DirectExchange horseExchange() {

return new DirectExchange(Constants.HORSE_SIMPLE_EXCHANGE, Boolean.TRUE, Boolean.FALSE);

}

/**

* 绑定交换机,队列,路由Key

*

* @return

*/

@Bean

public Binding horseBinding() {

return BindingBuilder.bind(horseQueue()).to(horseExchange()).with(Constants.HORSE_SIMPLE_KEY);

}

}

5. 定义消息-

基于 @RabbitListenerzi注解,实现自定义消息-。主要有两种实现方式:

如果在配置类中声明了Queue、Excehange以及他们直接的绑定,这里直接指定队列进行消息监听

如果前面什么也没做,这里可以直接用注解的方式进行绑定实现消息监听

package com-sun.rabbit;

import com.rabbitmq.client.Channel;

import com-sun.constants.Constants;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.Exchange;

import org.springframework.amqp.rabbit.annotation.Queue;

import org.springframework.amqp.rabbit.annotation.QueueBinding;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

import java.io.IOException;

/**

* @author Mr.Horse

* @version 1.0

* @description: {description}

* @date 2021/4/23 14:58

*/

@Component

public class MsgListener {

private static Logger logger = LoggerFactory.getLogger(MsgListener.class);

/**

* 配置类中已经完成绑定,这里直接根据队列值接收

*

* @param message

* @param channel

* @param msg

*/

@RabbitListenerzi(queues = Constants.HORSE_SIMPLE_QUEUE)

public void customListener(Message message, Channel channel, String msg) {

// 获取每条消息唯一标识(用于手动ACK确认)

long tag = message.getMessageProperties().getDeliveryTag();

try {

logger.info(" ==> customListener接收" + msg);

// 手动ACK确认

channel.basicAck(tag, false);

} catch (IOException e) {

logger.error(" ==> 消息接收失败: {}", tag);

}

}

/**

* 根据注解的形式进行绑定接收

*

* @param message

* @param channel

* @param msg

*/

@RabbitListener(bindings = @QueueBinding(

value = @Queue(value = Constants.HORSE_ANNOTATION_QUEUE, durable = "true"),

exchange = @Exchange(value = Constants.HORSE_ANNOTATION_EXCHANGE, ignoreDeclarationExceptions = "true"),

key = {Constants.HORSE_ANNOTATION_KEY}

))

public void annotationListener(Message message, Channel channel, String msg) {

// 获取每条消息唯一标识(用于手动ACK确认)

long tag = message.getMessageProperties().getDeliveryTag();

try {

logger.info(" ==> annotationListener接收" + msg);

// 手动ACK确认

channel.basicAck(tag, false);

} catch (IOException e) {

logger.error(" ==> 消息接收失败: {}", tag);

}

}

}

6. 测试接口

这里发送100条消息:

奇数条到非注解方式的消息-

偶数条到注解式消息-

@GetMapping("/rabbit")

public void sendMsg() {

for (int i = 1; i <= 100; i++) {

String msg = "第" + i + "条消息";

logger.info("==> 发送" + msg);

if (i % 2 == 1) {

rabbitTemplate.convertAndSend(Constants.HORSE_SIMPLE_EXCHANGE, Constants.HORSE_SIMPLE_KEY, msg, new CorrelationData(String.valueOf(i)));

} else {

rabbitTemplate.convertAndSend(Constants.HORSE_ANNOTATION_EXCHANGE, Constants.HORSE_ANNOTATION_KEY, msg, new CorrelationData(String.valueOf(i)));

}

}

}

结果:自行测试过,非常成功:smile::smile::smile:

延时消息队列

原理:生产者生产一条延时消息,根据需要延时时间的不同,利用不同的routingkey将消息路由到不同的延时队列,每个队列都设置了不同的TTL属性,并绑定在同一个死信交换机中,消息过期后,根据routingkey的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行处理即可。

7. 配置绑定相关信息

/**

* @author Mr.Horse

* @version 1.0

* @description: {description}

* @date 2021/4/24 14:22

*/

@Configuration

public class DelayRabbitConfig {

private static Logger logger = LoggerFactory.getLogger(DelayRabbitConfig.class);

/**

* 声明延时队列交换机

*

* @return

*/

@Bean

public DirectExchange delayExchange() {

return new DirectExchange(Constants.HORSE_DELAY_EXCHANGE, Boolean.TRUE, Boolean.FALSE);

}

/**

* 声明死信队列交换机

*

* @return

*/

@Bean

public DirectExchange deadExchange() {

return new DirectExchange(Constants.HORSE_DEAD_EXCHANGE, Boolean.TRUE, Boolean.FALSE);

}

/**

* 声明延时队列 延时10s(单位:ms),并将延时队列绑定到对应的死信交换机和路由Key

*

* @return

*/

@Bean

public Queue delayQueue() {

Map args = new HashMap<>(3);

// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机

args.put("x-dead-letter-exchange", Constants.HORSE_DEAD_EXCHANGE);

// x-dead-letter-routing-key 这里声明当前队列的死信路由key

args.put("x-dead-letter-routing-key", Constants.HORSE_DEAD_KEY);

// x-message-ttl 声明队列的TTL(过期时间)

// 可以在这里直接写死,也可以进行动态的设置(推荐动态设置)

// args.put("x-message-ttl", 10000);

return QueueBuilder.durable(Constants.HORSE_DELAY_QUEUE).withArguments(args).build();

}

/**

* 声明死信队列

*

* @return

*/

@Bean

public Queue deadQueue() {

return new Queue(Constants.HORSE_DEAD_QUEUE, Boolean.TRUE);

}

/**

* 延时队列绑定管理

*

* @return

*/

@Bean

public Binding delayBinding() {

return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(Constants.HORSE_DELAY_KEY);

}

/**

* 死信队列绑定管理

*

* @return

*/

@Bean

public Binding deadBinding() {

return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(Constants.HORSE_DEAD_KEY);

}

//**********************************延时消息队列配置信息(插件版)************************************

@Bean

public Queue pluginQueue() {

return new Queue(Constants.HORSE_PLUGIN_QUEUE);

}

/**

* 设置延时队列的交换机,必须是 CustomExchange 类型交换机

* 参数必须,不能改变

* @return

*/

@Bean

public CustomExchange customPluginExchange() {

Map args = new HashMap<>(2);

args.put("x-delayed-type", "direct");

return new CustomExchange(Constants.HORSE_PLUGIN_EXCHANGE, "x-delayed-message", Boolean.TRUE, Boolean.FALSE, args);

}

@Bean

public Binding pluginBinding() {

return BindingBuilder.bind(pluginQueue()).to(customPluginExchange()).with(Constants.HORSE_PLUGIN_KEY).noargs();

}

}

8. 定义延时-

/**

* @author Mr.Horse

* @version 1.0

* @description: {description}

* @date 2021/4/24 14:51

*/

@Component

public class DelayMsgListener {

private static Logger logger = LoggerFactory.getLogger(DelayMsgListener.class);

/**

* 监听死信队列

*

* @param message

* @param channel

* @param msg

*/

@RabbitListener(queues = Constants.HORSE_DEAD_QUEUE)

public void consumeDeadListener(Message message, Channel channel, String msg) {

long tag = message.getMessageProperties().getDeliveryTag();

try {

logger.info(" ==> consumeDeadListener接收" + msg);

// 手动ACK确认

channel.http://basicAck(tag, false);

} catch (IOException e) {

logger.error(" ==> 消息接收失败: {}", tag);

}

}

/**

* 监听延时队列(插件版)

*

* @param message

* @param channel

* @param msg

*/

@RabbitListener(queues = Constants.HORSE_PLUGIN_QUEUE)

public void consumePluginListener(Message message, Channel channel, String msg) {

long tag = message.getMessageProperties().getDeliveryTag();

try {

logger.info(" ==> consumePluginListener" + msg);

// 手动ACK确认

channel.basicAck(tag, false);

} catch (IOException e) {

logger.error(" ==> 消息接收失败: {}", tag);

}

}

}

9. 测试接口

// 基于特性的延时队列

@GetMapping("/delay/rabbit")

public void delayMsg(@RequestParam("expire") Long expire) {

for (int i = 1; i <= 10; i++) {

String msg = "第" + i + "条消息";

logger.info("==> 发送" + msg);

// 这里可以动态的设置过期时间

rabbitTemplate.convertAndSend(Constants.HORSE_DELAY_EXCHANGE, Constants.HORSE_DELAY_KEY, msg,

message -> {

message.getMessageProperties().setExpiration(String.valueOf(expire));

return message;

},

new CorrelationData(String.valueOf(i)));

}

}

// 基于插件的延时队列

@GetMapping("/delay/plugin")

public void delayPluginMsg(@RequestParam("expire") Integer expire) {

for (int i = 1; i <= 10; i++) {

String msg = "第" + i + "条消息";

logger.info("==> 发送" + msg);

// 动态设置过期时间

rabbitTemplate.convertAndSend(Constants.HORSE_PLUGIN_EXCHANGE, Constants.HORSE_PLUGIN_KEY, msg, message -> {

message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);

message.getMessageProperties().setDelay(expire);

return message;

}, new CorrelationData(String.valueOf(i)));

}

}

结果:你懂的:scream_cat::scream_cat::scream_cat:

RabbitMQ的基础使用演示到此结束。

总结

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:前端混合app开发框架(前端混合app开发框架图)
下一篇:优质桌面应用下载软件安全吗(优质桌面应用下载软件安全吗)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~