RocketMQ 消息存储 Broker



重点内容

  • RocketMQ 存储概要设计
  • 消息发送存储流程
  • 存储文件组织与内存映射机制
  • RocketMQ 存储文件
  • 消息消费队列、索引文件构建机制
  • RocketMQ 文件恢复机制
  • RocketMQ 刷盘机制
  • RocketMQ 文件删除机制

一、存储概要设计

1、设计的核心

  • Commitlog:将所有topic 的 消息存储在同一个文件(Commitlog)中,确保消息发送时顺序写文件,尽最大努力确保消息发送的高性能 和 吞吐量;

  • ConsumeQueue:由于消息中间件 一般是基于 topic 的订阅机制,这样给 topic 检索带来了极大的不便。为了提高消费的效率,引入 ConsumeQueue 文件,每个 topic 对应 多个 MessageQueue,每个 MessageQueue 对应一个 ConsumeQueue文件;

  • IndexFile索引文件:为了加速消息的检索性能,根据消息属性 快速从 Commitlog文件中 检索消息。

2、数据流向

1、Commitlog:消息存储文件,所有消息主题的消息都存储在 Commitlog 文件中;

2、ConsumeQueue:消息消费队列,消息到达 Commitlog 文件后,将异步转发到 ConsumeQueue,供消费者消费;

3、IndexFile:消息索引文件,主要存储 消息Key 与 Offset 的对应关系

4、事务状态服务:存储每条消息的事务状态;

5、定时消息任务:每一个延迟级别对应一个消息消费队列,存储延迟队列的消息拉取进度;

3、消息存储架构

  1. RocketMQ Broker单个实例下所有的队列都使用同一个日志数据文件(CommitLog)来存储(即单个实例消息整体有序),这点与kafka不同(kafka采用每个分区一个日志文件存储)。

  2. CommitLog:日志文件,存储Producer发送的消息内容,单个文件大小默认1G (MessageStoreConfig类的mapedFileSizeCommitLog属性),文件文件名是起始偏移量,总共20位,起始偏移量是0。比如第一个文件的文件名为00000000000000000000,假设文件按照默认大小1G来算,当第一个文件被写满之后,开始写入第二个文件,第二个文件的文件名为 00000000001073741824(1073741824=1024*1024*1024),第三个文件的名是00000000002147483648(文件名相差1073741824=1024*1024*1024)

  3. ConsumeQueue:消息的消费的逻辑队列,RocketMQ的队列不存储实际的消息数据,只存储CommitLog中的【起始物理位置偏移量,消息的内容大小,消息Tag的哈希值】,对于物理存储来说,ConsumeQueue 对应每个 Topic 和 QueueId 下面的文件,文件路径是consumequeue/${topicName}/${queueId}/${fileName},每个文件默认由 30W 条数据组成(MessageStoreConfig 类的 mapedFileSizeConsumeQueue 属性),每条数据由20个字节组成,即每个文件为 600w 字节,单个消费队列的文件大小约为5.722M(600w/(1024*1024))

  4. IndexFile:索引文件,物理存储上,文件名为创建时间的时间戳命名,固定的单个 IndexFile 文件大小约为400M,一个 IndexFile 可以保存 2000W 个索引

  5. MapedFileQueue:对连续物理存储的抽象(存储目录的抽象),MapedFileQueue 可以看作是${ROCKET_HOME}/store/commitlog 文件夹,此文件夹下有多个 MappedFile

  6. MappedFile:消息字节写入Page Cache 缓存区(commit 方法),或者原子性地将消息持久化的刷盘(flush方法)

s


二、初识消息存储

消息存储实现类:org.apache.rocketmq.store.DefaultMessageStore

​ 它是存储模块里最重要的一个类,包含了很多对存储文件操作的 API,其他模块对消息实体的操作都是通过 DefaultMessageStore 进行操作。

