websocket统一分布式集群通信设计

网友投稿 672 2022-11-19

websocket统一分布式集群通信设计

websocket统一分布式集群通信设计

1.对面向接口设计的个人理解

1.1 接口的核心作用:

1.2 抽象类的核心作用:

1.3 如何进行抽象设计

将要解决的是一类问题,而非一个问题;对这一类问题,至少掌握两种及以上的通用处理方式;将通用的部分抽离成为框架(骨架);至少提供一个或多个不同层次的抽象类,使得流程健全可用。

2.websocket集群通信抽象设计

3.抽象代码设计与实现

3.1 代码组织结构

3.2 代码抽象设计-对内

import javax.websocket.Session;

/**
 * Application-facing WebSocket messaging contract.
 *
 * @author automannn@163.com
 * @time 2020/10/26 9:24
 */
public interface WsService {

    /**
     * Evicts a connection that is no longer usable.
     *
     * @param key key identifying the connection to evict
     * @return implementation-defined result flag
     */
    boolean eliminateDeadConnection(WsSessionKey key);

    /**
     * Handles a text message received from a connection.
     *
     * @param message the received text
     * @param session the session the text arrived on
     */
    void receiveMessage(String message, Session session);

    /**
     * Sends a message, using the supplied key to address the target connection.
     *
     * @param message the message to send
     * @param key     key addressing the target connection
     */
    void sendMessage(WsMessage message, WsSessionKey key);

    /**
     * Sends a message.
     *
     * @param message the message to send
     */
    void sendMessage(WsMessage message);

    /**
     * Sends a message to a local connection.
     * NOTE(review): presumably "local" means a session held by this process,
     * with no cluster forwarding; confirm against the implementations.
     *
     * @param message the message to send
     */
    void sendToLocal(WsMessage message);
}

/**
 * Capability of forwarding messages to the other nodes of a cluster.
 *
 * @author automannn@163.com
 * @time 2020/10/26 9:18
 */
public interface WsClusterable {

    /**
     * Sends a message to the cluster.
     *
     * @param msg the message payload
     * @return whether the send succeeded
     */
    boolean sendToCluster(String msg);

    /**
     * Reports whether the current environment is a clustered one.
     *
     * @return the cluster flag in textual form
     */
    String getClusterable();
}

