SpringACK对RabbitMQ消息的确认(消费)

网友投稿 767 2022-12-01

SpringACK对RabbitMQ消息的确认(消费)

SpringACK对RabbitMQ消息的确认(消费)

SpringAMQP对RabbitMQ消息的确认(消费)

之前已经简单介绍了基本是从发送方去确认的,我们需要在配置文件当中开启发送方确认模式,共育两种,一种是相对于交换机一个是相对于队列。

本次的介绍是基于消费者对消息的确认,也就是基本的逻辑是消费者对消息处理的确认。

基本上生产者这边的代码是不需要去改变的,但是我们需要让消费者去正确的人发送到消息。我们按照什么形式都可以,确认与不确认都可以,因为本次主要是为了测试消费端对消息的处理确认。

首先生产者的配置和相关的代码

spring:# profiles:# active: dev rabbitmq: host: #远程主机外网地址 username: shabi #远程用户名 password: #密码 virtual-host: shabi #虚拟机名称 port: 5672 #远程主机端口名称 publisher-confirm-type: correlated #开启确认模式 publisher-returns: true

package com.jgdabc.rabbitconfig;import com.rabbitmq.client.ConnectionFactory;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.core.RabbitAdmin;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitConfig { //交换机 public static final String Exchange_Name = "boot_rabbit_topic_ee"; public static final String Queue_Name = "boot_rabbit_topic_qqq"; @Bean("bootExchange") //交换机的创建 public Exchange bootExchange() { return ExchangeBuilder-icExchange(Exchange_Name).durable(true).build(); //绑定一个topic类型的交换机,持久化并构建 } @Bean("bootQueue") //队列的创建 public Queue bootQueue() { return QueueBuilder.durable(Queue_Name).build(); }// 队列和交换机的绑定关系// 哪个队列// 哪个交换机// routing key// 这里不写的话会按照方法名注入 @Bean public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs(); }}

package com.jgdabc;import com.jgdabc.rabbitconfig.RabbitConfig;import lombok.extern.slf4j.Slf4j;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.amqp.core.ReturnedMessage;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.context.annotation.ComponentScan;import org.springframework.context.annotation.Import;import org.springframework.test.context.junit4.SpringRunner;import java.util.*;import java.util.stream.IntStream;@Slf4j@SpringBootTest@RunWith(SpringRunner.class)public class DemoApplicationTests { // 注入RabbitTemplate @Autowired private RabbitTemplate template; @Test public void testSend() { template.convertAndSend(RabbitConfig.Exchange_Name, "boot.haha", "hi"); } /** * 在yml配置文件当中开启去人模式 * 在RabbitTemplate定义ConfirmCallBack回调函数 */ @Test public void testConfirm() { //定义回调 template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println(b); System.out.println("confirm 方法被执行了"); if (!b) { //接收成功 System.out.println("消息成功接收"); } else { System.out.println("消息接受失败," + b); } } }); //发送一条消息 template.convertAndSend(RabbitConfig.Exchange_Name, "boot.haha", "你好,我的小宝贝"); }// 回退模式,当消息发送给Exchange后,Exchange路由到Queue失败后才会执行ReturnCallBack /** * 回退模式 * 1:在yml文件当中开启回退模式 * 2:设置ReturnCallBack * 3:设置Exchange处理消息的模式 * <1:如果消息没有路由到Queue,那么丢弃掉消息(默认) * <2:如果路由没有回退到Queue,返回给消息发送方 */ @Test public void testReturn() {// 设置交换机处理消息的模式 template.setMandatory(true);//设置为true交换机会将路由到队列失败的消息再返回给发送者 template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) { System.out.println("消息对象:" + returnedMessage.getMessage()); System.out.println("错误码:" + returnedMessage.getReplyCode()); System.out.println("错误信息:" + returnedMessage.getReplyText()); System.out.println("交换机:" + returnedMessage.getExchange()); System.out.println("路由键:" + returnedMessage.getRoutingKey()); System.out.println("return执行了..."); } }); template.convertAndSend(RabbitConfig.Exchange_Name, "boot.haha", "hi"); } }

然后是这次主要介绍的消费端。

先看配置

spring: rabbitmq: host: username: password: virtual-host: port: 5672# publisher-confirm-type: correlated# publisher-returns: true# 开启ack也就是手动消息确认 listener:# 设置手动确认 simple: acknowledge-mode:

具体的类,

package com.jgdabc.boot_rabbit_consumer;import com.rabbitmq.client.Channel;import org.springframework.amqp.core.AcknowledgeMode;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageProperties;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;import org.springframework.stereotype.Component;import java.io.IOException;import java.util.List;/** * consumer ack 机制 * 设置手动签收,acknowledge = “manual” * 如果消息成功处理,则调用channel的basicAck签收 * // * 如果消息处理失败,则调用channel的basicNack拒绝签收,broker重新发送给consumer */@Componentpublic class ConsumerSpringbootApplication implements ChannelAwareMessageListener { @RabbitListener(queues = "boot_rabbit_topic_qqq") //指定要消费消息的队 public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println("接收转换消息:" + new String(message.getBody()));// 手动签收 channel.basicAck(deliveryTag, true); } catch (IOException e) { channel.basicNack(deliveryTag, true, true); }// 第二个参数代表运行多条消息被签收// 拒绝签收,第三个参数重回队列,如果设置为true,则消息重新回到队列 } public void onMessage(Message message) {// System.out.println(message); } }

这个方法具体没有用,之所以写上,是因为我实现上边那个类的时候,如果不实现这个方法的话,那么启动就会报错。所以就写上了。

然后主要在说明一些参数

long deliveryTag = message.getMessageProperties().getDeliveryTag(); message.getMessageProperties ().getMessageId () 获取 MessageID,获取的 MessageID 可以用来判断是否已经被消费者消费过了,如果已经消费则取消再次消费。

下面这里加了一个异常的捕获,因为可能消费者这个处理消息出错,所以进行了异常的捕获。首先一定是接收了具体的消息。然后会进行一个签收

channel.basicAck (long deliveryTag, boolean multiple)为消息确认,参数1:消息的id;参数2:是否批量应答。basic.nack方法为不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue,与basic.reject区别就是同时支持多个消息,可以nack该消费者先前接收未ack的所有消息。nack后的消息也会被自己消费到。

try { System.out.println("接收转换消息:" + new String(message.getBody()));// 手动签收 channel.basicAck(deliveryTag, true); } catch (IOException e) { channel.basicNack(deliveryTag, true, true); }

这里只是列举一些方法的使用,当然还有其他的方法,后面慢慢来熟悉好了。打开这个管理面板,可以看到没有队列,这里提前已经删除掉之前的创建好的队列和交换机了,为的是为了是运行展示后的效果比较明显一些。

交换机和队列都是可以在程序中创建和绑定的。

现在我们在生产者测试类去生产一条消息。可以随便去用一个方法就可以了。

我们就运行这个方法

因为没有做错误,所以不会有错误信息输出的。

现在我们去面板看,可以看到这里就自动创建出来队列和生产了一条消息,当然交换机的创建和队列的绑定也是执行了。

现在我们在消费者去消费,执行的话,我们就去执行启动类就好。

因为我们这个类加上了这个注解,其实就是已经实例化给spring了。表明了已经成为spring的一个组件,所以直接去启动启动运行类就好了。

你看这里就接收到消息了,并且会处于一个持续运行的等待过程。

同时消费处理成功验证。

现在我们可以去让程序出错,来验证消息处理失败情况。

我们在签收之前让代码出一个错。

哦对了,这个异常是算数异常,我们之前捕获一个大的异常算了。

下面那段改成这样。

现在重新开始之前的步骤。然后这里器是会一直打印这段话,主要是因为我们设置basic.nack方法为不确认deliveryTag对应的消息,第二个参数是否应用于多消息,第三个参数是否requeue。我们这里出现异常,第二个参数为true,代表不确认,第三个代表重新让它回到队列,设置为true该行消息重新回到队列,但是我们这里会持续接收进行接收消费,于是来来回回就形成了死循环。

同时验证我们这里设置的重回队列确实生效。

大概就是这样的一个模式,当热这种处理模式并不是合适的,主要是举个例子,其他的方法处理模式顺着这个模板来就行了。

主要是为了忘记后好回顾,必要的时候直接就地取材。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:KQL和Lucene的区别
下一篇:大数据ClickHouse(九):MergeTree系列表引擎之ReplacingMergeTree
相关文章

 发表评论

暂时没有评论,来抢沙发吧~