RabbitMQ使用交换机处理异步消息队列------分布式事务处理案例

网友投稿 753 2022-11-24

RabbitMQ使用交换机处理异步消息队列------分布式事务处理案例

RabbitMQ使用交换机处理异步消息队列------分布式事务处理案例

RabbitMQ使用交换机处理异步消息队列案例的安装环境可以参考​​ RabbitMQ环境准备/环境搭建​​,

本片在RabbitMQ环境已有的基础上讲述RabbitMQ使用交换机处理异步消息队列------分布式事务处理案例具体过程

消息队列的持久化固化到磁

创建代码如下:

1、新建.NET Core console控制台项目ConsoleRabbitMQ项目(生产者 productor,即产生消息的)以及ConsoleRabbitMQ01项目(consumer消费者,即使用消息的)

2、对控制台项目使用NuGet程序管理包添加RabbitMQ.Client

3、控制台项目ConsoleRabbitMQ项目的Program代码如下

using RabbitMQ.Client;using System;namespace ConsoleRabbitMQ{ class Program { static void Main(string[] args) { Console.WriteLine("RabbitMQ 生产者开始。。。生产。。。!"); #region RabbitMQ 生产者 var connectionFactory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest" }; using (var connection = connectionFactory.CreateConnection()) { using var channel = connection.CreateModel(); // durable: true 队列持久化 channel.QueueDeclare(queue: "myqueue", durable: true, false, false, null); //durable: true 交换机持久化 channel.ExchangeDeclare(exchange: "myexchange", ExchangeType.Direct, durable: true, false, null); //持久化消息,告诉消息队列,该条消息需要持久化和固化到磁盘 var propertyPersist = channel.CreateBasicProperties(); propertyPersist.Persistent = true; channel.QueueBind(queue: "myqueue", exchange: "myexchange", routingKey: "myexchangekey", null); #region Tx事务处理,不推荐使用,处理过程较复杂 //channel.TxSelect();//开起事务 1 //for (int i = 0; i < 100; i++) //{ // var body = System.Text.Encoding.UTF8.GetBytes($"这是发布的数据。{i}。"); // //持久化消息 basicProperties: propertyPersist // channel.BasicPublish(exchange: "myexchange", routingKey: "myexchangekey", basicProperties: propertyPersist, body);//发送消息给消息队列,之后消息队列收到以后会进行初持久化处理,存储路径C:\Users\Administrator\AppData下面的RabbitMQ,query文件中 // System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1)); //} //try //{ // channel.TxCommit();//提交事务 1 //} //catch (Exception ex) //{ // //这个说明生产者发送消息到消息队列时出错了,这里可以记录错误,也可以重试再次发送等等处理 // Console.WriteLine($"RabbitMQ 生产者发送消息到消息队列时出错了,错误信息:{ex.Message}"); // channel.TxRollback();//回滚事务 1 //} #endregion #region Tx事务处理,推荐使用 try { channel.ConfirmSelect();//开起消息确认模式 2 这个rabbitmq的扩展,可以看成一个回调 for (int i = 0; i < 100; i++) { var body = System.Text.Encoding.UTF8.GetBytes($"这是发布的数据。{i}。"); //持久化消息 basicProperties: propertyPersist channel.BasicPublish(exchange: "myexchange", routingKey: "myexchangekey", basicProperties: propertyPersist, body);//发送消息给消息队列,之后消息队列收到以后会进行初持久化处理,存储路径C:\Users\Administrator\AppData下面的RabbitMQ,query文件中 System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1)); } //使用下面2中确认方式 //第一种 if (channel.WaitForConfirms())//返回true 表示消息发送到消息队列,否则发送失败 { Console.WriteLine("RabbitMQ 生产者发送消息到消息队成功"); } //第二种 //channel.WaitForConfirmsOrDie();//确认消息发送到消息队列,发送成功则继续执行,否则即没发成功的话就会报错,抛出异常,在catch中捕获处理 } catch (Exception ex) { //这个说明生产者发送消息到消息队列时出错了,这里可以记录错误,也可以重试再次发送等等处理 Console.WriteLine($"RabbitMQ 生产者发送消息到消息队列时出错了,错误信息:{ex.Message}"); channel.TxRollback();//回滚事务 2 } #endregion } #endregion Console.WriteLine("RabbitMQ 输入任何字符退出。。"); Console.Read(); } }}

4、控制台项目ConsoleRabbitMQ01项目的Program代码如下

using RabbitMQ.Client;using System;using RabbitMQ.Client.Events;namespace ConsoleRabbitMQ01{ class Program { static void Main(string[] args) { Console.WriteLine("RabbitMQ 消费者开始。。。消费。。。!"); #region RabbitMQ 消费者 var connectionFactory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest" }; using (var connection = connectionFactory.CreateConnection()) { using var channel = connection.CreateModel(); // durable: true 队列持久化 channel.QueueDeclare(queue: "myqueue", durable: true, false, false, null); //durable: true 交换机持久化 channel.ExchangeDeclare(exchange: "myexchange", ExchangeType.Direct, durable: true, false, null); channel.QueueBind(queue: "myqueue", exchange: "myexchange", routingKey: "myexchangekey", null); var consumer = new EventingBasicConsumer(channel);//消费事件 consumer.Received += (sender, e) => { //下面操作包括事务处理 var body = System.Text.Encoding.UTF8.GetString(e.Body.ToArray()); // //处理消息具体处理过程 Console.WriteLine("RabbitMQ 消费者已经消费消息"); // ////手动确认,正常消费,通知消息中心,该条消息可以删除了,手动确认的话,自动确认要设置为false,autoAck: true, //channel.BasicAck(e.DeliveryTag, false); //channel.BasicConsume(queue: "myqueue", autoAck: false, consumer); ////手动确认,非正常消费即出错出现异常,通知消息中心,手动确认的话,自动确认要设置为false,autoAck: true, //BasicReject 中requeue: true 告诉消息队列,出错,但是重新把消息插入到队列中,下次使用 //BasicReject 中requeue: false 告诉消息队列,出错,删除该条消息 channel.BasicReject(e.DeliveryTag,requeue: true); //channel.BasicConsume(queue: "myqueue", autoAck: false, consumer); //autoAck: true,自动确认,表示已成功从消息队列中读取消息,通知消息队列 channel.BasicConsume(queue: "myqueue", autoAck: true, consumer); }; } #endregion Console.WriteLine("RabbitMQ 输入任何字符退出。。"); Console.Read(); } }}

5、启动RabbitMQ服务,输入命令:rabbitmq-service start

6、分别启动ConsoleRabbitMQ.exe和ConsoleRabbitMQ01.exe这个两个项目

龙腾一族至尊龙骑

1、新建.NET Core console控制台项目ConsoleRabbitMQ项目(生产者 productor,即产生消息的)以及ConsoleRabbitMQ01项目(consumer消费者,即使用消息的)

2、对控制台项目使用NuGet程序管理包添加RabbitMQ.Client

3、控制台项目ConsoleRabbitMQ项目的Program代码如下

using RabbitMQ.Client;using System;namespace ConsoleRabbitMQ{ class Program { static void Main(string[] args) { Console.WriteLine("RabbitMQ 生产者开始。。。生产。。。!"); #region RabbitMQ 生产者 var connectionFactory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest" }; using (var connection = connectionFactory.CreateConnection()) { using var channel = connection.CreateModel(); // durable: true 队列持久化 channel.QueueDeclare(queue: "myqueue", durable: true, false, false, null); //durable: true 交换机持久化 channel.ExchangeDeclare(exchange: "myexchange", ExchangeType.Direct, durable: true, false, null); //持久化消息,告诉消息队列,该条消息需要持久化和固化到磁盘 var propertyPersist = channel.CreateBasicProperties(); propertyPersist.Persistent = true; channel.QueueBind(queue: "myqueue", exchange: "myexchange", routingKey: "myexchangekey", null); #region Tx事务处理,不推荐使用,处理过程较复杂 //channel.TxSelect();//开起事务 1 //for (int i = 0; i < 100; i++) //{ // var body = System.Text.Encoding.UTF8.GetBytes($"这是发布的数据。{i}。"); // //持久化消息 basicProperties: propertyPersist // channel.BasicPublish(exchange: "myexchange", routingKey: "myexchangekey", basicProperties: propertyPersist, body);//发送消息给消息队列,之后消息队列收到以后会进行初持久化处理,存储路径C:\Users\Administrator\AppData下面的RabbitMQ,query文件中 // System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1)); //} //try //{ // channel.TxCommit();//提交事务 1 //} //catch (Exception ex) //{ // //这个说明生产者发送消息到消息队列时出错了,这里可以记录错误,也可以重试再次发送等等处理 // Console.WriteLine($"RabbitMQ 生产者发送消息到消息队列时出错了,错误信息:{ex.Message}"); // channel.TxRollback();//回滚事务 1 //} #endregion #region Tx事务处理,推荐使用 try { channel.ConfirmSelect();//开起消息确认模式 2 这个rabbitmq的扩展,可以看成一个回调 for (int i = 0; i < 100; i++) { var body = System.Text.Encoding.UTF8.GetBytes($"这是发布的数据。{i}。"); //持久化消息 basicProperties: propertyPersist channel.BasicPublish(exchange: "myexchange", routingKey: "myexchangekey", basicProperties: propertyPersist, body);//发送消息给消息队列,之后消息队列收到以后会进行初持久化处理,存储路径C:\Users\Administrator\AppData下面的RabbitMQ,query文件中 System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1)); } //使用下面2中确认方式 //第一种 if (channel.WaitForConfirms())//返回true 表示消息发送到消息队列,否则发送失败 { Console.WriteLine("RabbitMQ 生产者发送消息到消息队成功"); } //第二种 //channel.WaitForConfirmsOrDie();//确认消息发送到消息队列,发送成功则继续执行,否则即没发成功的话就会报错,抛出异常,在catch中捕获处理 } catch (Exception ex) { //这个说明生产者发送消息到消息队列时出错了,这里可以记录错误,也可以重试再次发送等等处理 Console.WriteLine($"RabbitMQ 生产者发送消息到消息队列时出错了,错误信息:{ex.Message}"); channel.TxRollback();//回滚事务 2 } #endregion } #endregion Console.WriteLine("RabbitMQ 输入任何字符退出。。"); Console.Read(); } }}

4、控制台项目ConsoleRabbitMQ01项目的Program代码如下

using RabbitMQ.Client;using System;using RabbitMQ.Client.Events;namespace ConsoleRabbitMQ01{ class Program { static void Main(string[] args) { Console.WriteLine("RabbitMQ 消费者开始。。。消费。。。!"); #region RabbitMQ 消费者 var connectionFactory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest" }; using (var connection = connectionFactory.CreateConnection()) { using var channel = connection.CreateModel(); // durable: true 队列持久化 channel.QueueDeclare(queue: "myqueue", durable: true, false, false, null); //durable: true 交换机持久化 channel.ExchangeDeclare(exchange: "myexchange", ExchangeType.Direct, durable: true, false, null); channel.QueueBind(queue: "myqueue", exchange: "myexchange", routingKey: "myexchangekey", null); var consumer = new EventingBasicConsumer(channel);//消费事件 consumer.Received += (sender, e) => { //下面操作包括事务处理 var body = System.Text.Encoding.UTF8.GetString(e.Body.ToArray()); // //处理消息具体处理过程 Console.WriteLine("RabbitMQ 消费者已经消费消息"); // ////手动确认,正常消费,通知消息中心,该条消息可以删除了,手动确认的话,自动确认要设置为false,autoAck: true, //channel.BasicAck(e.DeliveryTag, false); //channel.BasicConsume(queue: "myqueue", autoAck: false, consumer); ////手动确认,非正常消费即出错出现异常,通知消息中心,手动确认的话,自动确认要设置为false,autoAck: true, //BasicReject 中requeue: true 告诉消息队列,出错,但是重新把消息插入到队列中,下次使用 //BasicReject 中requeue: false 告诉消息队列,出错,删除该条消息 channel.BasicReject(e.DeliveryTag,requeue: true); //channel.BasicConsume(queue: "myqueue", autoAck: false, consumer); //autoAck: true,自动确认,表示已成功从消息队列中读取消息,通知消息队列 channel.BasicConsume(queue: "myqueue", autoAck: true, consumer); }; } #endregion Console.WriteLine("RabbitMQ 输入任何字符退出。。"); Console.Read(); } }}

5、启动RabbitMQ服务,输入命令:rabbitmq-service start

6、分别启动ConsoleRabbitMQ.exe和ConsoleRabbitMQ01.exe这个两个项目

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:HttpWebRequest Timeout
下一篇:聊一聊tcp 拥塞控制 二
相关文章

 发表评论

暂时没有评论,来抢沙发吧~