RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合

网友投稿 898 2022-12-03

RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合

RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合

目录1. 概述2. 场景说明3. 与Springboot的整合3.1 引入依赖3.2 生产服务配置3.3 生产服务代码3.4 消费服务配置3.5 消费服务代码3.6 Rest 测试代码4. 综述

1. 概述

老话说的好:做人要懂得变通,善于思考,有时稍微转个弯,也许问题就解决了。

言归正传,之前我们聊了 RabbitMQ 3.9.7 镜像模式集群的搭建,今天我们来聊聊 RabbitMQ 3.9.7 镜像模式集群与Springboot 2.5.5 整合。

2. 场景说明

服务器A IP:192.168.1.22

服务器B IP:192.168.1.8

服务器C IP:192.168.1.144

此三台服务器上已搭建好了 RabbitMQ镜像模式集群,镜像模式集群的搭建,可参见我的上一篇文章。

3. 与Springboot的整合

3.1 引入依赖

org.springframework.boot

spring-boot-starter-parent

2.5.5

org.spCnIRwxDringframework.boot

spring-boot-starter-amqp

org.spCnIRwxDringframework.boot

spring-boot-starter-amqp

3.2 生产服务配置

spring:

rabbitmq:

addresses: 192.168.1.22:5672,192.168.1.8:5672,192.168.1.144:5672

username: guest

password: guest

virtual-host: /

connection-timeout: 16000

# 启用消息确认模式

publisher-confirm-type: correlated

# 启用 return 消息模式

publisher-returns: true

template:

mandatory: true

3.3 生产服务代码

import org.springframework.amqp.AmqpException;

import org.springframework.amqp.core.MessagePostProcessor;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.messaging.Message;

import org.springframework.messaging.MessageHeaders;

import org.springframework.messaging.support.MessageBuilder;

import org.springframework.stereotype.Component;

impoCnIRwxDrt java.util.Map;

@Component

public class Producer {

@Autowired

private RabbitTemplate rabbitTemplate;

/**

* 确认回调

*/

final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

// correlationData 唯一标识

// ack mq是否收到消息

// cause 失败原因

System.out.println("correlationData:" + correlationData.getId());

System.out.println("ack:" + ack);

System.out.println("cause:" + cause);

}

};

/**

* 发送消息

* @param messageBody 消息体

* @param headers 附加属性

* @throws Exception

*/

public void sendMessage(String messageBody, Map headers, String id) throws Exception {

MessageHeaders messageHeaders = new MessageHeaders(headers);

Message message = MessageBuilder.createMessage(messageBody, messageHeaders);

rabbitTemplate.setConfirmCallback(confirmCallback);

String exchangeName = "exchange-hello";

String routingKey = "test.123";

CorrelationData correlationData = new CorrelationData(id);

rabbitTemplate.convertAndSend(exchangeName, routingKey, message, new MessagePostProcessor() {

/**

* 发送消息后做的事情

* @param message

* @return

* @throws AmqpException

*/

@Override

public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {

return message;

}

}, correlationData);

}

}

3.4 消费服务配置

spring:

rabbitmq:

addresses: 192.168.1.22:5672,192.168.1.8:5672,192.168.1.144:5672

username: guest

password: guest

virtual-host: /

connection-timeout: 16000

listener:

simple:

# 设置为手工ACK

acknowledge-mode: manual

concurrency: 5

prefetch: 1

max-concurrency: 10

3.5 消费服务代码

import com.rabbitmq.client.Channel;

import org.springframework.amqp.rabbit.annotation.*;

import org.springframework.amqp.support.AmqpHeaders;

import org.springframework.messaging.Message;

import org.springframework.stereotype.Component;

@Component

public class Consumer {

@RabbitListener(bindings = @QueueBinding(

value = @Queue(value = "queue-hello", durable = "true"),

exchange = @Exchange(value = "exchange-hello" , durable = "true", type = "topic"),

key = "test.*"

))

@RabbitHandler

public void onMessage(Message message, Channel channel) throws Exception {

System.out.println("收到消息:" + message.getPayload());

Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);

channel.basicAck(deliveryTag, false);

}

}

3.6 Rest 测试代码

@RestController

@RequestMapping("/mq")

public class RabbitmqController {

@Autowired

private Producer producer;

@GetMapping("/sendMessage")

public String sendMessage(@RequestParam String messageBody, @RequestParam String id) throws Excepthttp://ion {

Map headers = new HashMap<>();

producer.sendMessage(messageBody, headers, id);

return "success";

}

}

4. 综述

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:快速入手IntelliJ IDEA基本配置
下一篇:SpringCloud学习笔记之SpringCloud搭建父工程的过程图解
相关文章

 发表评论

暂时没有评论,来抢沙发吧~