RocketMQ源码解析:同步刷盘和异步刷盘的实现

网友投稿 800 2022-10-07

RocketMQ源码解析:同步刷盘和异步刷盘的实现

RocketMQ源码解析:同步刷盘和异步刷盘的实现

同步刷盘

在RocketMQ中有同步刷盘和异步刷盘两种方式

2种刷盘方式适用的场景如下

刷盘方式

适用场景

同步刷盘

数据可靠性高,适用于金融等对数据可靠性要求高的场景,性能比异步刷盘要低

异步刷盘

性能和吞吐量高 , Broker端异常关闭时,有少量消息丢失

根据前面的章节我们知道RocketMQ会通过SendMessageProcessor来处理刷盘的消息,当消息存储到内存中后,就开始刷盘

异步刷盘的方式有两种,第一种Mmap+PageCache(默认的异步刷盘方式),上面说到的同步刷盘也是这种机制,代码实现如下

@Testpublic void writeCaseOne() throws Exception { File file = new File("/Users/peng/software/rocketmq/test/case1.txt"); FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel(); MappedByteBuffer byteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 2048); byteBuffer.put("hello mmap\n".getBytes()); // 将 pagecache 中的内容强制刷到磁盘 byteBuffer.force();}

第二种是DirectByteBuffer+PageCache,也就是直接写堆外内存

@Testpublic void writeCaseTwo() throws Exception { File file = new File("/Users/peng/software/rocketmq/test/case2.txt"); FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel(); ByteBuffer byteBuffer = ByteBuffer.allocate(20); byteBuffer.put("hello mmap\n".getBytes()); byteBuffer.flip(); while (byteBuffer.hasRemaining()) { fileChannel.write(byteBuffer); } // 将 pagecache 中的内容强制刷到磁盘 fileChannel.force(false);}

从CommitLog#submitFlushRequest方法可以看到刷盘的逻辑

当broker端配置的是同步刷盘,但是发送过来的消息不需要等待消息刷盘完成,就会退化成异步刷盘,我们先看同步刷盘,在RocketMQ中,并不是往内存中放一条消息,就刷盘一次,这样效率太低。RocketMQ会每隔10ms统一执行刷盘请求来提高效率

public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { // 有数据过来会结束等待的 this.waitForRunning(10); this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } // 省略部分逻辑}

不断执行doCommit方法进行刷盘,当刷盘完成时,会唤醒等待刷盘的线程

这里有个需要注意的细节点,我我们放请求的时候是放到requestsWrite中,但是读的时候却是在requestsRead中,那么requestsRead中能读取到值吗?

// GroupCommitService// 读请求列表private volatile LinkedList requestsWrite = new LinkedList();// 读请求列表private volatile LinkedList requestsRead = new LinkedList();

我们来看ServiceThread类的waitForRunning方法

其实当每次等待结束后都会调用onWaitEnd方法,而GroupCommitService重写了这个方法,在这个方法内部调用swapRequests方法

private void swapRequests() { lock.lock(); try { LinkedList tmp = this.requestsWrite; this.requestsWrite = this.requestsRead; this.requestsRead = tmp; } finally { lock.unlock(); }}

swapRequests方法会将requestsWrite和requestsRead中的内容进行交换。

首先通过上次刷盘位置定位到MappedFile,然后开始刷盘

可以看到有两种刷盘的方式,调用FileChannel#force(异步刷盘并且开启transientStorePool)或者MappedByteBuffer#force(同步刷盘或者异步刷盘但是不开启transientStorePool)

当刷盘的时候,需要累积到一定页数才开始刷,同步刷盘是0页,异步输盘是4页。至此同步输盘的逻辑就梳理完了。

其实异步输盘不开启transientStorePool时,执行的逻辑和这个差不多,只是累计的页数不相同而已

异步刷盘

不开启TransientStorePool

当不开启TransientStorePoo时,会先唤醒FlushRealTimeService线程,然后开始开始刷盘

先算出输盘的页数,默认4页,如果10s没有刷盘了,则将页数设为0,然后执行MappedFileQueue#flush方法,这个方法在同步刷盘已经分析过了,不再分析。

开启TransientStorePool

当开启TransientStorePool是会先唤醒CommitRealTimeService,将ByteBuffer中的内容刷入FileChannel,接着唤醒FlushRealTimeService线程,将FileChannel中的数据刷入磁盘

先算出commit的页数,默认4页,如果200ms没有commit了,则将页数设为0(在后续执行流程可以看到commit也对页数有要求),然后执行MappedFileQueue#commit方法,将将ByteBuffer中的内容刷入FileChannelMappedFile#commit0

至于这两种刷盘方式的好处,我个人理解也不是很深刻,因此转一下社区胡宗棠老师对这个问题的解读

一般有两种,有两种方式进行读写

第一种,Mmap+PageCache的方式,读写消息都走的是pageCache,这样子读写都在pagecache里面不可避免会有锁的问题,在并发的读写操作情况下,会出现缺页中断降低,内存加锁,污染页的回写。第二种,DirectByteBuffer(堆外内存)+PageCache的两层架构方式,这样子可以实现读写消息分离,写入消息时候写到的是DirectByteBuffer——堆外内存中,读消息走的是PageCache(对于,DirectByteBuffer是两步刷盘,一步是刷到PageCache,还有一步是刷到磁盘文件中),带来的好处就是,避免了内存操作的很多容易堵的地方,降低了时延,比如说缺页中断降低,内存加锁,污染页的回写。

参考博客

[1]

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:基于mybatis
下一篇:php 实现微信开发获取用户信息
相关文章

 发表评论

暂时没有评论,来抢沙发吧~