核心属性如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* 消息存储实现类
*/
public class DefaultMessageStore implements MessageStore {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// 消息存储配置属性
private final MessageStoreConfig messageStoreConfig;
// CommitLog 消息存储文件
private final CommitLog commitLog;
// 消息队列 存储缓存表,按消息topic分组
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
// 消息队列文件 ConsumeQueue 刷盘线程
private final FlushConsumeQueueService flushConsumeQueueService;
// 清除 Commitlog 文件服务
private final CleanCommitLogService cleanCommitLogService;
// 清除 ConsumeQueue 文件服务
private final CleanConsumeQueueService cleanConsumeQueueService;
// 索引文件实现类
private final IndexService indexService;
// MappedFile 分配服务
private final AllocateMappedFileService allocateMappedFileService;
// Commitlog 消息分发,根据 Commitlog 文件构建 ConsumeQueue、IndexFile 文件
private final ReputMessageService reputMessageService;
// 存储 HA 机制
private final HAService haService;

private final ScheduleMessageService scheduleMessageService;

private final StoreStatsService storeStatsService;
// 消息内存缓存池
private final TransientStorePool transientStorePool;

private final RunningFlags runningFlags = new RunningFlags();
private final SystemClock systemClock = new SystemClock();

private final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
private final BrokerStatsManager brokerStatsManager;
// 消息拉取,长轮询模式消息到达监听器
private final MessageArrivingListener messageArrivingListener;
// Broker 配置属性
private final BrokerConfig brokerConfig;

private volatile boolean shutdown = true;
// 文件刷盘监测点
private StoreCheckpoint storeCheckpoint;

private AtomicLong printTimes = new AtomicLong(0);
// Commitlog 文件转发请求
private final LinkedList<CommitLogDispatcher> dispatcherList;

private RandomAccessFile lockFile;

private FileLock lock;

boolean shutDownNormal = false;
}

三、消息发送存储流程

消息存储入口:org.apache.rocketmq.store.DefaultMessageStore#putMessage

方法:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 将 producer 发来的消息 存在 broker 的 commitlog 中
* @param msg Message instance to store
* @return
*/
public PutMessageResult putMessage(MessageExtBrokerInner msg) {...}
/**
* 将 producer 发来的消息 存在 broker 的 commitlog 中
* @param messageExtBatch Message batch.
* @return
*/
public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {...}

1、这3中情况下拒绝消息写入,抛出异常

① 当前 Broker停止工作,角色是 SLAVE,或当前Broker不制止写入;

② 消息topic长度超过 256字符,消息属性长度超过 65536字符;

③ 系统PageCache 繁忙;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/**
* 将 producer 发来的消息 存在 broker 的 commitlog 中
* @param msg Message instance to store
* @return
*/
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
if (this.shutdown) { // 如果已经shutdown 抛出异常
log.warn("message store has shutdown, so putMessage is forbidden");
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}

if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
// 如果 当前broker是 slave,则抛出异常
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("message store is slave mode, so putMessage is forbidden ");
}

return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}

if (!this.runningFlags.isWriteable()) {
// 如果 当前flag中 有不可写 flag,则返回异常
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("message store is not writeable, so putMessage is forbidden "
+ this.runningFlags.getFlagBits());
}

return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
} else {
this.printTimes.set(0);
}

if (msg.getTopic().length() > Byte.MAX_VALUE) {
// 如果msg的topic 太长,则抛出消息异常
log.warn("putMessage message topic length too long " + msg.getTopic().length());
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}

if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
// 如果 消息的 propertiesString 扩展属性字符串太长,则抛出异常
log.warn("putMessage message properties length too long "
+ msg.getPropertiesString().length());
return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}

if (this.isOSPageCacheBusy()) {
// 如果 系统PageCache 繁忙, 则 返回异常 系统繁忙
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}

long beginTime = this.getSystemClock().now();
// 调用 commitlog 实际保存 msg
PutMessageResult result = this.commitLog.putMessage(msg);

long eclipseTime = this.getSystemClock().now() - beginTime;
if (eclipseTime > 500) {
log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}",
eclipseTime, msg.getBody().length);
}
// 统计存储1次消息,耗时 大概处于哪个区间,方便分析性能
this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);

if (null == result || !result.isOk()) {
// 如果 失败,统计失败次数
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}

return result;
}

接下来调用 commitLog.putMessage(msg); 实际保存消息到 commitlog 文件


2、commitLog.putMessage(msg)



四、存储文件组织与内存映射机制


1、MappedFileQueue

MappedFileQueue 是 MappedFile 的管理容器,是对 存储目录的封装。

例如:${rocketmq_home}/store/commitlog 下会存在多个 MappedFile

