C# RocketMqHelper
using org.apache.rocketmq.client.consumer;
using org.apache.rocketmq.client.consumer.listener;
using org.apache.rocketmq.client.producer;
using org.apache.rocketmq.common.consumer;
using org.apache.rocketmq.common.protocol.heartbeat;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Linq;

namespace SqlBulkCopyData.消息中间件
{
    /// <summary>
    /// Creates RocketMQ producers and push consumers and caches them so that each
    /// group name maps to a single started instance for the lifetime of the process.
    /// </summary>
    public class RocketMqHelper
    {
        // Address used when the "RocketMqIp" app setting is missing or empty.
        // NOTE(review): this is a hard-coded address baked into the binary — consider
        // requiring the setting instead of silently falling back.
        private const string FallbackNamesrvAddr = "47.106.232.106:9876";

        // Name server address applied to every producer/consumer created here.
        private static readonly string namesrvAddr;

        // Instances already created and started, looked up by their group name.
        private static readonly IList<DefaultMQProducer> producers = new List<DefaultMQProducer>();
        private static readonly IList<DefaultMQPushConsumer> consumers = new List<DefaultMQPushConsumer>();

        // Separate gates so producer creation does not block consumer creation.
        private static readonly object producer_lock = new object();
        private static readonly object consumer_lock = new object();

        /// <summary>
        /// Reads the name server address from the "RocketMqIp" app setting,
        /// falling back to <see cref="FallbackNamesrvAddr"/> when it is null or empty.
        /// </summary>
        static RocketMqHelper()
        {
            namesrvAddr = ConfigurationManager.AppSettings["RocketMqIp"];
            if (string.IsNullOrEmpty(namesrvAddr))
            {
                namesrvAddr = FallbackNamesrvAddr;
            }
        }

        /// <summary>
        /// Returns the cached producer for <paramref name="groupName"/>, creating,
        /// configuring and starting one on first use.
        /// </summary>
        /// <param name="groupName">Producer group name; also the cache key.</param>
        /// <param name="queueCount">Value passed to <c>setDefaultTopicQueueNums</c> when the producer is first created.</param>
        /// <returns>The started producer registered for the group.</returns>
        public static DefaultMQProducer CreateDefaultMQProducer(string groupName, int queueCount = 6)
        {
            // The lookup is done under the lock: List<T> does not support a reader
            // running concurrently with Add, so an unlocked pre-check is a data race.
            lock (producer_lock)
            {
                var producer = producers.FirstOrDefault(o => o.getProducerGroup() == groupName);
                if (producer == null)
                {
                    producer = new DefaultMQProducer(groupName);
                    producer.setNamesrvAddr(namesrvAddr);
                    producer.setRetryTimesWhenSendFailed(3);
                    producer.setDefaultTopicQueueNums(queueCount);
                    producer.start();

                    // Cached only after start() returned, so a failed start is retried on the next call.
                    producers.Add(producer);
                }

                return producer;
            }
        }

        /// <summary>
        /// Returns the cached push consumer for <paramref name="groupName"/>, creating,
        /// configuring and starting one on first use.
        /// </summary>
        /// <typeparam name="T">
        /// Listener type; a new instance is created via <see cref="Activator.CreateInstance{T}()"/>
        /// and registered on the consumer the first time the group is requested.
        /// </typeparam>
        /// <param name="groupName">Consumer group name; also the cache key.</param>
        /// <returns>The started consumer registered for the group.</returns>
        /// <remarks>
        /// When a consumer for the group already exists it is returned as-is and
        /// <typeparamref name="T"/> is not used.
        /// NOTE(review): no topic subscription is made here before <c>start()</c>;
        /// presumably callers subscribe on the returned instance — confirm.
        /// </remarks>
        public static DefaultMQPushConsumer CreateDefaultMQPushConsumer<T>(string groupName) where T : MessageListenerConcurrently
        {
            // Same reasoning as the producer path: read and write the list under one lock.
            lock (consumer_lock)
            {
                var consumer = consumers.FirstOrDefault(o => o.getConsumerGroup() == groupName);
                if (consumer == null)
                {
                    consumer = new DefaultMQPushConsumer(groupName);
                    consumer.setNamesrvAddr(namesrvAddr);
                    consumer.setMessageModel(MessageModel.CLUSTERING);
                    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                    consumer.registerMessageListener(Activator.CreateInstance<T>());
                    consumer.start();

                    // Cached only after start() returned, so a failed start is retried on the next call.
                    consumers.Add(consumer);
                }

                return consumer;
            }
        }
    }
}
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
暂时没有评论,来抢沙发吧~