import com.alibaba.fastjson.JSON;import xxx.xxx.xxx.core.websocket.outer.WsPublisher;import org.apache.commons.lang3.StringUtils;import org.springframework.util.Assert;import javax.websocket.Session;/** * @author automannn@163.com * @time 2020/10/27 9:12 */public abstract class AbstractClusterableWsService implements WsService,WsClusterable { protected static final WsSessionCache clients = new WsSessionCache<>(); public AbstractClusterableWsService() { } protected abstract WsPublisher getPublisher(); protected abstract String getTopic(); @Override public boolean eliminateDeadConnection(WsSessionKey key) { clients.remove(key); return true; } @Override public void sendMessage(WsMessage message, WsSessionKey key) { Assert.isTrue(message!=null&&key!=null,"参数非法"); key.setUserId(message.getReceiver()); if (clients.get(key)!=null){ sendToLocal(message,key); }else { if (Boolean.valueOf(getClusterable())){ boolean flag= sendToCluster(new WsMessageBuilder(message).get()); if (!flag){ //todo: 是否进行补偿? } }else { } } } @Override public void sendMessage(WsMessage message) { Assert.isTrue(message!=null,"参数不能为空"); Assert.isTrue(StringUtils.isNotEmpty(message.getReceiver()),"参数有误"); this.sendMessage(message,new WsSessionKey(message.getReceiver())); } @Override public boolean sendToCluster(String msg) { WsPublisher publisher= getPublisher(); Assert.isTrue(publisher!=null,"parameter cannot be null!"); String topic = getTopic(); Assert.isTrue(StringUtils.isNotEmpty(topic),"parameter cannot be null!"); return publisher.publish(topic,msg); } public void sendToLocal(WsMessage message){ sendToLocal(message,new WsSessionKey(null,message.getReceiver())); } private void sendToLocal(WsMessage message,WsSessionKey key){ String textMessage = JSON.toJSONString(message); Session session = clients.get(key); if (session!=null) session.getAsyncRemote().sendText(textMessage); }}

为完成抽象流程,需要提供的支撑类:

import org.springframework.util.Assert;import org.springframework.util.StringUtils;import java.util.concurrent.ConcurrentHashMap;/** * @author automannn@163.com * @time 2020/11/4 9:36 */public class WsSessionCache extends ConcurrentHashMap { @Override public V get(Object key) { Assert.isInstanceOf(WsSessionKey.class,key,"illegal parameters"); WsSessionKey theKey = (WsSessionKey) key; Assert.isTrue(theKey!=null,"parameter cannot be null!"); //通过遍历查找用户 if (StringUtils.isEmpty(theKey.getWsSessionId())){ if (StringUtils.isEmpty(theKey.getUserId())) return null; for (Entry entry:entrySet()){ WsSessionKey targetKey= (WsSessionKey) entry.getKey(); if (theKey.equals(targetKey)){ return (V) entry.getValue(); } } return null; }else { return super.get(key); } }}

import java.io.Serializable;import java.util.Objects;import org.apache.commons.lang3.StringUtils;import org.springframework.util.Assert;import javax.websocket.Session;/** * @author automannn@163.com * @time 2020/10/27 9:48 */public class WsSessionKey implements Serializable { private static final long serialVersionUID = 1; //wsSession的id号,递增 private String wsSessionId; //系统内用户id号 private String userId; public WsSessionKey(String id,String userId){ this.wsSessionId = id; this.userId = userId; } public WsSessionKey(Session session){ Assert.isTrue(session!=null,"parameter cannot be null!"); this.wsSessionId = session.getId(); } public WsSessionKey(String receiver){ Assert.isTrue(StringUtils.isNotEmpty(receiver),"parameter cannot be null!"); this.userId = receiver; } public String getWsSessionId() { return wsSessionId; } public void setWsSessionId(String wsSessionId) { this.wsSessionId = wsSessionId; } public String getUserId() { return userId; } public void setUserId(String userId) { this.userId = userId; } @Override public boolean equals(Object obj) { //obj参数代表 表中的目标对象 if (!(obj instanceof WsSessionKey)) return false; String targetId = ((WsSessionKey)obj).getWsSessionId(); String targetUserId = ((WsSessionKey)obj).getUserId(); //既包括连接号,又包括用户 id ======> 本地消息发送 if (StringUtils.isEmpty(targetId) && StringUtils.isEmpty(targetId)){ if (this.wsSessionId.equals(targetId)&&this.userId.equals(targetUserId)) return true; } //当目标对象中不含 用户id号时,此时连接将失效, ========>剔除无效连接 if (StringUtils.isEmpty(this.userId)){ if (targetId.equals(this.wsSessionId)) return true; //当目标对象中 不含 连接id号时, =====>集群信息发送 }else{ if (targetUserId.equals(this.userId)) return true; } return false; } @Override public int hashCode() { return Objects.hashCode(wsSessionId); }}

import java.io.Serializable;

/**
 * Message exchanged over WebSocket and between cluster nodes.
 *
 * @author automannn@163.com
 * @time 2020/10/27 9:25
 */
public class WsMessage implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Sender. */
    private String sender;

    /** Receiver. */
    private String receiver;

    /** Title. */
    private String title;

    /** Content. */
    private String content;

    /** Business content. */
    private String bizContent;

    public WsMessage() {
    }

    public String getSender() {
        return sender;
    }

    public void setSender(String sender) {
        this.sender = sender;
    }

    public String getReceiver() {
        return receiver;
    }

    public void setReceiver(String receiver) {
        this.receiver = receiver;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public String getBizContent() {
        return bizContent;
    }

    public void setBizContent(String bizContent) {
        this.bizContent = bizContent;
    }
}

3.3 抽象代码设计-对外

import org.springframework.beans.factory.InitializingBean;

/**
 * Subscribing side of a cluster channel.
 *
 * @author automannn@163.com
 * @time 2020/10/27 9:08
 */
public interface WsSubscriber extends InitializingBean {

    /** Starts the subscription. */
    void subscribe();

    /** @return the topic this subscriber is configured with */
    String getTopic();

    /** @return the channel-specific object the subscription is registered on */
    Object getSubscribeHolder();
}

/**
 * Publishing side of a cluster channel.
 *
 * @author automannn@163.com
 * @time 2020/10/27 9:08
 */
public interface WsPublisher {

    /**
     * Publishes a message.
     *
     * @param topic   the topic to publish on
     * @param message the message payload
     * @return implementation-defined result flag
     */
    boolean publish(String topic, String message);
}

import com.yinhai.msg.send.core.websocket.inner.WsMessage;/** * @author chenkh * @time 2020/10/27 10:48 */public interface WsMessageListener { boolean receiveMessage(String topic,WsMessage message); void dispatchMessage(WsMessage wsMessage);}

4.具体渠道设计(设计者与使用者,均可在遵循骨架的情况下进行扩展)

4.1 rabbitmq渠道实现

import xxx.xxx.xxx.core.websocket.outer.WsSubscriber;import org.springframework.amqp.core.MessageListener;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.listener.MessageListenerContainer;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.util.Assert;/** * @author automannn@163.com * @time 2020/11/2 15:51 */public class WsRabbitmqSubscriber extends SimpleMessageListenerContainer implements WsSubscriber { private MessageListener messageListener; Object subscribeHolder; private String topic; public WsRabbitmqSubscriber(MessageListener messageListener, Object subscribeHolder, String topic) { super((ConnectionFactory) subscribeHolder); Assert.isTrue(messageListener!=null,"parameter cannot be null!"); this.messageListener = messageListener; this.subscribeHolder = subscribeHolder; this-ic = topic; } @Override public void subscribe() { getSubscribeHolder().setupMessageListener(this.messageListener); } @Override public String getTopic() { return this-ic; } @Override public MessageListenerContainer getSubscribeHolder() { return (MessageListenerContainer) this.subscribeHolder; }}

import xxx.xxx.xxx.core.websocket.outer.WsPublisher;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.core.env.Environment;import org.springframework.util.Assert;/** * @author automannn@163.com * @time 2020/11/2 15:51 */public class WsRabbitmqPublisher implements WsPublisher { public static final String WS_TOPIC = "xxx.websocket-ic"; private RabbitTemplate rabbitTemplate; private Environment environment; public WsRabbitmqPublisher(RabbitTemplate rabbitTemplate,Environment environment) { Assert.isTrue(rabbitTemplate!=null,"This parameter cannot be null!"); Assert.isTrue(environment!=null,"This parameter cannot be null!"); this.rabbitTemplate = rabbitTemplate; this.environment = environment; } @Override public boolean publish(String exchange, String message) { //这里的topic,相当于交换机 的概念 String topic = environment.getProperty(WS_TOPIC); rabbitTemplate.convertAndSend(exchange, topic+".*",message); return true; }}

import com.alibaba.fastjson.JSON;import com.rabbitmq.client.Channel;import xxx.xxx.xxx.core.websocket.inner.WsMessage;import xxx.xxx.xxx.core.websocket.inner.WsService;import xxx.xxx.xxx.core.websocket.outer.WsMessageListener;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;import org.springframework.util.Assert;import java.nio.charset.StandardCharsets;/** * @author automannn@163.com * @time 2020/11/2 15:51 */public class WsRabbitmqMessageListener implements WsMessageListener, ChannelAwareMessageListener { private WsService wsService; public WsRabbitmqMessageListener(WsService wsService) { Assert.isTrue(wsService!=null,"parameter cannot be null!"); this.wsService = wsService; } @Override public boolean receiveMessage(String topic, WsMessage message) { //todo:do something return true; } @Override public void dispatchMessage(WsMessage wsMessage) { wsService.sendToLocal(wsMessage); } @Override public void onMessage(Message message, Channel channel) { String topic= message.getMessageProperties().getConsumerQueue(); byte[] body = message.getBody(); String wsMessageStr = new String(body, StandardCharsets.UTF_8); WsMessage wsMessage= JSON.parseObject(wsMessageStr,WsMessage.class); receiveMessage(topic,wsMessage); dispatchMessage(wsMessage); }}

4.2 rocketmq渠道实现

import xxx.xxx.xxx.core.websocket.outer.WsSubscriber;import xxx.xxx.xxx.core.exception.AppException;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import org.springframework.util.Assert;/** * @author automannn@163.com * @time 2020/11/2 15:11 */public class WsRocketmqSubscriber implements WsSubscriber { public static final String SPLIT_FLAG = "~"; private MessageListenerConcurrently messageListener; Object subscribeHolder; private String topic; public WsRocketmqSubscriber(MessageListenerConcurrently messageListener, Object subscribeHolder, String topic) { Assert.isTrue(messageListener!=null,"parameter cannot be null!"); Assert.isInstanceOf(DefaultMQPushConsumer.class,subscribeHolder,"illegal parameter!"); this.messageListener = messageListener; this.subscribeHolder = subscribeHolder; this-ic = topic; } @Override public void subscribe() { getSubscribeHolder().registerMessageListener(this.messageListener); getSubscribeHolder().setMessageModel(MessageModel.BROADCASTING); try { String topic = getTopic(); String[] topicArr= topic.split(SPLIT_FLAG); getSubscribeHolder().subscribe(topicArr[0],topicArr[1]); getSubscribeHolder().start(); } catch (MQClientException e) { throw new AppException("rocketmq subscribe failed!"); } } @Override public String getTopic() { return this-ic; } @Override public DefaultMQPushConsumer getSubscribeHolder() { return (DefaultMQPushConsumer) this.subscribeHolder; } @Override public void afterPropertiesSet() throws Exception { subscribe(); }}

import xxx.xxx.xxx.core.websocket.outer.WsPublisher;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.client.producer.SendStatus;import org.apache.rocketmq.common.message.Message;import org.springframework.util.Assert;/** * @author automannn@163.com * @time 2020/11/2 15:11 */public class WsRocketmqPublisher implements WsPublisher { private DefaultMQProducer producer; public WsRocketmqPublisher(DefaultMQProducer producer) { Assert.isTrue(producer!=null,"This parameter cannot be null!"); this.producer = producer; } @Override public boolean publish(String topic, String message) { Message sendMsg = new Message(topic,"*",message.getBytes()); try { SendResult sendResult = producer.send(sendMsg); if (sendResult.getSendStatus()== SendStatus.SEND_OK){ return true; }else { return false; } } catch (Exception e) { return false; } }}

import com.alibaba.fastjson.JSON;import xxx.xxx.xxx.core.websocket.inner.WsMessage;import xxx.xxx.xxx.core.websocket.inner.WsService;import xxx.xxx.xxx.core.websocket.outer.WsMessageListener;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.common.message.MessageExt;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.util.Assert;import org.springframework.util.CollectionUtils;import java.io.UnsupportedEncodingException;import java.util.List;/** * @author automannn@163.com * @time 2020/11/2 15:11 */public class WsRocketmqMessageListener implements MessageListenerConcurrently, WsMessageListener { public static final Logger LOGGER = LoggerFactory.getLogger(WsRocketmqMessageListener.class); private WsService wsService; public WsRocketmqMessageListener(WsService wsService) { Assert.isTrue(wsService!=null,"parameter cannot be null!"); this.wsService = wsService; } @Override public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { if (CollectionUtils.isEmpty(list)){ LOGGER.info("mq接收消息为空,直接返回成功"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } MessageExt messageExt = list.get(0); LOGGER.info("mq接收到的消息为:"+messageExt.toString()); try{ String topic = messageExt.getTopic(); String tags = messageExt.getTags(); String body = new String(messageExt.getBody(),"utf-8"); LOGGER.info("mq消息topic={},tags={},消息内容={}",topic,tags,body); WsMessage wsMessage= JSON.parseObject(body,WsMessage.class); receiveMessage(topic,wsMessage); dispatchMessage(wsMessage); } catch (UnsupportedEncodingException e) { LOGGER.error("获取mq消息内容异常!"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } @Override public boolean receiveMessage(String topic, WsMessage message) { //todo:do some thing return true; 
} @Override public void dispatchMessage(WsMessage wsMessage) { wsService.sendToLocal(wsMessage); }}

4.3 redis渠道实现

import xxx.xxx.xxx.core.websocket.outer.WsSubscriber;import org.apache.commons.lang3.StringUtils;import org.springframework.data.redis.connection.MessageListener;import org.springframework.data.redis.listener.PatternTopic;import org.springframework.data.redis.listener.RedisMessageListenerContainer;import org.springframework.data.redis.listener.Topic;import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;import org.springframework.util.Assert;import java.util.ArrayList;import java.util.List;/** * @author automannn@163.com * @time 2020/10/27 10:40 */public class WsRedisSubscriber implements WsSubscriber{ private MessageListener messageListener; Object subscribeHolder; private String topic; public WsRedisSubscriber(RedisMessageListenerContainer subscribeHolder,MessageListener messageListener,String topic){ Assert.isTrue(messageListener!=null,"parameter cannot be null!"); Assert.isInstanceOf(RedisMessageListenerContainer.class,subscribeHolder,"illegal parameter!"); this-ic = topic; this.messageListener = messageListener; this.subscribeHolder = subscribeHolder; } @Override public void subscribe() { Assert.isTrue(StringUtils.isNotEmpty(topic),"parameter cannot be null!"); Assert.isTrue(getSubscribeHolder()!=null,"parameter cannot be null!"); List topicList = new ArrayList<>(); topicList.add(new PatternTopic(topic)); getSubscribeHolder().addMessageListener(listenerAdapter(),topicList); } private MessageListenerAdapter listenerAdapter(){ return new MessageListenerAdapter(this.messageListener); } @Override public String getTopic() { return this-ic; } @Override public RedisMessageListenerContainer getSubscribeHolder() { return (RedisMessageListenerContainer) this.subscribeHolder; } @Override public void afterPropertiesSet() throws Exception { subscribe(); }}

import com.alibaba.fastjson.JSON;import xxx.xxx.xxx.core.websocket.inner.WsMessage;import xxx.xxx.xxx.core.websocket.outer.WsPublisher;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.util.Assert;/** * @author automannn@163.com * @time 2020/10/27 10:34 */public class WsRedisPublisher implements WsPublisher { private RedisTemplate redisTemplate; public WsRedisPublisher(RedisTemplate redisTemplate){ Assert.isTrue(redisTemplate!=null,"This parameter cannot be null!"); this.redisTemplate = redisTemplate; } @Override public boolean publish(String topic, String message) { //由于redis在存储的时候,自身做了序列化与反序列化的操作,这里选择存入对象 方便消费者使用 WsMessage wsMessage = JSON.parseObject(message,WsMessage.class); redisTemplate.convertAndSend(topic,wsMessage); return true; }}

import xxx.xxx.xxx.core.websocket.inner.WsMessage;import xxx.xxx.xxx.core.websocket.inner.WsService;import xxx.xxx.xxx.core.websocket.outer.WsMessageListener;import org.springframework.data.redis.connection.Message;import org.springframework.data.redis.connection.MessageListener;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.serializer.RedisSerializer;import org.springframework.util.Assert;/** * @author automannn@163.com * @time 2020/10/27 11:00 */public class WsRedisMessageListener implements WsMessageListener, MessageListener { private WsService wsService; private RedisTemplate redisTemplate; public WsRedisMessageListener(WsService wsService, RedisTemplate redisTemplate) { Assert.isTrue(wsService!=null,"parameter cannot be null!"); Assert.isTrue(redisTemplate!=null,"parameter cannot be null!"); this.wsService = wsService; this.redisTemplate = redisTemplate; } @Override public boolean receiveMessage(String topic, WsMessage message) { //todo: dosomething return true; } @Override public void dispatchMessage(WsMessage wsMessage) { Assert.isTrue(wsMessage!=null,"parameter cannot be null!"); wsService.sendToLocal(wsMessage); } @Override public void onMessage(Message message, byte[] bytes) { String topic= new String(message.getChannel()); RedisSerializer redisSerializer= redisTemplate.getDefaultSerializer(); WsMessage wsMessage= (WsMessage) redisSerializer.deserialize(message.getBody()); receiveMessage(topic,wsMessage); dispatchMessage(wsMessage); }}

5.基于springBoot的配置设计

5.1 代码层面渠道bean配置示例

import xxx.xxx.xxx.core.websocket.inner.WsService;import xxx.xxx.xxx.core.websocket.outer.WsPublisher;import xxx.xxx.xxx.core.websocket.outer.WsSubscriber;import xxx.xxx.xxx.core.websocket.outer.rabbitMqImpl.WsRabbitmqMessageListener;import xxx.xxx.xxx.core.websocket.outer.rabbitMqImpl.WsRabbitmqPublisher;import xxx.xxx.xxx.core.websocket.outer.rabbitMqImpl.WsRabbitmqSubscriber;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.annotation.EnableRabbit;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.context.EnvironmentAware;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.env.Environment;import javax.annotation.Resource;/** * @author automannn@163.com * @time 2020/11/3 9:53 */@Configuration@ConditionalOnClass(EnableRabbit.class)@ConfigurationProperties(prefix = "xxx.websocket.rabbitmq")@ConditionalOnProperty(prefix = "xxx.websocket.cluterable",value = "type",havingValue = "rabbitmq")public class RabbitMqPubsubConfiguration implements EnvironmentAware { public static final String WS_TOPIC = "msg.websocket-ic"; @Autowired private WsService wsService; @Resource private RabbitTemplate rabbitTemplate; @Autowired private CachingConnectionFactory connectionFactory; private Environment environment; /*队列名称*/ private String queueName; /*交换机名称,需要在rabbitmq新建*/ private String exchangeName; /*每个消费者获取的最大消息数量*/ private int prefetchCount; /*消费者个数*/ private int concurrentConsumers = 1; /** * 队列配置 */ @Bean public Queue theQueue(){ return new Queue(this.queueName); } /** 
* 交换机配置 */ @Bean public TopicExchange topicExchange(){ return new TopicExchange(this.exchangeName); } /** * 绑定 严格路由键 */ @Bean public Binding bindingTopicExchangeMessage(){ String topic= this.environment.getProperty(WS_TOPIC); return BindingBuilder.bind(theQueue()).to(topicExchange()).with(topic+".#"); } @Bean public WsPublisher wsPublisher(){ return new WsRabbitmqPublisher(rabbitTemplate,this.environment); } @Bean public MessageListener messageListener(){ return new WsRabbitmqMessageListener(wsService); } @Bean public WsSubscriber wsSubscriber(){ WsRabbitmqSubscriber container = new WsRabbitmqSubscriber(messageListener(),connectionFactory,this.queueName); container.setQueueNames(this.queueName); container.setExposeListenerChannel(true); container.setPrefetchCount(this.prefetchCount);//设置每个消费者获取的最大的消息数量 container.setConcurrentConsumers(concurrentConsumers);//消费者个数 container.setAcknowledgeMode(AcknowledgeMode.AUTO); container.setMessageListener(messageListener());//监听处理类 return container; } public String getQueueName() { return queueName; } public void setQueueName(String queueName) { this.queueName = queueName; } public String getExchangeName() { return exchangeName; } public void setExchangeName(String exchangeName) { this.exchangeName = exchangeName; } public int getPrefetchCount() { return prefetchCount; } public void setPrefetchCount(int prefetchCount) { this.prefetchCount = prefetchCount; } public int getConcurrentConsumers() { return concurrentConsumers; } public void setConcurrentConsumers(int concurrentConsumers) { this.concurrentConsumers = concurrentConsumers; } @Override public void setEnvironment(Environment environment) { this.environment = environment; }}

5.2 配置文件配置示例

xxx:
  websocket:
    topic: wsExchange
    isCluster: false
    cluterable:
      type: redis #redis,rabbitmq,rocketmq
    rocketmq:
      #生产者配置
      groupName: ${spring.application.name}
      namesrvAddr: 192.168.10.21:9876
      maxMessageSize: 4096
      sendMsgTimeOut: 3000
      #消费者配置
      retryTimesWhenSendFailed: 2
      topic: ${msg.websocket.topic}~*
      consumeThreadMin: 5
      consumeThreadMax: 32
      consumeMessageBatchMaxSize: 1
    rabbitmq:
      #rabbitmq的连接配置信息,与spring保持一致
      queueName: ${msg.websocket.topic}.${spring.application.name}
      exchangeName: ${msg.websocket.topic}
      prefetchCount: 1
      concurrentConsumers: 1

6.备注

代码中的包以及配置之类,均做了敏感信息的处理,小朋友请在大朋友的陪同下观看。要讲武德,不要搞骗、搞偷袭.....

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:数据结构——平衡二叉树(AVL)
下一篇:大数据环境---zookeeper的安装
相关文章

 发表评论

暂时没有评论,来抢沙发吧~