1)、关键属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class MappedFileQueue {
// 批量删除的最大文件数
private static final int DELETE_FILES_BATCH_MAX = 10;

// 存储目录
private final String storePath;
// 单个文件的存储大小
private final int mappedFileSize;
// MappedFile 文件列表
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
// 创建 MappedFile 服务类
private final AllocateMappedFileService allocateMappedFileService;
// 当前刷盘指针,表示该指针之前的数据已经 全部持久化 到了磁盘
private long flushedWhere = 0;
// 当前提交指针,内存中 ByteBuffer 当前的写指针,该值大于等于 flushedWhere
private long committedWhere = 0;

private volatile long storeTimestamp = 0;
}

2)、关键方法

根据时间戳 查找MappedFile,找到第一个lastModifiedTimestamp >= timestamp 的文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 根据时间戳 查找MappedFile,找到第一个lastModifiedTimestamp >= timestamp 的文件
* @param timestamp 时间戳 long
* @return 满足条件的MappedFile
*/
public MappedFile getMappedFileByTime(final long timestamp) {
Object[] mfs = this.copyMappedFiles(0);

if (null == mfs)
return null;

for (int i = 0; i < mfs.length; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
return mappedFile;
}
}

return (MappedFile) mfs[mfs.length - 1];
}

查找哪个 MappedFile 包含 offset,则返回对应的 MappedFile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* Finds a mapped file by offset.
* 查找哪个 MappedFile 包含 offset,则返回对应的 MappedFile
* @param offset Offset.
* @param returnFirstOnNotFound If the mapped file is not found, then return the first one.
* @return Mapped file or null (when not found and returnFirstOnNotFound is <code>false</code>).
*/
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
try {
MappedFile firstMappedFile = this.getFirstMappedFile();
MappedFile lastMappedFile = this.getLastMappedFile();
if (firstMappedFile != null && lastMappedFile != null) {
if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
// 如果参数 offset 不在 当前MappedFileQueue 范围内,返回 returnFirstOnNotFound
LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
offset,
firstMappedFile.getFileFromOffset(),
lastMappedFile.getFileFromOffset() + this.mappedFileSize,
this.mappedFileSize,
this.mappedFiles.size());
} else {
// 如果参数 offset 在 当前MappedFileQueue 范围内
// 拿到 offset 对应的 index
int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
MappedFile targetFile = null;
try {
targetFile = this.mappedFiles.get(index);
} catch (Exception ignored) {
}

if (targetFile != null && offset >= targetFile.getFileFromOffset()
&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
// 如果 offset 在 index 文件的 偏移量范围内,返回 targetFile
return targetFile;
}

for (MappedFile tmpMappedFile : this.mappedFiles) {
// 遍历 所有 mappedFiles,查找满足条件的 mappedFile。
// 一般上一步可以找到,这里属于托底
if (offset >= tmpMappedFile.getFileFromOffset()
&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
return tmpMappedFile;
}
}
}

if (returnFirstOnNotFound) {
return firstMappedFile;
}
}
} catch (Exception e) {
log.error("findMappedFileByOffset Exception", e);
}

return null;
}

2、MappedFile

org.apache.rocketmq.store.MappedFile

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
> public static final int OS_PAGE_SIZE = 1024 * 4;  // 操作系统每页大小
> protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
>
> // 当前 JVM 实例中 MappedFile 虚拟内存
> private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
> // 当前 JVM 实例中 MappedFile 对象个数
> private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
>
> // 当前该文件的写指针,从 0 开始(内存映射文件中的写指针)
> protected final AtomicInteger wrotePosition = new AtomicInteger(0);
> // 当前该文件的提交指针,如果开启transientStorePoolEnable,则数据会存在transientStorePool,然后提交到mappedByteBuffer,再刷到磁盘
> protected final AtomicInteger committedPosition = new AtomicInteger(0);
> // 刷盘指针,该指针之前的数据 已经持久化到次盘中
> private final AtomicInteger flushedPosition = new AtomicInteger(0);
> protected int fileSize; // 文件大小
> protected FileChannel fileChannel; // 写文件channel
>
> // 堆内存ByteBuffer。如果不为空(transientStorePoolEnable=true),数据首先写入writeBuffer中,
> // 然后提交到MappedFile对应的 mappedByteBuffer
> protected ByteBuffer writeBuffer = null;
> protected TransientStorePool transientStorePool = null; // 临时内存池,transientStorePoolEnable=true时启用
>
> private String fileName; // 文件名
> private long fileFromOffset; // 文件中 第一个msg 偏移量
> private File file; // 物理文件
> private MappedByteBuffer mappedByteBuffer; // 物理文件对应的 内存映射buffer
> private volatile long storeTimestamp = 0; // 文件最后一次内容写入时间戳
> private boolean firstCreateInQueue = false; // 是否是 MappedFileQueue 队列中第一个文件
>

