RocketMQ设计之异步刷盘

网友投稿 1016 2022-10-19

RocketMQ设计之异步刷盘

RocketMQ设计之异步刷盘

上一篇RocketMQ设计之同步刷盘

异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入

RocketMQ默认采用异步刷盘,异步刷盘两种策略:开启缓冲池,不开启缓冲池

CommitLog的handleDiskFlush方法:

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {

// Synchronization flush

if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {

final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;

if (messageExt.isWaitStoreMsgOK()) {

GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());

service.putRequest(request);

boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());

if (!flushOK) {

log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()

+ " client address: " + messageExt.getBornHostString());

putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);

}

} else {

service.wakeup();

}

}

// Asynchronous flush

http:// else {

if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {

flushCommitLogService.wakeup();

} else {

commitLogService.wakeup();

}

}

}

不开启缓冲池:默认不开启,刷盘线程FlushRealTimeService会每间隔500毫秒尝试去刷盘。

class FlushRealTimeService extends FlushCommitLogService {

private long lastFlushTimestamp = 0;

private long printTimes = 0;

public void run() {

CommitLog.log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {

boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();

//每次Flush间隔500毫秒

int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();

//每次Flush最少4页内存数据(16KB)

int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();

//距离上次刷盘时间阈值为10秒

int flushPhysicQueueThoroughInterval =

CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

boolean printFlushProgress = false;

// Print flush progress

long currentTimeMillis = System.currentTimeMillis();

if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {

this.lastFlushTimestamp = currentTimeMillis;

flushPhysicQueueLeastPages = 0;

printFlushProgress = (printTimes++ % 10) == 0;

}

try {

if (flushCommitLogTimed) {

Thread.sleep(interval);

} else {

this.waitForRunning(interval);

}

if (printFlushProgress) {

this.printFlushProgress();

}

long begin = System.currentTimeMillis();

CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);

long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();

if (storeTimestamp > 0) {

CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicmsgTimestamp(storeTimestamp);

}

long past = System.currentTimeMillis() - begin;

if (past > 500) {

log.info("Flush data to disk costs {} ms", past);

}

} catch (Throwable e) {

CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);

this.printFlushProgress();

}

}

// Normal shutdown, to ensure that all the flush before exit

boolean result = false;

for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {

result = CommitLog.this.mappedFileQueue.flush(0);

CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));

}

this.printFlushProgress();

CommitLog.log.info(this.getServiceName() + " service end");

}

@Override

public String getServiceName() {

return FlushRealTimeService.class.getSimpleName();

}

private void printFlushProgress() {

// CommitLog.log.info("how much disk fall behind memory, "

// + CommitLog.this.mappedFileQueue.howMuchFallBehind());

}

@Override

public long getJointime() {

return 1000 * 60 * 5;

}

}

判断是否超过10秒没刷盘了,如果超过强制刷盘等待Flush间隔500ms通过MappedFile刷盘设置StoreCheckpoint刷盘时间点超过500ms的刷盘记录日志Broker正常停止前,把内存page中的数据刷盘

开启缓冲池:

class CommitRealTimeService extends FlushCommitLogService {

private long lastCommitTimestamp = 0;

@Override

public String getServiceName() {

return CommitRealTimeService.class.getSimpleName();

}

@Override

public void run() {

CommitLog.log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {

//每次提交间隔200毫秒

int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();

//每次提交最少4页内存数据(16KB)

int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();

//距离上次提交时间阈值为200毫秒

int commitDataThoroughInterval =

CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

long begin = System.currentTimeMillis();

if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {

this.lastCommitTimestamp = begin;

http:// commitDataLeastPages = 0;

}

try {

boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);

long end = System.currentTimeMillis();

if (!result) {

this.lastCommitTimestamp = end; // result = false means some data committed.

//now wake up flush thread.

flushCommitLogService.wakeup();

}

if (end - begin > 500) {

log.info("Commit data to file costs {} ms", end - begin);

}

this.waitForRunning(interval);

} catch (Throwable e) {

CommitLog.log.error(this.getServiceName() + " service has exception. ", e);

}

}

boolean result = false;

for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {

result = CommitLog.this.mappedFileQueue.commit(0);

CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));

}

CommitLog.log.info(this.getServicbKNLFleName() + " service end");

}

}

RocketMQ申请一块和CommitLog文件相同大小的堆外内存来做缓冲池,数据会先写入缓冲池,提交线程CommitRealTimeService也每间隔500毫秒尝试提交到文件通道等待刷盘,刷盘最终由FlushRealTimeService来完成,和不开启缓冲池的处理一致。使用缓冲池的目的是多条数据合并写入,从而提高io性能。

判断是否超过200毫秒没提交,需要强制提交提交到MappedFile,此时还未刷盘然后唤醒刷盘线程在Broker正常停止前,提交内存page中的数据

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:访问器中谨慎返回引用类型对象
下一篇:Lore ORM- 对象映射框架
相关文章

 发表评论

暂时没有评论,来抢沙发吧~