重点内容
RocketMQ
存储概要设计- 消息发送存储流程
- 存储文件组织与内存映射机制
RocketMQ
存储文件- 消息消费队列、索引文件构建机制
RocketMQ
文件恢复机制RocketMQ
刷盘机制RocketMQ
文件删除机制
一、存储概要设计
1、设计的核心
Commitlog
:将所有topic
的 消息存储在同一个文件(Commitlog
)中,确保消息发送时顺序写文件,尽最大努力确保消息发送的高性能 和 吞吐量;ConsumeQueue
:由于消息中间件 一般是基于 topic 的订阅机制,这样给 topic 检索带来了极大的不便。为了提高消费的效率,引入ConsumeQueue
文件,每个topic
对应 多个MessageQueue
,每个MessageQueue
对应一个ConsumeQueue
文件;IndexFile
索引文件:为了加速消息的检索性能,根据消息属性 快速从Commitlog
文件中 检索消息。
2、数据流向
1、Commitlog:消息存储文件,所有消息主题的消息都存储在 Commitlog 文件中;
2、ConsumeQueue:消息消费队列,消息到达 Commitlog 文件后,将异步转发到 ConsumeQueue,供消费者消费;
3、IndexFile:消息索引文件,主要存储 消息Key 与 Offset 的对应关系
4、事务状态服务:存储每条消息的事务状态;
5、定时消息任务:每一个延迟级别对应一个消息消费队列,存储延迟队列的消息拉取进度;
3、消息存储架构
RocketMQ Broker
单个实例下所有的队列都使用同一个日志数据文件(CommitLog)来存储(即单个实例消息整体有序),这点与kafka不同(kafka采用每个分区一个日志文件存储)。CommitLog
:日志文件,存储Producer发送的消息内容,单个文件大小默认1G (MessageStoreConfig
类的mapedFileSizeCommitLog
属性),文件文件名是起始偏移量,总共20位,起始偏移量是0。比如第一个文件的文件名为00000000000000000000,假设文件按照默认大小1G来算,当第一个文件被写满之后,开始写入第二个文件,第二个文件的文件名为00000000001073741824(1073741824=1024*1024*1024)
,第三个文件的名是00000000002147483648(文件名相差1073741824=1024*1024*1024
)ConsumeQueue
:消息的消费的逻辑队列,RocketMQ的队列不存储实际的消息数据,只存储CommitLog
中的【起始物理位置偏移量,消息的内容大小,消息Tag的哈希值】,对于物理存储来说,ConsumeQueue
对应每个 Topic 和 QueueId 下面的文件,文件路径是consumequeue/${topicName}/${queueId}/${fileName}
,每个文件默认由 30W 条数据组成(MessageStoreConfig
类的mapedFileSizeConsumeQueue
属性),每条数据由20个字节组成,即每个文件为 600w 字节,单个消费队列的文件大小约为5.722M(600w/(1024*1024))
IndexFile
:索引文件,物理存储上,文件名为创建时间的时间戳命名,固定的单个 IndexFile 文件大小约为400M,一个 IndexFile 可以保存 2000W 个索引MapedFileQueue
:对连续物理存储的抽象(存储目录的抽象),MapedFileQueue
可以看作是${ROCKET_HOME}/store/commitlog
文件夹,此文件夹下有多个MappedFile
MappedFile
:消息字节写入Page Cache
缓存区(commit 方法),或者原子性地将消息持久化的刷盘(flush方法)
s
二、初识消息存储
消息存储实现类:org.apache.rocketmq.store.DefaultMessageStore
它是存储模块里最重要的一个类,包含了很多对存储文件操作的 API,其他模块对消息实体的操作都是通过
DefaultMessageStore
进行操作。
核心属性如下:
1 | /** |
三、消息发送存储流程
消息存储入口:org.apache.rocketmq.store.DefaultMessageStore#putMessage
方法:
1 | /** |
1、这3中情况下拒绝消息写入,抛出异常
① 当前 Broker停止工作,角色是 SLAVE,或当前Broker不制止写入;
② 消息topic长度超过 256字符,消息属性长度超过 65536字符;
③ 系统PageCache 繁忙;
1 | /** |
接下来调用 commitLog.putMessage(msg);
实际保存消息到 commitlog
文件
2、commitLog.putMessage(msg)
四、存储文件组织与内存映射机制
1、MappedFileQueue
MappedFileQueue 是 MappedFile 的管理容器,是对 存储目录的封装。
例如:${rocketmq_home}/store/commitlog
下会存在多个 MappedFile
1)、关键属性
1 | public class MappedFileQueue { |
2)、关键方法
根据时间戳 查找MappedFile,找到第一个lastModifiedTimestamp >= timestamp 的文件
1 | /** |
查找哪个 MappedFile 包含 offset,则返回对应的 MappedFile
1 | /** |
2、MappedFile
org.apache.rocketmq.store.MappedFile
属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 > public static final int OS_PAGE_SIZE = 1024 * 4; // 操作系统每页大小
> protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
>
> // 当前 JVM 实例中 MappedFile 虚拟内存
> private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
> // 当前 JVM 实例中 MappedFile 对象个数
> private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
>
> // 当前该文件的写指针,从 0 开始(内存映射文件中的写指针)
> protected final AtomicInteger wrotePosition = new AtomicInteger(0);
> // 当前该文件的提交指针,如果开启transientStorePoolEnable,则数据会存在transientStorePool,然后提交到mappedByteBuffer,再刷到磁盘
> protected final AtomicInteger committedPosition = new AtomicInteger(0);
> // 刷盘指针,该指针之前的数据 已经持久化到次盘中
> private final AtomicInteger flushedPosition = new AtomicInteger(0);
> protected int fileSize; // 文件大小
> protected FileChannel fileChannel; // 写文件channel
>
> // 堆内存ByteBuffer。如果不为空(transientStorePoolEnable=true),数据首先写入writeBuffer中,
> // 然后提交到MappedFile对应的 mappedByteBuffer
> protected ByteBuffer writeBuffer = null;
> protected TransientStorePool transientStorePool = null; // 临时内存池,transientStorePoolEnable=true时启用
>
> private String fileName; // 文件名
> private long fileFromOffset; // 文件中 第一个msg 偏移量
> private File file; // 物理文件
> private MappedByteBuffer mappedByteBuffer; // 物理文件对应的 内存映射buffer
> private volatile long storeTimestamp = 0; // 文件最后一次内容写入时间戳
> private boolean firstCreateInQueue = false; // 是否是 MappedFileQueue 队列中第一个文件
>
1)、MappedFile 初始化
1 | org.apache.rocketmq.store.MappedFile#init(String fileName, int fileSize, |
分为 是否启用 TransientStorePool
TransientStorePoolEnable=true
:内容先存在堆外内存 –> 通过commit线程提交到mappedByteBuffer
–> flush线程 将mappedByteBuffer
中到数据持久化到磁盘;1
2
3
4
5
6
7
8
9
10
11
12
13
14
15/**
* 启用 TransientStorePool
* ①、初始化MappedFile, 将 物理文件file 跟 内存关联依赖,用 mappedByteBuffer 来操作
* ②、使用 writeBuffer 和 transientStorePool
* @param fileName 物理文件名
* @param fileSize 文件大小
* @param transientStorePool 堆外内存池
* @throws IOException
*/
public void init(final String fileName, final int fileSize,
final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize);
this.writeBuffer = transientStorePool.borrowBuffer(); // 返回 transientStorePool 中的第1块 可用buffer
this.transientStorePool = transientStorePool;
}关键点:
transientStorePool.borrowBuffer()
:从transientStorePool
中借一块buffer使用
TransientStorePoolEnable=false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38/**
* 不启用 TransientStorePool
* 初始化MappedFile, 将 物理文件file 跟 内存关联依赖,用 mappedByteBuffer 来操作
* @param fileName fileName 物理文件名
* @param fileSize fileSize 文件大小
* @throws IOException
*/
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
// 根据文件名,拿到文件第1个msg的偏移量
this.fileFromOffset = Long.parseLong(this.file.getName());
boolean ok = false;
ensureDirOK(this.file.getParent()); // 确保目录存在
try {
// 把 file 使用NIO的内存映射机制,把文件映射到内存中,引用为 mappedByteBuffer
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
// JVM 持有的虚拟内存大小 +fileSize
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet(); // JVM 持有的文件数+1
ok = true;
} catch (FileNotFoundException e) {
log.error("create file channel " + this.fileName + " Failed. ", e);
throw e;
} catch (IOException e) {
log.error("map file " + this.fileName + " Failed. ", e);
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}关键点:
- 把 file 使用NIO的内存映射机制,把文件映射到内存中,引用为 mappedByteBuffer
2)、MappedFile 提交(commit)
1 | org.apache.rocketmq.store.MappedFile#commit(final int commitLeastPages); |
关键点:
this.transientStorePool.returnBuffer(writeBuffer);
:将writeBuffer
返还给transientStorePool
;
byteBuffer = writeBuffer.slice()
:声明一套新指针byteBuffer
,指向writeBuffer[position, limit]
。内容相同,指针不同;
this.fileChannel.write(byteBuffer);
:通过fileChannel
把byteBuffer
中的内容写入mappedByteBuffer
;
1 | /** |
3)、MappedFile 刷盘(flush)
刷盘是指 将内存中的数据 写到磁盘,永久存储在磁盘,具体由 MappedFile 的 flush 方法实现.
关键点:
- 启用了
transientStorePool
:this.fileChannel.force(false),然后将 flushedPosition 设置为 commitPosition;- 没启用
transientStorePool
:this.mappedByteBuffer.force(),然后 将 flushedPosition 指向 wrotePosition,因为没有 commitPosition;
代码如下所示。
1 | /** |
4)、获取MappedFile 最大读指针(getReadPostion)
1、获取当前最大可读指针
writeBuffer == null,说明未启用 transientStorePool,没有提交过程,所以 mappedByteBuffer的写指针wrotePosition 就是当前最大可读地址;
writeBuffer != null,则启用了 transientStorePool,最大可读指针 就是committedPosition,其他部分还没有提交
1 | /** |
2、查找 pos 到 getReadPosition() 之前的数据
1 | /** |
由于在整个写入期间都未曾改变 mappedByteBuffer 的指针,所以
mappedByteBuffer.slice()
方法返回的 共享缓存区空间 为 整个 MappedFile,然后通过设置 byteBuffer的 postion 为 待查找的值,读取字节为当前可读字节长度,最终返回的 ByteBuffer 的 limit(可读最大长度) 为 size。整个共享缓存区的容量为MappedFile#fileSize-pos
,故在操作 SelectMappedBufferResult 不能对里面的byteBuffer 调用 flip 方法。
5)、MappedFile 销毁(destory)
MappedFile文件销毁方法:org.apache.rocketmq.store.MappedFile#destroy
1 | /** |
1、关闭 MappedFile。
如果初次调用时 this.available 为 true,设置 this.available 为 false;
- 设置初次关闭的时间戳(firstShutdownTimestamp)为当前时间戳;
- 调用
this.release();
释放资源如果this.available 为 false,但 引用次数
this.getRefCount() > 0
,并且 当前时间戳已经超过了 最大被销毁存活时间,则强制把 引用数this.refCount
设为 负值,然后通过 release 释放资源。
1 | /** |
2、cleanup 释放资源
- 如果 available 为 true,表示MappedFile 当前可用,无须清理,返回false;
- 如果资源已经被清除,返回true;
- 如果是堆外内存,调用堆外内存的cleanup方法清除;
- 维护 TOTAL_MAPPED_VIRTUAL_MEMORY,TOTAL_MAPPED_FILES
- 返回清理完成
1 | /** |
1 | /** |
3、TransientStorePool
TransientStorePool:短暂的存储池。RocketMQ 单独创建 一个 MappedByteBuffer 内存缓存池,用来临时存储数据,数据先写入该内存映射中,然后由 commit 线程定时将数据从
该内存
复制到与目的物理文件对应的内存映射
中。
引入该机制的原因: 提供一种内存锁定,将当前堆外内存一直锁定在内存中,避免被进程将内存交换到磁盘。
核心属性
1 | /** |
五、RocketMQ 存储文件
RocketMQ 存储路径为 ${ROCKET_MQ}/store,主要存储文件夹
commitlog:消息存储目录
config:运行期间的一些配置信息
consumerFilter.json:主题消息过滤信息;
consumerOffset.json:集群消费模式消息消费进度;
delayOffset.json:延时消息队列拉取进度;
subscriptionGroup.json:消息消费组配置信息;
topics.json:topic配置属性;consumequeue:消息消费队列存储目录
index:消息索引文件存储目录
abort:如果存在abort文件说明 broker 非正常关闭,该文件默认启动时创建,正常退出之前删除
checkpoint:文件监测点,存储 commitlog 文件最后一次刷盘时间戳、consumequeue 最后一次刷盘时间、index索引文件最后一次刷盘时间戳