SpringBoot整合RabbitMQ实战教程附死信交换机

网友投稿 970 2022-09-29

SpringBoot整合RabbitMQ实战教程附死信交换机

SpringBoot整合RabbitMQ实战教程附死信交换机

目录前言环境配置配置文件业务消费者死信消费者测试

前言

使用springboot,实现以下功能,有两个队列1、2,往里面发送消息,如果处理失败发生异常,可以重试3次,重试3次均失败,那么就将消息发送到死信队列进行统一处理,例如记录数据库、报警等完整demo项目代码https://gitee.com/daenmax/rabbit-mq-demo

环境

Windows10,IDEA,otp_win64_25.0,rabbitmq-server-3.10.41.双击C:\Program Files\RabbitMQ Server\rabbitmq_server-3.10.4\sbin\rabbitmq-server.bat启动MQ服务2.然后访问http://localhost:15672/,默认账号密码均为guest,3.手动添加一个虚拟主机为admin_host,手动创建一个用户账号密码均为admin

pom.xml

org.springframework.boot

spring-boot-starter-amqp

2.7.0

配置

spring:

rabbitmq:

host: 127.0.0.1

port: 5672

username: admin

password: admin

virtual-host: admin_host

publisher-confirm-type: correlated

publisher-returns: true

listener:

simple:

acknowledge-mode: manual

retry:

enabled: true #开启失败重试

max-attempts: 3 #最大重试次数

initial-interval: 1000 #重试间隔时间 毫秒

配置文件

RabbitConfig

package com.example.rabitmqdemo.mydemo.config;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.*;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.stereotype.Component;

import java.util.HashMap;

import java.util.Map;

/**

* Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,

* Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

* Queue:消息的载体,每个消息都会被投到一个或多个队列。

* Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.

* Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

* vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。

* Producer:消息生产者,就是投递消息的程序.

* Consumer:消息消费者,就是接受消息的程序.

* Channel:消息通道,在客户端的每个连接里,可建立多个channel.

*/

@Slf4j

@Component

