SpringBoot Redis 发布订阅模式(Pub/Sub)的具体使用

网友投稿 944 2022-11-13

SpringBoot Redis 发布订阅模式(Pub/Sub)的具体使用

SpringBoot Redis 发布订阅模式(Pub/Sub)的具体使用

目录Redis命令行下使用发布订阅publish 发布subscribe 订阅SpringBoot中使用Redis的发布订阅功能发布者订阅者消息监听容器

注意:redis的发布订阅模式不可以将消息进行持久化,订阅者发生网络断开、宕机等可能导致错过消息。

Redis命令行下使用发布订阅

publish 发布

发布者通过以下命令可以往指定channel发布message

redis> publish channel message

subscribe 订阅

订阅者通过以下命令可以订阅一个或多个频道,如果频道不存在则会创建

redis> subscribe channel [channel ...]

对于redis的发布订阅的命令就这么简单。那么接下来我们在springboot中如何使用发布订阅的功能呢?

SpringBoot中使用Redis的发布订阅功能

添加依赖配置redis信息和连接池什么的就不说了,如果添加的有commons-pool2依赖的话,会自动帮我们配置redis连接池的

发布者

相对于订阅者来说,发布者的实现方式很简单,以下方式就可以往channel中发送message了。

@Resource

private RedisTemplate redisTemplate;

public void publish(){

// 使用高级的redisTemplate

redisTemplate.convertAndSend("channel","message");

// 使用低级的connection 实际上redisTemplate的底层就是使用的下面的方式

redisTemplate.execute(new RedisCallback() {

@Override

public Object doInRedis(RedisConnection connection) throws DataAccessException {

connection.publish("channel".getBytes(StandardCharsets.UTF_8), "message".getBytes(StandardCharsets.UTF_8));

return null;

}

}, true);

// true这个参数意思是 是否将redis连接暴露给回调代码,大多数情况下设置true就可以了,往后深入的话可以看到

RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse)); 如果为false的话会创建redis连接的代理

}

订阅者

订阅者因为涉及到连接、线程等 所以内容相对会多一点

@Resource

private RedisTemplate redisTemplate;

public void subscribe() {

redisTemplate.execute(new RedisCallback() {

@Override

public Object doInRedis(RedisConnection connection) throws DataAccessException {

// 我定义了一个全局的 ConcurrentHashMap 用来存放连接 因为后面的取消订阅的线程要和订阅的线程用同一个连接

map.put("connection",connection);

// subscribe 按频道订阅 该方法会阻塞该线程 只有取消订阅才会释放该线程

connection.subscribe(new MessageListener() {

@Override

public void onMessage(Message message, byte[] pattern) {

log.info("接收到消息");

System.out.println(new String(message.getBody()));

}

}, "channelOne".getBytes(StandardCharsets.UTF_8), "channelTwo".getBytes(StandardCharsets.UTF_8));

// 按模式http://订阅 pSubscribe 只有取消订阅才会释放该线程

// connection.pSubscribe(new MessageListener() {

// @Override

// public void onMessage(Message message, byte[] pattern) {

// System.out.println(new String(message.getBody()));

// }

// }, "patternOne".getBytes(StandardCharsets.UTF_8), "patternOne".getBytes(StandardCharsets.UTF_8));

return null;

}

}, true);

}

如何取消订阅呢?从刚才的map里取到连接

RedisConnection the = map.get("connection");

Subscription subscription = the.getSubscription();

subscription.unsubscribe();

消息监听容器

上面的那种订阅为低级订阅,由于连接在调用subscribe的时候会导致当前线程阻塞,这种方式需要对每个-连接和线程管理,所以spring提供了RedisMessageListenerContainer类来帮我们完成这些工作。

RedisMessageListenerContainer顾名思义可以知道它是一个消息监听容器

详情请参考官方文档

如何实现

@Configuration

public class DefaultMessageListenerContainerConfig {

@Bean

public RedisMessageListenerContainer container(RedisConnectionFactory factory) {

RedisMessageListenerContainer container = new RedisMessageListenerContainer();

container.setConnectionFactory(factory);

// 官方推荐我们使用自定义的线程池或者使用TaskExecutor

container.setTaskExecutor(executor());

container.addMessageListener(new MessageListener() {

@Override

public void onMessage(Message message, byte[] pattern) {

System.out.println(Thread.currentThread().getName() + ": " + new String(message.getBody()));

}

}, new ChannelTopic("message"));

return container;

}

@Bean

public TaskExecutor executor() {

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());

executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);

executor.setQueueCapacity(100);

executor.initialize();

return executor;

}

}

这个时候我们在redis命令行内使用 publish channel message 的时候,我们的spring程序就可以订阅到消息了。

再说下 MessageListenerAdapter

我们可以通过 MessageListenerAdapter 消息接收者包装进去,消息接收者不会和redis有任何耦合。

官方文档给了spring传统的xml的方式配置的,下面我给出基于configuration配置的代码

public interface MessageDelegate {

void handleMessage(String message);

}

public class DefaultMessageDelegate implements MessageDelegate {

@Override

public void handleMessage(String message) {

System.out.println(message);

}

}

@Configuration

public class MessageListenerContainerConfig {

@Autowired

private DefaultMessageDelegate defaultMessageDelegate;

@Bean

public RedisMessageListenerContainer container(RedisConnectionFactory factory,

MessageListenerAdapter messageListenerAdapter) {

RedisMessageListenerContainer container = new RedisMessageListenerContainer();

container.setConnectionFactory(factory);

container.setTaskExecutor(executor());

Map> map = new HashMap<>();

List channelTopics = new ArrayList<>();

ChannelTopic channelTopic = new ChannelTopic("message");

channelTopics.add(channelTopic);

map.put(messageListenerAdapter, channelTopics);

container.setMessageListeners(map);

return container;

}

@Bean

public TaskExecutor executor() {

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());

executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);

executor.setQueueCapacity(100);

executor.initialize();

return executor;

}

@Bean

public MessageListenerAdapter messageListenerAdapter() {

// handleMessage 参数消息来的时候要调用的方法 默认是 handleMessage

return new MessageListenerAdapter(defaultMessageDelegate, "handleMessage");

}

}

如果我们要在程序运行时添加订阅或者取消订阅的时候该怎么办呢?

我们需要提前准备好消息侦听器,添加的时候把侦听器注入到消息容器

取消的时候就调用消息容器的remove方法把侦听器删除掉即可。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:PostgreSQL数据库事务系统Upper Layer——BeginImplicitTransactionBlock
下一篇:PostgreSQL数据库并发事务——AssignTransactionId和GetSnapshotData调用者
相关文章

 发表评论

暂时没有评论,来抢沙发吧~