1)、MappedFile 初始化

1
2
3
4
org.apache.rocketmq.store.MappedFile#init(String fileName, int fileSize, 
TransientStorePool transientStorePool);

org.apache.rocketmq.store.MappedFile#init(String fileName, int fileSize);

分为 是否启用 TransientStorePool

  1. TransientStorePoolEnable=true:内容先存在堆外内存 –> 通过commit线程提交到 mappedByteBuffer –> flush线程 将 mappedByteBuffer 中到数据持久化到磁盘;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    /**
    * 启用 TransientStorePool
    * ①、初始化MappedFile, 将 物理文件file 跟 内存关联依赖,用 mappedByteBuffer 来操作
    * ②、使用 writeBuffer 和 transientStorePool
    * @param fileName 物理文件名
    * @param fileSize 文件大小
    * @param transientStorePool 堆外内存池
    * @throws IOException
    */
    public void init(final String fileName, final int fileSize,
    final TransientStorePool transientStorePool) throws IOException {
    init(fileName, fileSize);
    this.writeBuffer = transientStorePool.borrowBuffer(); // 返回 transientStorePool 中的第1块 可用buffer
    this.transientStorePool = transientStorePool;
    }

    关键点:

    • transientStorePool.borrowBuffer():从transientStorePool中借一块buffer使用
  2. TransientStorePoolEnable=false

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    /**
    * 不启用 TransientStorePool
    * 初始化MappedFile, 将 物理文件file 跟 内存关联依赖,用 mappedByteBuffer 来操作
    * @param fileName fileName 物理文件名
    * @param fileSize fileSize 文件大小
    * @throws IOException
    */
    private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    // 根据文件名,拿到文件第1个msg的偏移量
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;

    ensureDirOK(this.file.getParent()); // 确保目录存在

    try {
    // 把 file 使用NIO的内存映射机制,把文件映射到内存中,引用为 mappedByteBuffer
    this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
    this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);

    // JVM 持有的虚拟内存大小 +fileSize
    TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
    TOTAL_MAPPED_FILES.incrementAndGet(); // JVM 持有的文件数+1
    ok = true;
    } catch (FileNotFoundException e) {
    log.error("create file channel " + this.fileName + " Failed. ", e);
    throw e;
    } catch (IOException e) {
    log.error("map file " + this.fileName + " Failed. ", e);
    throw e;
    } finally {
    if (!ok && this.fileChannel != null) {
    this.fileChannel.close();
    }
    }
    }

    关键点:

    • 把 file 使用NIO的内存映射机制,把文件映射到内存中,引用为 mappedByteBuffer

2)、MappedFile 提交(commit)

1
2
3
org.apache.rocketmq.store.MappedFile#commit(final int commitLeastPages);

org.apache.rocketmq.store.MappedFile#commit0(final int commitLeastPages);

关键点:

  • this.transientStorePool.returnBuffer(writeBuffer);:将 writeBuffer 返还给 transientStorePool

  • byteBuffer = writeBuffer.slice():声明一套新指针byteBuffer,指向writeBuffer[position, limit]。内容相同,指针不同;

  • this.fileChannel.write(byteBuffer);:通过 fileChannelbyteBuffer 中的内容写入 mappedByteBuffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* writeBuffer 中的 内容提交到 fileChannel