public class RabbitConfig {

//业务交换机

public static final String EXCHANGE_PHCP = "phcp";

//业务队列1

public static final String QUEUE_COMPANY = "company";

//业务队列1的key

public static final String ROUTINGKEY_COMPANY = "companyKey";

//业务队列2

public static final String QUEUE_PROJECT = "project";

//业务队列2的key

public static final String ROUTINGKEY_PROJECT = "projectKey";

//死信交换机

public static final String EXCHANGE_PHCP_DEAD = "phcp_dead";

//死信队列1

public static final String QUEUE_COMPANY_DEAD = "company_dead";

//死信队列2

public static final String QUEUE_PROJECT_DEAD = "project_dead";

//死信队列1的key

public static final String ROUTINGKEY_COMPANY_DEAD = "companyKey_dead";

//死信队列2的key

public static final String ROUTINGKEY_PROJECT_DEAD = "projectKey_dead";

// /**

// * 解决重复确认报错问题,如果没有报错的话,就不用启用这个

// *

// * @param connectionFactory

// * @return

// */

// @Bean

// public RabbitListenerContainerFactory> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {

// SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

// factory.setConnectionFactory(connectionFactory);

// factory.setMessageConverter(new Jackson2jsonMessageConverter());

// factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

// return factory;

// }

/**

* 声明业务交换机

* 1. 设置交换机类型

* 2. 将队列绑定到交换机

* FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念

* HeadersExchange :通过添加属性key-value匹配

* DirectExchange:按照routingkey分发到指定队列

* TopicExchange:多关键字匹配

*/

@Bean("exchangePhcp")

public DirectExchange exchangePhcp() {

return new DirectExchange(EXCHANGE_PHCP);

}

* 声明死信交换机

@Bean("exchangePhcpDead")

public DirectExchange exchangePhcpDead() {

return new DirectExchange(EXCHANGE_PHCP_DEAD);

* 声明业务队列1

*

* @return

@Bean("queueCompany")

public Queue queueCompany() {

Map arguments = new HashMap<>(2);

arguments.put("x-dead-letter-exchange",EXCHANGE_PHCP_DEAD);

//绑定该队列到死信交换机的队列1

arguments.put("x-dead-letter-routing-key",ROUTINGKEY_COMPANY_DEAD);

return QueueBuilder.durable(QUEUE_COMPANY).withArguments(arguments).build();

* 声明业务队列2

@Bean("queueProject")

public Queue queueProject() {

//绑定该队列到死信交换机的队列2

arguments.put("x-dead-letter-routing-key",ROUTINGKEY_PROJECT_DEAD);

return QueueBuilder.durable(QUEUE_PROJECT).withArguments(arguments).build();

* 声明死信队列1

@Bean("queueCompanyDead")

public Queue queueCompanyDead() {

return new Queue(QUEUE_COMPANY_DEAD);

* 声明死信队列2

@Bean("queueProjectDead")

public Queue queueProjectDead() {

return new Queue(QUEUE_PROJECT_DEAD);

* 绑定业务队列1和业务交换机

* @param queue

* @param directExchange

@Bean

public Binding bindingQueueCompany(@Qualifier("queueCompany") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) {

return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY);

* 绑定业务队列2和业务交换机

public Binding bindingQueueProject(@Qualifier("queueProject") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) {

return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT);

* 绑定死信队列1和死信交换机

public Binding bindingQueueCompanyDead(@Qualifier("queueCompanyDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) {

return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY_DEAD);

* 绑定死信队列2和死信交换机

public Binding bindingQueueProjectDead(@Qualifier("queueProjectDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) {

return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT_DEAD);

}

生产者

RabbltProducer

package com.example.rabitmqdemo.mydemo.producer;

import com.example.rabitmqdemo.mydemo.config.RabbitConfig;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.*;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

import javax.annotation.Resource;

import java.nio.charset.StandardCharsets;

import java.util.UUID;

@Component

@Slf4j

public class RabbltProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{

@Resource

private RabbitTemplate rabbitTemplate;

/**

* 初始化消息确认函数

*/

@PostConstruct

public void init() {

rabbitTemplate.setConfirmCallback(this);

rabbitTemplate.setReturnsCallback(this);

rabbitTemplate.setMandatory(true);

}

/**

* 发送消息服务器确认函数

* @param correlationData

* @param ack

* @param cause

*/

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

if (ack) {

System.out.println("消息发送成功" + correlationData);

} else {

System.out.println("消息发送失败:" + cause);

}

}

/**

* 消息发送失败,消息回调函数

* @param returnedMessage

*/

@Override

public void returnedMessage(ReturnedMessage returnedMessage) {

String str = new String(returnedMessage.getMessage().getBody());

System.out.println("消息发送失败:" + str);

}

/**

* 处理消息发送到队列1

* @param str

*/

public void sendCompany(String str){

MessageProperties messageProperties = new MessageProperties();

messageProperties.setContentType("application/json");

Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,message,correlationData);

//也可以用下面的方式

//CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

//this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,str,correlationData);

}

/**

* 处理消息发送到队列2

* @param str

*/

public void sendProject(String str){

MessageProperties messageProperties = new MessageProperties();

messageProperties.setContentType("application/json");

Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,message,correlationData);

//也可以用下面的方式

//CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

//this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,str,correlationData);

}

}

业务消费者

RabbitConsumer

package com.example.rabitmqdemo.mydemo.consumer;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

import java.io.IOException;

/**

* 监听业务交换机

* @author JeWang

*/

@Component

@Slf4j

public class RabbitConsumer {

/**

* 监听业务队列1

* @param message

* @param channel

* @throws IOException

*/

@RabbitListener(queues = "company")

public void company(Message message, Channel channel) throws IOException {

try{

System.out.println("次数" + message.getMessageProperties().getDeliveryTag());

channel.basicQos(1);

Thread.sleep(2000);

String s = new String(message.getBody());

log.info("处理消息"+s);

//下面两行是尝试手动抛出异常,用来测试重试次数和发送到死信交换机

//String str = null;

//str.split("1");

//处理成功,确认应答

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

}catch (Exception e){

log.error("处理消息时发生异常:"+e.getMessage());

Boolean redelivered = message.getMessageProperties().getRedelivered();

if(redelivered){

log.error("异常重试次数已到达设置次数,将发送到死信交换机");

channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);

}else {

log.error("消息即将返回队列处理重试");

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

}

}

}

/**

* 监听业务队列2

* @param message

* @param channel

* @throws IOException

*/

@RabbitListener(queues = "project")

public void project(Message message, Channel channel) throws IOException {

try{

System.out.println("次数" + message.gehttp://tMessageProperties().getDeliveryTag());

channel.basicQos(1);

Thread.sleep(2000);

String s = new String(message.getBody());

log.info("处理消息"+s);

//下面两行是尝试手动抛出异常,用来测试重试次数和发送到死信交换机

//String str = null;

//str.split("1");

//处理成功,确认应答

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

}catch (Exception e){

log.error("处理消息时发生异常:"+e.getMessage());

Boolean redelivered = message.getMessageProperties().getRedelivered();

if(redelivered){

log.error("异常重试次数已到达设置次数,将发送到死信交换机");

channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);

}else {

log.error("消息即将返回队列处理重试");

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

}

}

}

}

死信消费者

RabbitConsumer

package com.example.rabitmqdemo.mydemo.consumer;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

import java.io.IOException;

/**

* 监听死信交换机

* @author JeWang

*/

@Component

@Slf4j

public class RabbitConsumerDead {

/**

* 处理死信队列1

* @param message

* @param channel

* @throws IOException

*/

@RabbitListener(queues = "company_dead")

public void company_dead(Message message, Channel channel) throws IOException {

try{

channel.basicQos(1);

String s = new String(message.getBody());

log.info("处理死信"+s);

//在此处记录到数据库、报警之类的操作

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

}catch (Exception e){

log.error("接收异常:"+e.getMessage());

}

}

/**

* 处理死信队列2

* @param message

* @param channel

* @throws IOException

*/

http:// @RabbitListener(queues = "project_dead")

public void project_dead(Message message, Channel channel) throws IOException {

try{

channel.basicQos(1);

String s = new String(message.getBody());

log.info("处理死信"+s);

//在此处记录到数据库、报警之类的操作

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

}catch (Exception e){

log.error("接收异常:"+e.getMessage());

}

}

}

测试

MqController

package com.example.rabitmqdemo.mydemo.controller;

import com.example.rabitmqdemo.mydemo.producer.RabbltProducer;

import lombok.extern.slf4j.Slf4j;

import org.springframework.web.bind.annotation.RequestBody;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RequestMapping("/def")

@RestController

@Slf4j

public class MsgController {

@Resource

private RabbltProducer rabbltProducer;

@RequestMapping("/handleCompany")

public void handleCompany(@RequestBody String jsonStr){

rabbltProducer.sendCompany(jsonStr);

}

}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:浅析小程序中text文本组件的使用方法(微信小程序text标签)
下一篇:网站的可扩展架构
相关文章

 发表评论

暂时没有评论,来抢沙发吧~