3-RabbitMQ入门-生产者-消费者

网友投稿 1053 2022-09-29

3-RabbitMQ入门-生产者-消费者

3-RabbitMQ入门-生产者-消费者

3-RabbitMQ入门-生产者-消费者

需求:

使用简单模式完成消息传递

官网的工作模式介绍

那么下面呢,我们可以参考一下步骤进行案例演示:

步骤:

创建工程(生产者、消费者)分别添加依赖编写生产者发送消息编写消费者接收消息

搭建示例工程

1.创建工程

创建两个空的maven工程:

生产者 rabbitmq-producer

消费者 rabbitmq-consumer

2. 添加依赖

两个工程的 pom.xml 添加 rabbitmq 的客户端依赖:

com.rabbitmq amqp-client 5.6.0 org.apache.maven.plugins maven-compiler-plugin 3.8.0 1.8 1.8

生产者 rabbitmq-producer 的 pom.xml

arguments) 参数: 1. queue:队列名称 2. durable:是否持久化,当mq重启之后,还在 3. exclusive: * 是否独占。只能有一个消费者监听这队列 * 当Connection关闭时,是否删除队列 * 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 5. arguments:参数。 */ //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建 channel.queueDeclare("hello_world", true, false, false, null); //6. 发送消息 channel.basicPublish /* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) 参数: 1. exchange:交换机名称。简单模式下交换机会使用默认的 "" 2. routingKey:路由名称 3. props:配置信息 4. body:发送消息数据 */ String body = "hello rabbitmq~~~~"; channel.basicPublish("", "hello_world", null, body.getBytes()); //7. 释放资源 channel.close(); connection.close(); }}

2. 在执行上述的消息发送之后;可以登录rabbitMQ的管理控制台,可以发现队列和其消息:

可以看到已经在 /test 下创建了 hello_world 的队列,下面我们来查看消息:

可以看查看队列中发送的消息了。那么下一步我们就来实现如何消费这条数据。

编写消费者

1.编写消息消费者

package com.lijw.consumer;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * @author Aron.li * @date 2022/3/2 16:16 */public class Consumer_HelloWorld { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("127.0.0.1"); // ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/test"); //虚拟机 默认值 / factory.setUsername("libai"); // 用户名 默认 guest factory.setPassword("libai"); //密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); //5. 创建队列Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) 参数: 1. queue:队列名称 2. durable:是否持久化,当mq重启之后,还在 3. exclusive: * 是否独占。只能有一个消费者监听这队列 * 当Connection关闭时,是否删除队列 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 5. arguments:参数。 */ //如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建 channel.queueDeclare("hello_world", true, false, false, null); /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck:是否自动确认 3. callback:回调对象 */ // 接收消息 Consumer consumer = new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1. consumerTag:标识 2. envelope:获取一些信息,交换机,路由key... 3. properties:配置信息 4. body:数据 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("标识 consumerTag: " + consumerTag); System.out.println("获取交换机信息: " + envelope.getExchange()); System.out.println("获取路由key: " + envelope.getRoutingKey()); System.out.println("获取 DeliveryTag: " + envelope.getDeliveryTag()); System.out.println("配置信息 properties: " + properties); System.out.println("接收队列的数据 body: " + new String(body)); } }; channel.basicConsume("hello_world",true,consumer); //不需要关闭资源,因为消费者需要持续监听队列信息 }}

2.启动消费者,读取消息数据:

标识 consumerTag: amq.ctag-kLt7tOMR4BxiEEQjI6s4WA获取交换机信息: 获取路由key: hello_world获取 DeliveryTag: 1配置信息 properties: #contentHeader(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)接收队列的数据 body: hello rabbitmq~~~~

小结

上述的入门案例中中其实使用的是如下的简单模式:

1555991074575

在上图的模型中,有以下概念:

P:生产者,也就是要发送消息的程序C:消费者:消息的接受者,会一直等待消息到来。queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:关于微信支付小程序v3【附PHP完整后端代码】(微信支付v3 php)
下一篇:小程序开发需要的技术有哪些?(小程序开发涉及的技术)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~