RocketMQ源码解析:RocketMQ是如何存储消息的?

网友投稿 1174 2022-10-07

RocketMQ源码解析:RocketMQ是如何存储消息的?

RocketMQ源码解析:RocketMQ是如何存储消息的?

Broker端使用到的文件

我们先来看一下RocketMQ的消息存储流程,当消息发送到RocketMQ上时,会被顺序写入CommitLog文件,这样能保证消息存储的高性能和高吞吐量。

但是消息是按照Topic来消费的,如果消费时从CommitLog上查找对应的消息时,会比较慢。为了提高消息消费的效率,RocketMQ会将Topic一样的消息放在ConsumerQueue中,每个ConsumerQueue又分为几个写队列,一个队列一个文件。

假如创建一个名为TopicTest的topic,并创建4个写队列。那么在RocketMQ是通过如下形式存储的

需要注意的是,CommitLog和ComsumerQueue并不是将相同的消息存储了2份。CommitLog存储了消息原始的内容,而ComsumerQueue主要存储了消息在CommitLog中的偏移量,具体的消息格式看下图

borker端存储的消息格式如下

内容

解释

长度

TOTALSIZE

消息总长度

4字节

MAGICCODE

魔术,固定值Oxdaa320a7

4字节

BODYCRC

消息crc校验码

4字节

QUEUEID

消息队列id

4字节

FLAG

消息flag,供应用程序使用

4字节

QUEUEOFFSET

消息在消费队列的偏移量

8字节

PHYSICALOFFSET

消息在CommitLog文件中的偏移量

8字节

SYSFLAG

消息系统flag,例如是否压缩,是否是事务消息等

4字节

BORNTIMESTAMP

生产者调用消息发送API的时间戳

8字节

BORNHOST

消息发送者ip,端口号

8字节

STORETIMESTAMP

消息存储时间戳

8字节

STOREHOSTADDRESS

Broker服务器ip+端口号

8字节

RECONSUMETIMES

消息重试次数

4字节

Prepared Transaction Offset

事务消息物理偏移量

8字节

BodyLength

消息体长度

4字节

Body

消息体内容

BodyLength字节

TopicLength

topic长度,1字节,即主题名称不能超过255个字符

1字节

Topic

主题

TopicLength字节

PropertiesLength

消息属性长度

2字节

Properties

消息属性

PropertiesLength字节

ConsumerQueue中消息的格式如下

根据commitlog offset 和 size 就能从IndexFile中获取到具体的消息内容,而 tag hashcode 用来根据topic+tag消费时过滤消息

从存储图看到还有一个IndexFile和CommitLog也有关系

IndexFile的主要作用就是用来根据Message Key和Unique Key查找对应的消息

IndexFile文件结构如下所示

从图中可以看出,IndexFile主要分为如下3部分,IndexHead,Hash槽,Index条目

IndexHead的格式如下

字段

解释

beginTimestamp

消息的最小存储时间

endTimestamp

消息的最大存储时间

beginPhyOffset

消息的最小偏移量(commitLog文件中的偏移量)

endPhyOffset

消息的最大偏移量(commitLog文件中的偏移量)

hashSlotCount

hash槽个数

indexCount

index条目当前已使用的个数

Hash槽存储的内容为落在该Hash槽内的Index的索引(看后面图示你就会很清楚了)

每个Index条目的格式如下

字段

解释

hashcode

key的hashcode

phyoffset

消息的偏移量(commitLog文件中的偏移量)

timedif

该消息存储时间与第一条消息的时间戳的差值,小于0该消息无效

pre index no

该条目的前一条记录的Index索引,当hash冲突时,用来构建链表

key的组成有如下两种形式

Topic#Unique KeyTopic#Message Key

Unique Key是在producer端发送消息生成的

// DefaultMQProducerImpl#sendKernelImplif (!(msg instanceof MessageBatch)) { MessageClientIDSetter.setUniqID(msg);}

Message Key是我们在发送消息的时候设置的哈,通常具有业务意义,方便我们快速查找消息

// 指定 topicName,tagName,MessageKey,消息内容,然后发送消息String messageKey = UUID.randomUUID().toString();Message message = new Message(TOPIC_NAME, TAG_NAME, messageKey, ("hello rocketmq " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(message);System.out.println(sendResult);

IndexFile构成过程比较麻烦,画图演示一下把,你可以把IndexFile想成基于文件实现的HashMap。

假如说往数组长度为10的HashMap依次放入3个key为11,34,21的数据(以尾插法演示了哈),HashMap的结构如下

将key为11,34,21的数据放到IndexFile中的过程如下(假如hash槽的数量为10)

具体的过程为

将消息顺序放到Index条目中,将11放到index=1的位置(用index[1]表示哈),11%1=1,算出hash槽的位置是1,存的值是0(刚开始都是0,用hash[0]表示),将index[1].preIndexNo=hash[0]=0,hash[0]=1(1为index数组下标哈)将34放到index[2],34%10=4,index[2].preIndexNo=hash[0]=0将21放到index[3],21%10=1,index[3].preIndexNo=hash[1]=1

从图中可以看出来,当发生hash冲突时Index条目的preIndexNo属性充当了链表的作用。查找的过程和HashMap基本类似,先定位到槽的位置,然后顺着链表找就行了。

对具体算法感兴趣的可以看看源码,我就不贴代码了,有点多也不重要

// IndexFile的构建过程org.apache.rocketmq.store.index.IndexFile#putKey// IndexFile的查找过程org.apache.rocketmq.store.index.IndexFile#selectPhyOffset

其他文件

除了上述三种文件外,在rocketmq store文件夹下还有如下几种其他文件

lock:有时候一台机器上会起多个broker,如果数据文件放在一个目录,这时候可以通过锁来提示你使用另一个目录,防止冲突

checkpoint:文件检查点,存储commitLog最后一次刷盘时间戳,consumeQueue最后一次刷盘时间戳,IndexFile最后一次刷盘时间戳

config:运行期间一些配置信息

abort:如果存在abort文件说明Broker非正常关闭,该文件默认启动时创建,正常退出时删除

源码解析

消息写入commitLog的过程比较重要,后面会开单独的章节来分析,这次我们就分析一下从CommitLog读取数据构建ConsumeQueue和IndexFile的过程

ReputMessageService每隔1ms执行一次doReput,构建ConsumeQueue和IndexFileDefaultMessageStore.ReputMessageService#run

如下图所示,我挑了一部分比较重要的代码,标红的就是执行构建的过程。接着的一块代码和长轮询相关(长轮询相关的内容我们后面会详细介绍)DefaultMessageStore.ReputMessageService#doReput

在dispatcherList中总共有2个类

CommitLogDispatcherBuildConsumeQueue:用来构建ConsumeQueue

CommitLogDispatcherBuildIndex:用来构建IndexFileDefaultMessageStore#doDispatch

构建ConsumeQueue

非事务消息,事务提交消息会被放入ConsumeQueue。而半消息和回滚消息则不会,因为他们不会被用户消费哈

构建IndexFile

前面演示过了哈,构建IndexFile的过程和往hashmap放值类似。

可以看到key的形式有如下两种

Topic#Unique KeyTopic#Message Key

下面我们就详细分析commitLog的构建过程

参考博客

[1] 好文 [3]https://zhuanlan.zhihu.com/p/360912438

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:如何使用微信小程序做出图片上传(上传照片到微信小程序怎么操作)
下一篇:微信小程序文件类API详解(微信小程序 文件)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~