* @param commitLeastPages 至少提交的页数
* @return 返回 已提交位置 committedPosition
*/
public int commit(final int commitLeastPages) {
if (writeBuffer == null) {
// 不启用 TransientStorePool,就不需要内存拷贝。仅需要 committedPosition = wrotePosition 就行了
//no need to commit data to file channel, so just regard wrotePosition as committedPosition.
return this.wrotePosition.get();
}
if (this.isAbleToCommit(commitLeastPages)) {
// 当前的脏页 大于commitLeastPages 才能提交
if (this.hold()) {
// 当前 MappedFile 可用
commit0(commitLeastPages);
this.release();
} else {
log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
}
}

// All dirty data has been committed to FileChannel.
if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
// 如果启用了 transientStorePool,并且 已提交位置 已经写满整个物理文件,则 将 writeBuffer 归还 transientStorePool
this.transientStorePool.returnBuffer(writeBuffer);
this.writeBuffer = null;
}

return this.committedPosition.get();
}

/**
* 提交 writeBuffer 中的 未提交内容 到 fileChannel
* @param commitLeastPages 至少提交页数
*/
protected void commit0(final int commitLeastPages) {
int writePos = this.wrotePosition.get(); // 当前写指针位置
int lastCommittedPosition = this.committedPosition.get(); // 上次 提交指针 位置

if (writePos - this.committedPosition.get() > 0) {
try {
// 声明一套新指针byteBuffer,指向 writeBuffer[position, limit]。内容相同,指针不同
ByteBuffer byteBuffer = writeBuffer.slice();
byteBuffer.position(lastCommittedPosition);
byteBuffer.limit(writePos);
// 将 writeBuffer[lastCommittedPosition, writePos] 之间的数据 通过fileChannel 写入 物理文件
this.fileChannel.position(lastCommittedPosition);
this.fileChannel.write(byteBuffer);
// committedPosition 执向当前 写指针位置(writePos)
this.committedPosition.set(writePos);
} catch (Throwable e) {
log.error("Error occurred when commit data to FileChannel.", e);
}
}
}

3)、MappedFile 刷盘(flush)

刷盘是指 将内存中的数据 写到磁盘,永久存储在磁盘,具体由 MappedFile 的 flush 方法实现.

关键点:

  • 启用了 transientStorePool:this.fileChannel.force(false),然后将 flushedPosition 设置为 commitPosition;
  • 没启用 transientStorePool:this.mappedByteBuffer.force(),然后 将 flushedPosition 指向 wrotePosition,因为没有 commitPosition;

代码如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 刷盘
* @param flushLeastPages 刷盘的最小页数
* @return The current flushed position, 已刷盘的位置
*/
public int flush(final int flushLeastPages) {
if (this.isAbleToFlush(flushLeastPages)) {
if (this.hold()) {
// 当前 MappedFile 可用,记录正在使用
int value = getReadPosition();

try {
//We only append data to fileChannel or mappedByteBuffer, never both.
if (writeBuffer != null || this.fileChannel.position() != 0) {
// writeBuffer不为空,flushedPosition 相当于 上一次 commit指针
this.fileChannel.force(false);
} else {
// writeBuffer为空, 数据直接进入mappedByteBuffer,wrotePosition代表mappedByteBuffer的指针
this.mappedByteBuffer.force();
}
} catch (Throwable e) {
log.error("Error occurred when force data to disk.", e);
}

this.flushedPosition.set(value);
this.release(); // 记录本次 MappedFile 使用完成
} else {
log.warn("in flush, hold failed, flush offset = "
+ this.flushedPosition.get());
this.flushedPosition.set(getReadPosition());
}
}
return this.getFlushedPosition();
}

4)、获取MappedFile 最大读指针(getReadPostion)

1、获取当前最大可读指针

  • writeBuffer == null,说明未启用 transientStorePool,没有提交过程,所以 mappedByteBuffer的写指针wrotePosition 就是当前最大可读地址;

  • writeBuffer != null,则启用了 transientStorePool,最大可读指针 就是committedPosition,其他部分还没有提交

1
2
3
4
5
6
7
8
9
/**
* writeBuffer == null,说明未启用 transientStorePool,没有提交过程,所以 mappedByteBuffer的
* 写指针wrotePosition 就是当前最大可读地址
* writeBuffer != null,则启用了 transientStorePool,最大可读指针 就是committedPosition,其他部分还没有提交
* @return The max position which have valid data
*/
public int getReadPosition() {
return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}

