redis redisson 集合的使用案例(RList、Rset、RMap)

网友投稿 1206 2023-01-01

redis redisson 集合的使用案例(RList、Rset、RMap)

redis redisson 集合的使用案例(RList、Rset、RMap)

redis redisson 集合操作

相关类及接口

Rlist:链表

public interface RList extends List, RExpirable, RListAsync, RSortable>, RandomAccess {

List get(int... var1); //获取指定的节点值

int addAfter(V var1, V var2); //在var1前添加var2

int addBefore(V var1, V var2); //在var1后添加var2

void fastSet(int var1, V var2); //修改var1处的只为var2

List readAll(); //获取链表的所有值

RList subList(int var1, int var2); //获取var1到var2的子链表

List range(int var1); //返回var1往后的链表

List range(int var1, int var2); //返回var1到var2的链表

void trim(int var1, int var2); //保留var1到var2处的链表,其余删除

void fastRemove(int var1); //删除var1处的值

boolean remove(Object var1, int var2); //判断元素是否删除

RCollectionMapReduce mapReduce(); //mapreduce操作

}

RSet:无序集合

public interface RSet extends Set, RExpirable, RSetAsync, RSortable> {

V removeRandom();

Set removeRandom(int var1); //删除对象

V random();

Set random(int var1); //随机返回对象

boolean move(String var1, V var2); //判断集合var1中是否存在var2,类似contains()方法

Set readAll(); //获取所有对象

int union(String... var1); //集合并集对象个数

Set readUnion(String... var1); //集合并集

int diff(String... var1); //集合差集对象个数

Set readDiff(String... var1); //集合差集

int intersection(String... var1); //集合交集的对象个数

Set readIntersection(String... var1); //集合交集

Iterator iterator(int var1);

Iterator iterator(String var1, int var2);

Iterator iterator(String var1); //遍历集合

RCollectionMapReduce mapReduce();

RSemaphore getSemaphore(V var1);

RCountDownLatch getCountDownLatch(V var1);

RPermitExpirableSemaphore getPermitExpirableSemaphore(V var1); //信号量

RLock getLock(V var1);

RLock getFairLock(V var1);

RReadWriteLock getReadWriteLock(V var1); //锁操作

Stream stream(int var1);

Stream stream(String var1, int var2);

Stream stream(String var1); //流操作

}

RMap:键值对

public interface RMap extends ConcurrentMap, RExpirable, RMapAsync {

void loadAll(boolean var1, int var2);

void loadAll(Set extends K> var1, boolean var2, int var3);

V get(Object var1); //获取var1的值

V put(K var1, V var2); //添加对象

V putIfAbsent(K var1, V var2); //对象不存在则设置

V replace(K var1, V var2); //替换对象

boolean replace(K var1, V var2, V var3); //替换对象

V remove(Object var1); //移除对象

boolean remove(Object var1, Object var2); //移除对象

void putAll(Map extends K, ? extends V> var1);

void putAll(Map extends K, ? extends V> var1, int var2); //添加对象

Map getAll(Set var1); //获取key在集合var1中的键值对

int valueSize(K var1); //key为var1的bxzaMvalue大小

V addAndGet(K var1, Number var2); //key为var1的value加var2

long fastRemove(K... var1); //移除对象

boolean fastPut(K var1, V var2); //添加对象

boolean fastReplace(K var1, V var2); //替换key为var1的值为var2

boolean fastPutIfAbsent(K var1, V var2); //如果不存在则设置

Set readAllKeySet(); //获取所有key,以set形式返回

Collection readAllValues(); //获取所有value,以collection返回

Set> readAllEntrySet(); //遍历键值对

Map readAllMap(); //集合形式转换为map类型

Set keySet();

Set keySet(int var1);

Set keySet(String var1, int var2);

Set keySet(String var1); //获取key集合

Collection values();

Collection values(String var1);

Collection values(String var1, int var2);

Collection values(int var1); //获取所有value

Set> entrySet();

Set> entrySet(String var1);

Set> entrySet(String var1, int var2);

Set> entrySet(int var1); //遍历键值对

RMapReduce mapReduce();

RSemaphore getSemaphore(K var1);

RCountDownLatch getCountDownLatch(K var1);

RPermitExpirableSemaphore getPermitExpirableSemaphore(K var1); //信号量操作

RLock getLock(K var1);

RLock getFairLock(K var1);

RReadWriteLock getReadWriteLock(K var1); //锁操作

}