2、查找 pos 到 getReadPosition() 之前的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 从 MappedFile 中 选择一块区域
* @param pos MappedFile 中 相对于 fileFromOffset 的 位置
* @return 一套指针,指向 MappedFile中 的 指定位置
*/
public SelectMappedBufferResult selectMappedBuffer(int pos) {
int readPosition = getReadPosition();
if (pos < readPosition && pos >= 0) {
if (this.hold()) {
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
byteBuffer.position(pos);
int size = readPosition - pos;
ByteBuffer byteBufferNew = byteBuffer.slice();
byteBufferNew.limit(size);
return new SelectMappedBufferResult(this.fileFromOffset + pos,
byteBufferNew, size, this);
}
}

return null;
}

​ 由于在整个写入期间都未曾改变 mappedByteBuffer 的指针,所以 mappedByteBuffer.slice() 方法返回的 共享缓存区空间 为 整个 MappedFile,然后通过设置 byteBuffer的 postion 为 待查找的值,读取字节为当前可读字节长度,最终返回的 ByteBuffer 的 limit(可读最大长度) 为 size。整个共享缓存区的容量为 MappedFile#fileSize-pos,故在操作 SelectMappedBufferResult 不能对里面的byteBuffer 调用 flip 方法。


5)、MappedFile 销毁(destory)

MappedFile文件销毁方法:org.apache.rocketmq.store.MappedFile#destroy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* MappedFile 销毁
* @param intervalForcibly 拒绝被销毁的最大存活时间
* @return boolean 是否销毁成功
*/
public boolean destroy(final long intervalForcibly) {
// 释放资源
this.shutdown(intervalForcibly);

if (this.isCleanupOver()) {
// 如果已经清理完成
try {
this.fileChannel.close(); // 关闭channel
log.info("close file channel " + this.fileName + " OK");

long beginTime = System.currentTimeMillis();
boolean result = this.file.delete(); // 删除文件
// 记录删除时信息
log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
+ (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
+ this.getFlushedPosition() + ", "
+ UtilAll.computeEclipseTimeMilliseconds(beginTime));
} catch (Exception e) {
log.warn("close file channel " + this.fileName + " Failed. ", e);
}

return true;
} else {
log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
+ " Failed. cleanupOver: " + this.cleanupOver);
}

return false;
}

1、关闭 MappedFile。

如果初次调用时 this.available 为 true,设置 this.available 为 false;

  • 设置初次关闭的时间戳(firstShutdownTimestamp)为当前时间戳;
  • 调用 this.release(); 释放资源

如果this.available 为 false,但 引用次数this.getRefCount() > 0,并且 当前时间戳已经超过了 最大被销毁存活时间,则强制把 引用数 this.refCount 设为 负值,然后通过 release 释放资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 将 对象置为 不可用,引用数降低
* @param intervalForcibly
*/
public void shutdown(final long intervalForcibly) {
if (this.available) {
this.available = false;
this.firstShutdownTimestamp = System.currentTimeMillis();
this.release(); // 减少refCount,如果时机合适,调用 cleanup() 释放资源
} else if (this.getRefCount() > 0) {
if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
this.refCount.set(-1000 - this.getRefCount());
this.release();
}
}
}

2、cleanup 释放资源

  1. 如果 available 为 true,表示MappedFile 当前可用,无须清理,返回false;
  2. 如果资源已经被清除,返回true;
  3. 如果是堆外内存,调用堆外内存的cleanup方法清除;
  4. 维护 TOTAL_MAPPED_VIRTUAL_MEMORY,TOTAL_MAPPED_FILES
  5. 返回清理完成
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 释放资源
* @param currentRef 当前引用数,用于记日志 不做判断
* @return 是否释放成功
*/
@Override
public boolean cleanup(final long currentRef) {
if (this.isAvailable()) { // 当前MappedFile是否可用
log.error("this file[REF:" + currentRef + "] " + this.fileName
+ " have not shutdown, stop unmapping.");
return false;
}

if (this.isCleanupOver()) { // 是否清理完成
log.error("this file[REF:" + currentRef + "] " + this.fileName
+ " have cleanup, do not do it again.");
return true;
}

// 释放 mappedByteBuffer
clean(this.mappedByteBuffer);
// JVM 虚拟内存数减少
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(this.fileSize * (-1));
// JVM 引用文件数减少
TOTAL_MAPPED_FILES.decrementAndGet();
log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " OK");
return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* 通过反射清理 MappedByteBuffer
* @param buffer MappedByteBuffer
*/
public static void clean(final ByteBuffer buffer) {
if (buffer == null || !buffer.isDirect() || buffer.capacity() == 0)
return;
// 嵌套递归获取directByteBuffer的最内部的attachment或者viewedBuffer方法
// 获取directByteBuffer的Cleaner对象,然后调用cleaner.clean方法,进行释放资源
invoke(invoke(viewed(buffer), "cleaner"), "clean");
}

/**
* 递归获取directByteBuffer的最内部的attachment或者viewedBuffer方法,调用
* @param buffer MappedByteBuffer
* @return MappedByteBuffer
*/
private static ByteBuffer viewed(ByteBuffer buffer) {
String methodName = "viewedBuffer";

Method[] methods = buffer.getClass().getMethods();
for (int i = 0; i < methods.length; i++) {
if (methods[i].getName().equals("attachment")) {
methodName = "attachment";
break;
}
}
// 如果 有attachment,则调用attachment;如果没有,则调用 viewedBuffer
ByteBuffer viewedBuffer = (ByteBuffer) invoke(buffer, methodName);
if (viewedBuffer == null)
return buffer;
else
return viewed(viewedBuffer);
}

/**
* 通过反射去调用 target 的 方法 methodName
* @param target 目标对象
* @param methodName 目标对象的方法
* @param args 目标对象的方法 的参数
* @return
*/
private static Object invoke(final Object target, final String methodName,
final Class<?>... args) {
return AccessController.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
try {
Method method = method(target, methodName, args);
method.setAccessible(true);
return method.invoke(target);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
});
}

3、TransientStorePool

TransientStorePool:短暂的存储池。RocketMQ 单独创建 一个 MappedByteBuffer 内存缓存池,用来临时存储数据,数据先写入该内存映射中,然后由 commit 线程定时将数据从 该内存 复制到 与目的物理文件对应的内存映射 中。

引入该机制的原因: 提供一种内存锁定,将当前堆外内存一直锁定在内存中,避免被进程将内存交换到磁盘。

核心属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* TransientStorePool 是一个队列,包含多个可用的 buffer
*/
public class TransientStorePool {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

// avaliableBuffers 个数,可通过在 broker中配置文件中设置 transientStorePoolSize大小,默认为5
// 创建 poolSize 个堆外内存,并利用 com.sum.jna.Library 类库将该批内存锁定,避免被置换到 交换区,提高存储性能
private final int poolSize;
// 每个 ByteBuffer 大小,默认为 mapedFileSizeCommitLog,表明TransientStorePool为 commitLog 文件服务
private final int fileSize;
// 存放 avaliableBuffer 的 双端队列
private final Deque<ByteBuffer> availableBuffers;
// 存储相关的配置
private final MessageStoreConfig storeConfig;
}

五、RocketMQ 存储文件

RocketMQ 存储路径为 ${ROCKET_MQ}/store,主要存储文件夹

  1. commitlog:消息存储目录

  2. config:运行期间的一些配置信息
    consumerFilter.json:主题消息过滤信息;
    consumerOffset.json:集群消费模式消息消费进度;
    delayOffset.json:延时消息队列拉取进度;
    subscriptionGroup.json:消息消费组配置信息;
    topics.json:topic配置属性;

  3. consumequeue:消息消费队列存储目录

  4. index:消息索引文件存储目录

  5. abort:如果存在abort文件说明 broker 非正常关闭,该文件默认启动时创建,正常退出之前删除

  6. checkpoint:文件监测点,存储 commitlog 文件最后一次刷盘时间戳、consumequeue 最后一次刷盘时间、index索引文件最后一次刷盘时间戳

1、Commitlog 文件


2、ConsumeQueue 文件


3、Index 索引文件


4、checkpoint 文件


六、实时更新消息消费队列、索引文件

1、根据消息更新 ConsumeQueue


2、根据消息更新 Index 索引文件


七、消息队列和索引文件恢复

1、Broker正常停止文件恢复


2、Broker异常停止文件恢复


八、文件刷盘机制

1、Broker同步刷盘


2、Broker异步刷盘


九、过期文件删除机制