使用示例

public class MyTest {

public static void main(String[] args){

Config config=new Config();

config.useSingleServer().setAddress("redis://******:6379").setPassword("123456");

RedissonClient client= Redisson.create(config);

RList list=client.getList("list");

for (int i=0;i<10;i++){

list.add("瓜田李下 "+i);

}

list.readAll().forEach(System.out::println);

System.out.println("list的数量为:"+list.size()+"\n");

RSet set=client.getSet("set");

for (int i=0;i<10;i++){

set.add("瓜田李下 "+i);

}

for (String s : set) {

System.out.println(s);

}

System.out.println("set的大小为:"+set.size()+"\n");

RMap map=client.getMap("map");

for (int i=0;i<10;i++){

map.put(i,"瓜田李下 "+i);

}

for (Map.Entry entry:map.entrySet()){

System.out.println(entry.getKey()+" ==> "+entry.getValue());

}

System.out.println("map的大小为:"+map.size());

}

}

控制台输出

瓜田李下 0

瓜田李下 1

瓜田李下 2

瓜田李下 3

瓜田李下 4

瓜田李下 5

瓜田李下 6

瓜田李下 7

瓜田李下 8

瓜田李下 9

list的数量为:10

瓜田李下 0

瓜田李下 1

瓜田李下 7

瓜田李下 3

瓜田李下 5

瓜田李下 4

瓜田李下 9

瓜田李下 8

瓜田李下 6

瓜田李下 2

set的大小为:10

0 ==> 瓜田李下 0

1 ==> 瓜田李下 1

2 ==> 瓜田李下 2

3 ==> 瓜田李下 3

4 ==> 瓜田李下 4

5 ==> 瓜田李下 5

6 ==> 瓜田李下 6

7 ==> 瓜田李下 7

8 ==> 瓜田李下 8

9 ==> 瓜田李下 9

map的大小为:10

Redisson使用注意事项

Redisson 是一个在 Redis 的基础上实现的 java 驻内存数据网格,相较于暴露底层操作的Jedis,Redisson提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务。

特性 & 功能:

支持 Redis 单节点(single)模式、哨兵(sentinel)模式、主从(Master/Slave)模式以及集群(Redis Cluster)模式

程序接口调用方式采用异步执行和异步流执行两种方式

数据序列化,Redisson 的对象编码类是用于将对象进行序列化和反序列化,以实现对该对象在 Redis 里的读取和存储

单个集合数据分片,在集群模式下,Redisson 为单个 Redis 集合类型提供了自动分片的功能

提供多种分布式对象,如:Object Bucket,Bitset,AtomicLong,Bloom Filter 和 HyperLogLog 等

提供丰富的分布式集合,如:Map,Multimap,Set,SortedSet,List,Deque,Queue 等

分布式锁和同步器的实现,可重入锁(Reentrant Lock),公平锁(Fair Lock),联锁(MultiLock),红锁(Red Lock),信号量(Semaphonre),可过期性信号锁(PermitExpirableSemaphore)等

提供先进的分布式服务,如分布式远程服务(Remote Service),分布式实时对象(Live Object)服务,分布式执行服务(Executor Service),分布式调度任务服务(Schedule Service)和分布式映射归纳服务(MapReduce)

更多特性和功能,请关注官网:http://redisson.org

实现原理

redis本身是不支持上述的分布式对象和集合,Redisson是通过利用redis的特性在客户端实现了高级数据结构和特性,例如优先队列的实现,是通过客户端排序整理后再存入redis。

客户端实现,意味着当没有任何客户端在线时,这些所有的数据结构和特性都不会保留,也不会自动生效,例如过期事件的触发或原来优先队列的元素增加。

注意事项

实时性

RMap中有一个功能是可以设置键值对的过期时间的,并可以注册键值对的事件-

元素淘汰功能(Eviction)

Redisson的分布式的RMapCache Java对象在基于RMap的前提下实现了针对单个元素的淘汰机制。同时仍然保留了元素的插入顺序。由于RMapCache是基于RMap实现的,使它同时继承了java.util.concurrent.ConcurrentMap接口和java.util.Map接口。Redisson提供的Spring Cache整合以及JCache正是基于这样的功能来实现的。

目前的Redis自身并不支持散列(Hash)当中的元素淘汰,因此所有过期元素都是通过org.redisson.EvictionScheduler实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理300个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于1秒到1小时之间。比如该次清理时删除了300条元素,那么下次执行清理的时间将在1秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加1.5倍。

正如官方wiki所述,这个功能是通过后台线程定时去清理的, 所以这个是非实时的(issue-1234:on expired event is not executed in real-time.),延迟在5秒到2小时之间,因此对实时性要求比较高的场景就得自己衡量了。

由于过期时间的非实时性,所以导致过期事件的发生也是非实时的,相应的-可能会延迟了一会儿才收到通知,在我的测试中,ttl设置在秒级误差是比较大的,分钟级别的ttl倒还好(左侧设置值,右侧实际耗时):

1s _ 5s

3s _ 5s

4s _ 5s

5s _ 9s

6s _ 10s

10s _ 15s

1m _ 1m11s

序列化

由Redisson默认的编码器为jsonJacksonCodec,JsonJackson在序列化有双向引用的对象时,会出现无限循环异常。而fastjson在检查出双向引用后会自动用引用符$ref替换,终止循环。

在我的情况中,我序列化了一个service,这个service已被spring托管,而且和另一个service之间也相互注入了,用fastjson能 正常序列化到redis,而JsonJackson则抛出无限循环异常。

为了序列化后的内容可见,所以不用redission其他自带的二进制编码器,自行实现编码器:

import com.alibaba.fastjson.JSON;

import com.alibaba.fastjson.serializer.SerializerFeature;

import io-ty.buffer.ByteBuf;

import io-ty.buffer.ByteBufAllocator;

import io-ty.buffer.ByteBufInputSbxzaMtream;

import io-ty.buffer.ByteBufOutputStream;

import org.redisson.client.codec.BaseCodec;

import org.redisson.client.protocol.Decoder;

import org.redisson.client.protocol.Encoder;​

import java.io.IOException;

public class FastjsonCodec extends BaseCodec {​

private final Encoder encoder = in -> {

ByteBuf out = ByteBufAllocator.DEFAULT.buffer();

try {

ByteBufOutputStream os = new ByteBufOutputStream(out);

JSON.writeJSONString(os, in,SerializerFeature.WriteClassName);

return os.buffer();

} catch (IOException e) {

out.release();

throw e;

} catch (Exception e) {

out.release();

throw new IOException(e);

}

};

private final Decoder decoder = (buf, state) ->

JSON.parseObject(new ByteBufInputStream(buf), Object.class);

@Override

public Decoder getValueDecoder() {

return decoder;

}

@Override

public Encoder getValueEncoder() {

return encoder;

}

}

订阅发布

Redisson对订阅发布的封装是RTopic,这也是Redisson中很多事件监听的实现原理(例如键值对的事件监听)。

使用单元测试时发现,在事件发布后,订阅方需要延时一下才能收到事件。具体原因待查

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:redis redisson 限流器的实例(RRateLimiter)
下一篇:创建小程序组件是什么(小程序怎么引入组件)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~