RocketMQ 消息存储 Broker
重点内容
RocketMQ
存储概要设计- 消息发送存储流程
- 存储文件组织与内存映射机制
RocketMQ
存储文件- 消息消费队列、索引文件构建机制
RocketMQ
文件恢复机制RocketMQ
刷盘机制RocketMQ
文件删除机制
一、存储概要设计
1、设计的核心
Commitlog
:将所有topic
的 消息存储在同一个文件(Commitlog
)中,确保消息发送时顺序写文件,尽最大努力确保消息发送的高性能 和 吞吐量;ConsumeQueue
:由于消息中间件 一般是基于 topic 的订阅机制,这样给 topic 检索带来了极大的不便。为了提高消费的效率,引入ConsumeQueue
文件,每个topic
对应 多个MessageQueue
,每个MessageQueue
对应一个ConsumeQueue
文件;IndexFile
索引文件:为了加速消息的检索性能,根据消息属性 快速从Commitlog
文件中 检索消息。
2、数据流向
1、Commitlog:消息存储文件,所有消息主题的消息都存储在 Commitlog 文件中;
2、ConsumeQueue:消息消费队列,消息到达 Commitlog 文件后,将异步转发到 ConsumeQueue,供消费者消费;
3、IndexFile:消息索引文件,主要存储 消息Key 与 Offset 的对应关系
4、事务状态服务:存储每条消息的事务状态;
5、定时消息任务:每一个延迟级别对应一个消息消费队列,存储延迟队列的消息拉取进度;
3、消息存储架构
RocketMQ Broker
单个实例下所有的队列都使用同一个日志数据文件(CommitLog)来存储(即单个实例消息整体有序),这点与kafka不同(kafka采用每个分区一个日志文件存储)。CommitLog
:日志文件,存储Producer发送的消息内容,单个文件大小默认1G (MessageStoreConfig
类的mapedFileSizeCommitLog
属性),文件文件名是起始偏移量,总共20位,起始偏移量是0。比如第一个文件的文件名为00000000000000000000,假设文件按照默认大小1G来算,当第一个文件被写满之后,开始写入第二个文件,第二个文件的文件名为00000000001073741824(1073741824=1024*1024*1024)
,第三个文件的名是00000000002147483648(文件名相差1073741824=1024*1024*1024
)ConsumeQueue
:消息的消费的逻辑队列,RocketMQ的队列不存储实际的消息数据,只存储CommitLog
中的【起始物理位置偏移量,消息的内容大小,消息Tag的哈希值】,对于物理存储来说,ConsumeQueue
对应每个 Topic 和 QueueId 下面的文件,文件路径是consumequeue/${topicName}/${queueId}/${fileName}
,每个文件默认由 30W 条数据组成(MessageStoreConfig
类的mapedFileSizeConsumeQueue
属性),每条数据由20个字节组成,即每个文件为 600w 字节,单个消费队列的文件大小约为5.722M(600w/(1024*1024))
IndexFile
:索引文件,物理存储上,文件名为创建时间的时间戳命名,固定的单个 IndexFile 文件大小约为400M,一个 IndexFile 可以保存 2000W 个索引MapedFileQueue
:对连续物理存储的抽象(存储目录的抽象),MapedFileQueue
可以看作是${ROCKET_HOME}/store/commitlog
文件夹,此文件夹下有多个MappedFile
MappedFile
:消息字节写入Page Cache
缓存区(commit 方法),或者原子性地将消息持久化的刷盘(flush方法)
s
二、初识消息存储
消息存储实现类:org.apache.rocketmq.store.DefaultMessageStore
它是存储模块里最重要的一个类,包含了很多对存储文件操作的 API,其他模块对消息实体的操作都是通过
DefaultMessageStore
进行操作。
核心属性如下:
1 | /** |
三、消息发送存储流程
消息存储入口:org.apache.rocketmq.store.DefaultMessageStore#putMessage
方法:
1 | /** |
1、这3中情况下拒绝消息写入,抛出异常
① 当前 Broker停止工作,角色是 SLAVE,或当前Broker不制止写入;
② 消息topic长度超过 256字符,消息属性长度超过 65536字符;
③ 系统PageCache 繁忙;
1 | /** |
接下来调用 commitLog.putMessage(msg);
实际保存消息到 commitlog
文件
2、commitLog.putMessage(msg)
四、存储文件组织与内存映射机制
1、MappedFileQueue
MappedFileQueue 是 MappedFile 的管理容器,是对 存储目录的封装。
例如:${rocketmq_home}/store/commitlog
下会存在多个 MappedFile
1)、关键属性
1 | public class MappedFileQueue { |
2)、关键方法
根据时间戳 查找MappedFile,找到第一个lastModifiedTimestamp >= timestamp 的文件
1 | /** |
查找哪个 MappedFile 包含 offset,则返回对应的 MappedFile
1 | /** |
2、MappedFile
org.apache.rocketmq.store.MappedFile
属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 > public static final int OS_PAGE_SIZE = 1024 * 4; // 操作系统每页大小
> protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
>
> // 当前 JVM 实例中 MappedFile 虚拟内存
> private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
> // 当前 JVM 实例中 MappedFile 对象个数
> private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
>
> // 当前该文件的写指针,从 0 开始(内存映射文件中的写指针)
> protected final AtomicInteger wrotePosition = new AtomicInteger(0);
> // 当前该文件的提交指针,如果开启transientStorePoolEnable,则数据会存在transientStorePool,然后提交到mappedByteBuffer,再刷到磁盘
> protected final AtomicInteger committedPosition = new AtomicInteger(0);
> // 刷盘指针,该指针之前的数据 已经持久化到次盘中
> private final AtomicInteger flushedPosition = new AtomicInteger(0);
> protected int fileSize; // 文件大小
> protected FileChannel fileChannel; // 写文件channel
>
> // 堆内存ByteBuffer。如果不为空(transientStorePoolEnable=true),数据首先写入writeBuffer中,
> // 然后提交到MappedFile对应的 mappedByteBuffer
> protected ByteBuffer writeBuffer = null;
> protected TransientStorePool transientStorePool = null; // 临时内存池,transientStorePoolEnable=true时启用
>
> private String fileName; // 文件名
> private long fileFromOffset; // 文件中 第一个msg 偏移量
> private File file; // 物理文件
> private MappedByteBuffer mappedByteBuffer; // 物理文件对应的 内存映射buffer
> private volatile long storeTimestamp = 0; // 文件最后一次内容写入时间戳
> private boolean firstCreateInQueue = false; // 是否是 MappedFileQueue 队列中第一个文件
>
1)、MappedFile 初始化
1 | org.apache.rocketmq.store.MappedFile#init(String fileName, int fileSize, |
分为 是否启用 TransientStorePool
TransientStorePoolEnable=true
:内容先存在堆外内存 –> 通过commit线程提交到mappedByteBuffer
–> flush线程 将mappedByteBuffer
中到数据持久化到磁盘;1
2
3
4
5
6
7
8
9
10
11
12
13
14
15/**
* 启用 TransientStorePool
* ①、初始化MappedFile, 将 物理文件file 跟 内存关联依赖,用 mappedByteBuffer 来操作
* ②、使用 writeBuffer 和 transientStorePool
* @param fileName 物理文件名
* @param fileSize 文件大小
* @param transientStorePool 堆外内存池
* @throws IOException
*/
public void init(final String fileName, final int fileSize,
final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize);
this.writeBuffer = transientStorePool.borrowBuffer(); // 返回 transientStorePool 中的第1块 可用buffer
this.transientStorePool = transientStorePool;
}关键点:
transientStorePool.borrowBuffer()
:从transientStorePool
中借一块buffer使用
TransientStorePoolEnable=false
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38/**
* 不启用 TransientStorePool
* 初始化MappedFile, 将 物理文件file 跟 内存关联依赖,用 mappedByteBuffer 来操作
* @param fileName fileName 物理文件名
* @param fileSize fileSize 文件大小
* @throws IOException
*/
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
// 根据文件名,拿到文件第1个msg的偏移量
this.fileFromOffset = Long.parseLong(this.file.getName());
boolean ok = false;
ensureDirOK(this.file.getParent()); // 确保目录存在
try {
// 把 file 使用NIO的内存映射机制,把文件映射到内存中,引用为 mappedByteBuffer
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
// JVM 持有的虚拟内存大小 +fileSize
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet(); // JVM 持有的文件数+1
ok = true;
} catch (FileNotFoundException e) {
log.error("create file channel " + this.fileName + " Failed. ", e);
throw e;
} catch (IOException e) {
log.error("map file " + this.fileName + " Failed. ", e);
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}关键点:
- 把 file 使用NIO的内存映射机制,把文件映射到内存中,引用为 mappedByteBuffer
2)、MappedFile 提交(commit)
1 | org.apache.rocketmq.store.MappedFile#commit(final int commitLeastPages); |
关键点:
this.transientStorePool.returnBuffer(writeBuffer);
:将writeBuffer
返还给transientStorePool
;
byteBuffer = writeBuffer.slice()
:声明一套新指针byteBuffer
,指向writeBuffer[position, limit]
。内容相同,指针不同;
this.fileChannel.write(byteBuffer);
:通过fileChannel
把byteBuffer
中的内容写入mappedByteBuffer
;
1 | /** |
3)、MappedFile 刷盘(flush)
刷盘是指 将内存中的数据 写到磁盘,永久存储在磁盘,具体由 MappedFile 的 flush 方法实现.
关键点:
- 启用了
transientStorePool
:this.fileChannel.force(false),然后将 flushedPosition 设置为 commitPosition;- 没启用
transientStorePool
:this.mappedByteBuffer.force(),然后 将 flushedPosition 指向 wrotePosition,因为没有 commitPosition;
代码如下所示。
1 | /** |
4)、获取MappedFile 最大读指针(getReadPostion)
1、获取当前最大可读指针
writeBuffer == null,说明未启用 transientStorePool,没有提交过程,所以 mappedByteBuffer的写指针wrotePosition 就是当前最大可读地址;
writeBuffer != null,则启用了 transientStorePool,最大可读指针 就是committedPosition,其他部分还没有提交
1 | /** |
2、查找 pos 到 getReadPosition() 之前的数据
1 | /** |
由于在整个写入期间都未曾改变 mappedByteBuffer 的指针,所以
mappedByteBuffer.slice()
方法返回的 共享缓存区空间 为 整个 MappedFile,然后通过设置 byteBuffer的 postion 为 待查找的值,读取字节为当前可读字节长度,最终返回的 ByteBuffer 的 limit(可读最大长度) 为 size。整个共享缓存区的容量为MappedFile#fileSize-pos
,故在操作 SelectMappedBufferResult 不能对里面的byteBuffer 调用 flip 方法。
5)、MappedFile 销毁(destory)
MappedFile文件销毁方法:org.apache.rocketmq.store.MappedFile#destroy
1 | /** |
1、关闭 MappedFile。
如果初次调用时 this.available 为 true,设置 this.available 为 false;
- 设置初次关闭的时间戳(firstShutdownTimestamp)为当前时间戳;
- 调用
this.release();
释放资源如果this.available 为 false,但 引用次数
this.getRefCount() > 0
,并且 当前时间戳已经超过了 最大被销毁存活时间,则强制把 引用数this.refCount
设为 负值,然后通过 release 释放资源。
1 | /** |
2、cleanup 释放资源
- 如果 available 为 true,表示MappedFile 当前可用,无须清理,返回false;
- 如果资源已经被清除,返回true;
- 如果是堆外内存,调用堆外内存的cleanup方法清除;
- 维护 TOTAL_MAPPED_VIRTUAL_MEMORY,TOTAL_MAPPED_FILES
- 返回清理完成
1 | /** |
1 | /** |
3、TransientStorePool
TransientStorePool:短暂的存储池。RocketMQ 单独创建 一个 MappedByteBuffer 内存缓存池,用来临时存储数据,数据先写入该内存映射中,然后由 commit 线程定时将数据从
该内存
复制到与目的物理文件对应的内存映射
中。
引入该机制的原因: 提供一种内存锁定,将当前堆外内存一直锁定在内存中,避免被进程将内存交换到磁盘。
核心属性
1 | /** |
五、RocketMQ 存储文件
RocketMQ 存储路径为 ${ROCKET_MQ}/store,主要存储文件夹
commitlog:消息存储目录
config:运行期间的一些配置信息
consumerFilter.json:主题消息过滤信息;
consumerOffset.json:集群消费模式消息消费进度;
delayOffset.json:延时消息队列拉取进度;
subscriptionGroup.json:消息消费组配置信息;
topics.json:topic配置属性;consumequeue:消息消费队列存储目录
index:消息索引文件存储目录
abort:如果存在abort文件说明 broker 非正常关闭,该文件默认启动时创建,正常退出之前删除
checkpoint:文件监测点,存储 commitlog 文件最后一次刷盘时间戳、consumequeue 最后一次刷盘时间、index索引文件最后一次刷盘时间戳
1、Commitlog 文件
2、ConsumeQueue 文件
3、Index 索引文件
4、checkpoint 文件
六、实时更新消息消费队列、索引文件
1、根据消息更新 ConsumeQueue
2、根据消息更新 Index 索引文件
七、消息队列和索引文件恢复
1、Broker正常停止文件恢复
2、Broker异常停止文件恢复
八、文件刷盘机制
1、Broker同步刷盘
2、Broker异步刷盘
九、过期文件删除机制
linux awk
awk 实现类似 group by功能
文件(filename.txt)内容
1 | 499 |
命令:
1 | $ awk '{arr[$1]+=1}END{for(i in arr)print i,arr[i]}' filename.txt |
awk 使用 if 条件过滤
1 | $ grep '2020:09' nginx-access.log* | grep -v 'DNSPod' | awk '{ if($9>=400&&$9<500) print $9 }' |
JVM基础
- class类文件结构
- 类加载机制
- jvm运行时数据区
一、Class文件结构
二、类加载机制
class的声明周期
1、加载
1、根据全限定名获取二进制字节流
1 | Class.forName("com.mysql.jdbc.Driver"); |
2、把字节流代表的静态存储结构(class文件) 转化为 方法区的运行时数据结构
3、Java堆上生成一个代表该类的对象
Class 类 ==> 元数据区(方法区)==> 蓝图
Object对象 ==> 根据蓝图做成的 成品 Kclass
2、验证
1、魔数:cafebabe
2、主版本号、次版本号
3、常量池:常量池里的常量是否有不被支持的类型
4、验证常量池中的值是否存在
3、准备
准备阶段是准备内存的过程,初始化为0
4、解析
5、初始化
jdk编译默认添加构造函数
clinit class init: 类的初始化
有static 变量 才有 clinit
clinit 父类 静态变量, 父类静态块,子类静态变量,子类静态块
init():V 父类的变量初始化,父类的构造方法,
6、使用
7、卸载
三、类加载器
8、ClassLoader
类加载器的默认机制:双亲委派机制
1 | protected Class<?> loadClass(String name, boolean resolve) |
C:/Program Files/Java/jdk1.8.0_60/jre/lib/rt.jar!/sun/misc/Launcher.class
BootstrapClassLoader 加载 System.getProperty(“sun.boot.class.path”); 路径下的class
ExtClassLoader 加载 System.getProperty(“java.ext.dirs”); 路径下的class
AppClassLoader 加载 System.getProperty(“java.class.path”); 路径下的class
使用双亲委派的原因:
保护jdk的基类
9、Tomcat打破了双亲委派机制
自定义类加载器,重写 loadClass,就能打破双亲委派
判断两个Class是否相等,
false,false,true,false
即使是同一个class文件,使用不同的ClassLoader加载进来,那么它就是不同的 class
9、Class.forName
四、JVM运行时数据区
JVM运行时数据结构
1、方法区(元空间)
各个线程的共享存储区域,用于存储:
- 已被虚拟机加载的类信息
- 常量 final
- 静态变量 static
- 即时编译器编译后的代码
热点代码:代码段被反复调用次数超过阈值(10000)。基于采样 和 计数
会出现的异常:OutOfMemory。原因:常量池撑爆
CPU
- 操作栈
- 操作寄存器
2、栈和栈帧
栈帧
局部变量表(本地变量表)
原子性协议:32位的机器(一个slot可以存放一个32位以内的数据类型),long是在并发更新的时候不是线程安全的
long、double 分为两个连续的slot空间,高低位存储
在32位机器上会跳出循环,在64位机器上不会
操作数栈
动态连接
类似于 “多态,反射”,只有在运行时才能 把 符号引用 --> 直接引用
其他大多数情况,在解释时 就确定了 符号引用 --> 直接引用
方法返回连接
帧数据区
false,true
栈默认大小:-Xss108K;
栈的调用深度 由 栈帧的大小 和 栈的大小 共同决定;栈帧大小不是确定的,所以栈的深度也是动态的
3、程序计数器
每条机器指令都有自己的地址
CPU从指令寄存器中获取要执行的指令,获取完后,清空指令寄存器;
程序计数器中存放下一条要执行的指令地址,指令寄存器为空后,将自己指向的指令放入指令寄存器中,同时,自己指向下一条指令;
五、字节码执行引擎
函数如何调用
符号引用 –> 直接引用
重写,重载
方法调用:
1、静态分派
函数调用,对应的字节码指令:
0xb6 invokevirtual 调用实例方法
0xb7 invokespecial 调用超类构建方法, 实例初始化方法, 私有方法
0xb8 invokestatic 调用静态方法
0xb9 invokeinterface 调用接口方法
0xba invokedynamic 调用动态方法
在编译期确定,并且是根据静态类型来确定函数调用版本。可以反编译class文件,看到实际调用方法。
函数overload
2、动态分派
不是根据静态类型来确定,而是在运行时根据实际类型来确定函数的版本
函数override
方法表
虚拟机动态分配机制
虚方法表(vtable)存在方法区上
单分派与多分派
静态多分派(方法重载),动态单分派(方法重写)
3、字节码
操作码
iload
istore
iadd
isub
imul
idiv
操作数
六、垃圾回收
垃圾回收:方法区 和 堆区 空间不够用,OutOfMemoryError
加载类:空间不够用(类信息)
堆区:空间不够用(分配实例)
垃圾回收的要点知识
引用计数法
给对象添加一个引用计数器。每当有一个地方引用这个对象,这个计数器就加1;每当引用失效,这个计数器就减1;当计数器为0的时候就代表该对象不能再被使用;
可达性分析算法
GC Root对象
虚拟机栈(栈帧中的本地变量表)中引用的对象;
方法区中 类静态属性引用的对象;
方法区中常量引用的对象;
本地方法栈JNI(即一般说的Native方法)引用的对象;
GC关心的东西:这块数据是不是一个指针
对象的生命周期
在
Java
中,对象的生命周期包括以下几个阶段:
- 创建阶段(
Created
)- 应用阶段(
In Use
)- 不可见阶段(
Invisible
)- 不可达阶段(
Unreachable
)- 收集阶段(
Collected
)- 终结阶段(
Finalized
)- 对象空间重分配阶段(
De-allocated
)
四种引用
1)、强引用
类似 Object a = new Object();
只要强引用存在,垃圾收集器永远不会回收它
jvm 在内存不够时,会抛出OutOfMemory
使用完以后a = null;
才会被回收
强引用是使用最普遍的引用。如果一个对象具有强引用,那垃圾收器绝不会回收它。当内存空间不足,Java虚拟机宁愿抛出
OutOfM moryError
错误,使程序异常终止,也不会靠随意回收具有强引用 对象来解决内存不足的问题
2)、软引用SoftReference
软引用是用来描述一些还有用但并非必须的对象。对于软引用关联着的对象,在系统将要发生内存溢出异常之前,将会把这些对象列进回收范围进行第二次回收。如果这次回收还没有足够的内存,才会抛出内存溢出异常
1 | Object obj = new Object(); |
3)、弱引用 WeekReference
弱引用也是用来描述非必须对象的,他的强度比软引用更弱一些,被弱引用关联的对象,在垃圾回收时,如果这个对象只被弱引用关联(没有任何强引用关联他),那么这个对象就会被回收。
4)、虚引用 PhanReference
一个对象是否有虚引用的存在,完全不会对其生存时间构成影响,也无法通过虚引用来获取一个对象的实例。为一个对象设置虚引用关联的唯一目的就是能在这个对象被收集器回收时收到一个系统通知。虚引用和弱引用对关联对象的回收都不会产生影响,如果只有虚引用活着弱引用关联着对象,那么这个对象就会被回收。它们的不同之处在于弱引用的
get
方法,虚引用的get
方法始终返回null
,弱引用可以使用ReferenceQueue
,虚引用必须配合ReferenceQueue
使用。
jdk
中直接内存的回收就用到虚引用,由于jvm
自动内存管理的范围是堆内存,而直接内存是在堆内存之外(其实是内存映射文件,自行去理解虚拟内存空间的相关概念),所以直接内存的分配和回收都是有Unsafe
类去操作,java
在申请一块直接内存之后,会在堆内存分配一个对象保存这个堆外内存的引用,这个对象被垃圾收集器管理,一旦这个对象被回收,相应的用户线程会收到通知并对直接内存进行清理工作。
GC最重要的几件事
1、哪些内存要回收?
GCRoot 里面没有引用的对象
代码不断变化,如果让GC Root不变 => stop the world
2、什么时候回收
安全点
3、如何回收
oopMap 存 对象位置,类型
垃圾回收算法
System.gc();
手动执行垃圾回收。程序告诉GC收集器执行回收动作,不过到底什么时候执行GC
属于FullGC
有哪些缺陷? STW(stop the world)
1 |
标记清除算法
分为两个阶段 “标记” 和 “清除”,首先标记出所有需要回收的对象,在标记完成后统一回收所有被标记的对象;
两个不足:
一个是效率问题,标记和清除效率都不高
另一个是空间问题,标记清除之后产生大量不连续的内存碎片
标记整理算法
标记阶段仍然和标记清除算法一样,
复制算法
效率最高
MinorGC:Eden-> S0/S1
MajorGC:Old区 内部收集算法
FullGC:Eden + S0/S1 -> Old,永久代回收,都是FullGC
现在商业虚拟机的垃圾收集都采用“分代收集算法”。不同分区使用不同的收集算法,达到最高的使用几率
安全点Safepoint
safeRegion
oopMap
JDK垃圾回收器
-XX:+Use[Name]]GC
Serial收集器
开启参数: -XX:+UseSerialGC
复制算法
单线程的收集器,它的“单线程”的意义并不仅仅说明它会使用一个CPU或者一条收集线程去完成垃圾收集工作,最重要的是在它进行垃圾收集时,必须暂停其他所有的工作线程,直到它收集结束
SerialOld收集器
复制算法
ParNew收集器
开启参数:-XX:UseParNewGC
复制算法
Serial收集器的多线程版本,除了使用多条线程进行垃圾收集之外,其他行为包括Serial收集器可用的所有控制参数,收集算法,STW,对象分配规则,回收策略 与 Serial收集器完全一样
Parallel Scavenge收集器
开启参数:-XX:UseParallelGC
3个参数选一个设置即可
Parallel Old收集器
Concurrent Mark Sweep收集器
开启参数:-XX:ConcMarkSweepGC
标记清除算法
优点:减小停顿时间,提高用户体验
缺陷:
- 对CPU特别敏感,cpu越少对程序影响越大。CMS默认的回收线程数 = (cpu核心数量 + 3)/4,进一法
- CMS 和 浮动垃圾,标记和清理阶段,用户线程仍在运行
-XX:CMSInitatingOccupancyFraction=68
,当老年代达到68%时,判断(100-68)的空间是否够用户线程使用:如果够则触发CMS垃圾回收;如果剩余内存不够用户线程运行,则会抛出Concurrent Mode Failure
错误,触发SerialOld回收(停顿时间过长) - 碎片整理 — FullGC
-XX:UseCMSCompactAtFullCollection
整理
碎片整理过程没法并发,通过SerialOld进行
G1收集器
Humongous:超大对象
G1最大可使用32G
整体上看是 标记整理算法,局部是复制算法
MaxGCPauseMillis 最大GC停顿时间
RememberSet
开启参数:-XX:+UseG1GC
垃圾回收机制
空间担保
FullGC的触发条件
七、jvm性能调优
jps
1 | jps -mlv |
jstat
1 | jstat -class pid x y |
打印GC失败原因
1 | jstat -gccause pid x y |
jinfo
1 | jinfo -flag pid |
jvm标准参数
java -help, 不会随着jdk版本变化
jvm非标准参数
java -X
java -XX
jmap
jmap -histo:live pid
jmap pid > pid.hprof
jstack
jstack pid
线程状态
基础线程
Finalizer线程 finaliner() 放在一个FQueue里,有一个守护线程负责清理FQueue中的
Monitor Ctrl-Break 监听Ctrl+C 中断任务
Attach Listener 接收外部命令,执行该命令
Signal Dispatcher 分发外部命令
Reference Handler 处理引用线程(软引用/弱引用/虚引用)的垃圾回收问题
实现调优
调优的原则
- 合理编写代码
文件句柄
网络 - 合理利用硬件资源
- 合理的进行调优
终极规则:降低FGC的频次,减少GC的时间
大对象导出
- 百万级
- 7~8W记录
- result
- for() => 本地没有测试
慢sql
- tomcat 每隔三个月必然爆一次,tomcat有非常多的hprof, -XX:HeapDumpOnOutOfMemoryError
- 32G
- 重启工程师
- 数据库设计有问题
- 一张表:上千万,更多, -> 分库
分表 冷热数据 很多冗余(空间换时间)
- 日志:亿
RocketMQ 消息发送 Producer
1、漫谈消息发送
消息发送方式:同步(sync)、异步(async)、单向(oneway)
- 同步(sync):发送消息时,同步等待 broker返回发送结果;
- 异步(async):发送消息后,通过回调函数 获取发送结果,发送过程不阻塞;
- 单向(oneway):只管发,不在乎是否发送成功;
3个疑问?
- 消息队列(broker)如何进行负载? 主动从NameServer拉取,更新缓存
- 消息发送如何实现高可用? 重试 + 规避
- 批量消息发送如何实现一致性? 压缩+校验码
重点骨架
1、消息生产者启动流程
MQClientInstance、消息生产者之间的关系
2、消息队列负载机制
生产者在发送消息时,如果本地路由表中未缓存topic的路由信息,向NameServer发送获取路由信息请求,更新本地路由表。并且消息生产者每隔 30s 从NameServer 更新路由表。
3、消息发送异常机制
消息发送高可用两个手段:重试 + Broker规避。
Borker规避:在一次消息发送出错时,在某一段时间内,消息生产者不会选择该Broker上的消息队列,提高消息发送成功率
4、批量消息发送
RocketMQ 支持将同一主题下的多条消息一次性发送到Broker
2、RocketMQ消息结构
消息:org.apache.rocketmq.common.message.Message
类设计如下:
1 | package org.apache.rocketmq.common.message; |
消息flag
org.apache.rocketmq.common.sysflag.MessageSysFlag
1 | package org.apache.rocketmq.common.sysflag; |
扩展属性properties
tag
:消息TAG,用于消息过滤
keys
:Message 索引键,多个用空格隔开。RocketMQ可以根据这些key快速检索到消息。
waitStoreMsgOK
:消息发送时是否等消息存储完成再返回。
delayTimeLevel
:消息延迟级别,用于定时消息 或 消息重试。
3、Producer启动流程
消息生产者代码都在 client
模块中,对于 RocketMQ
来说 它就是客户端。
1、初识DefaultMQProducer
DefaultMQProducer
—> 默认 消息生产者实现类,实现 MQAdmin
接口。
1 | public interface MQProducer extends MQAdmin {} |
主要方法
1)创建topic
void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
- key: accesskey 目前没有实际作用,可以与 newTopic 相同;
- newTopic: topic name 主题名称;
- queueNum: topic’s queue number 队列数量;
- topicSysFlag: topic system flag 主题系统标签,默认为0;
2)根据时间戳从队列中查找其偏移量
long searchOffset(MessageQueue mq, long timestamp)
3)查找该消息队列中的 最大/最小 物理偏移量
long maxOffset(MessageQueue mq)
long minOffset(MessageQueue mq)
4)根据偏移量查找消息
MessageExt viewMessage(String offsetMsgId)
5)根据条件查找消息
QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
- topic:主题
- key:消息索引字段
- maxNum:最大取出条数;
- begin:开始时间错
- end:结束时间戳
6)同步发送消息。若不指定messageQueue,由负载算法决定发到哪个队列
SendResult send(Message msg[, MessageQueue mq[, long timeout]])
同步批量发送
SendResult send(Collection<Message> msgs[, MessageQueue messageQueue[, long timeout]])
7)异步发送消息
void send(Message msg, MessageQueue mq, SendCallback sendCallback[, long timeout])
8)单向发送消息
void sendOneway(Message msg, MessageQueue mq)
void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
生产者核心属性
1 | public class DefaultMQProducer extends ClientConfig implements MQProducer { |
producerGroup
:rocketmq聚合角色相同的Producer为一组,在broker回查事务消息的状态时,随机选择该组中的任何一个生产者发起事务回查请求。对事务消息特别重要,对非事务消息,只要同一进程唯一就可以了;createTopicKey
:默认Topic Key;
2、Producer启动流程
DefaultMQProducer#start()
--> DefaultMQProducerImpl#start(final boolean startFactory)
1 | public void start(final boolean startFactory) throws MQClientException { |
- step1:检查 producerGroup 是否合法,并改变生产者的instanceName为pid;
- step2: 创建 MQClientInstance 实例。
整个JVM中只存在一个 MQClientManager实例,维护一个 MQClientInstance 缓存表 factoryTable。也就是同一个 clientId 只会创建一个 MQClientInstance clientId 格式:客户端IP@instanceName(@unitName)
- step3:向 MQClientInstance 注册,将当前生产者 加入到 MQClientInstance 的管理中。方便后续调用网络请求、进行心跳检测;
- step4;启动 MQClientInstance;
MQClientInstance:封装了 网络处理API,是Producer、Consumer 与 NameServer、Broker 打交道的网络通道。
4、消息发送流程
消息发送步骤:
- 验证消息;
- 查找路由;
- 消息发送(包括异常处理);
同步发送消息方法 调用链如下:
org.apache.rocketmq.client.producer.DefaultMQProducer
1 | public SendResult send(Message msg) { |
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
1 | public SendResult send(Message msg) { |
下面主要分析 sendDefaultImpl(...)
方法的实现
1、消息长度验证
发送之前校验:
- 生产者处于运行状态
- 消息是否符合相应的规范(topic、消息体 不能为空,消息长度≠0,不超过允许的最大长度4M(1024*1024*4))
2、查找topic路由信息
消息发送之前,首先需要获取 topic的路由信息,然后才能知道要发往哪个 Broker 节点。
1 | /** |
关键类【Topic路由信息】: TopicPublishInfo
:
1 | public class TopicPublishInfo { |
1 | public class TopicRouteData extends RemotingSerializable { |
3、选择消息队列
返回一个
MessageQueue
用于发送消息,同时引入规避机制【规避上一次发送失败的Broker,提高发送效率】两种选择方式:
1、默认机制,不启用故障延迟机制
sendLatencyFaultEnable = false
2、启用故障延时机制
sendLatencyFaultEnable = true
API 调用链
DefaultMQProducerImpl#selectOneMessageQueue
--> MQFaultStrategy#selectOneMessageQueue
1 | /** |
1、默认机制
sendLatencyFaultEnable = false
不启用故障延迟机制
选择队列核心API: TopicPublishInfo#selectOneMessageQueue
1 | /** |
该机制的缺陷:虽然可以规避上一次发送失败的Broker,但每次查询都要走这个流程,太耗费性能;
为什么Broker不可用之后,路由信息中还包含该Broker的路由信息呢?
- 首先,
NameServer
检测 Broker 是有延迟的,最短的一次心跳为 10s;- 其次,
NameServer
检测到 Broker 故障后,不会马上推给Producer,而是 Producer 每隔 30s 拉取一次路由信息;- 所以, Producer 最快感知道Broker故障信息也需要 30s;
下面引入一种机制,如果一次消息发送失败之后,可以将该 Broker 暂时排除在消息队列的选择范围外,过一段时间再加入 可选择范围内。
2、Broker故障延迟机制
1)
sendLatencyFaultEnable = true
启用故障延迟机制2)【故障Broker存储】实现类
LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
选择队列核心API:MQFaultStrategy#selectOneMessageQueue
1 | /** |
重点【latencyFaultTolerance
】
故障延迟机制 存储、更新、判断:LatencyFaultToleranceImpl
4、消息发送
消息发送核心API入口:DefaultMQProducerImpl#sendKernelImpl
1 | /** |
发送步骤:
1、根据MessageQueue获取Broker网络地址。
如果mQClientFactory.brokerAddrTable
还没有缓存该地址,则从NameServer
主动更新一下topic路由信息。
如果还是找不到Broker信息,则抛出异常 MQClientException。
2、为消息分配全局唯一ID。
如果 消息提超过4K
,会对 消息体采用 zip 压缩,并将消息标记为 MessageSysFlag.COMPRESSED_FLAG
。
如果消息是事务Prepare消息,则设置消息的系统标记为MessageSysFlag.TRANSACTION_PREPARED_TYPE
。
3、如果注册了消息发送钩子函数,则执行消息发送之前的增强逻辑。通过DefaultMQProducerImpl#registerSendMessageHook
注册钩子处理类
4、构建消息发送请求包。主要包含如下重要信息:
生产者组、主题名称、默认创建主题Key、该主题在单个Broker默认队列数、队列ID、
消息系统标记(MessageSysFlag)、消息发送时间、消息标记flag、消息扩展性、重试次数、是否批量消息
5、根据消息发送方式:同步、异步、单向 方式进行网络传输
发送消息核心函数 MQClientAPIImpl#sendMessage
:
真正网络发送函数:
1 | // 单向发送 |
5、批量消息发送
DefaultMQProducer.java#send(Collection<Message> msgs)
在 Producer,调用 batch方法,将一批消息封装成 MessageBatch
对象。
MessageBatch
继承自Message 对象,MessageBatch
内部持有 List<Message> messages
。
批量消息封装格式
批量封装:MessageDecoder#encodeMessages(List<Message> messages)
#####
单条封装MessageDecoder#encodeMessage(Message message)
shell解析nginx日志
1 | grep '2019:14' nginx-access.log* | grep -v 'DNSPod' | awk '{print $1, $4, $9, $(NF-4), $NF, $(NF-2) }' > ~/trade20190210.log |
2、RocketMQ 路由中心 NameServer
NameServer 是 整个 RocketMQ 的 ”大脑“;
路由管理、服务注册、服务发现 机制;
本章重点内容:
- NameServer 整体架构设计;
- NameServer 动态路由 发现与剔除机制;
1、NameServer 架构设计
RocketMQ 逻辑部署图:
Broker 启动时,向 所有NameServer 注册;
Producer在 发送消息 之前,先从 NameServer 获取 Broker 服务器地址列表,然后 根据负载均衡 算法 从 列表中选择一台服务器进行消息发送;
NameServer 与 每台 Broker 保持 长链接,并且间隔 30s 检测 Broker是否存活。如果Broker宕机,则从 路由注册表中将其移除;
NameServer 本身的 高可用 通过 部署多台NameServer 服务器来实现;但彼此之间不通信;
2、NameServer 启动流程
NamaServer 启动类:org.apache.rocketmq.namesrv.NamesrvStartup
1 | public static NamesrvController main0(String[] args) { |
2.1、初始化 NamesrvController
NamesrvController controller = createNamesrvController(args);
创建:
org.apache.rocketmq.namesrv.NamesrvStartup.createNamesrvController
1 | public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException { |
代码功能:
- 需要先加载 命令行中-c configFile 指定的配置文件 和 命令行中的其他参数 初始化 NamesrvConfig 和 NettyServerConfig;
- 使用 NamesrvConfig 和 NettyServerConfig 初始化 NamesrvController;
- 然后将 所有参数 持久化在 configFile 中;
org.apache.rocketmq.namesrv.NamesrvController
1 | public NamesrvController( |
初始化:
- kvConfigManager:使用 configTable(HashMap)存储 所有配置;
- routeInfoManager:存储路由信息;
- brokerHousekeepingService:
2.2、启动 NamesrvController
start(controller);
1 | org.apache.rocketmq.namesrv.NamesrvController; |
初始化 controller
boolean initResult = controller.initialize();
- 从 configFile 中 加在配置 放在 kvConfigManager 的 configTable(HashMap) 中;
- 初始化 NettyRemotingServer;
- 创建 业务线程池 remotingExecutor;
- 把 remotingExecutor 作为 remotingServer 的 默认处理线程池;
- 每 10s 扫描一次不活跃的 Broker,从 brokerLiveTable 中移除;
- 每 10s 打印一次 KV配置;
- 创建一个文件监控服务,当 sslContext 变更时,重新加载 sslContext;
1 | org.apache.rocketmq.namesrv.NamesrvController; |
controller 的 关闭和启动
controller.shutdown();
controller.start();
1 | public void start() throws Exception { |
2.3、NameServer 路由注册、故障剔除
NameServer的主要作用: 为Producer 和 Consumer 提供 Topic的 路由信息;
NameServer功能:存储路由信息 、管理Broker节点;
1)路由元信息
1 | HashMap<String/* topic */, List<QueueData>> topicQueueTable; |
- topicQueueTable:Topic消息队列 路由信息,Producer 根据 路由表 进行负载均衡;
- brokerAddrTable:Broker基础信息,包含 brokerName、所属集群名称、主备 broker地址;
- clusterAddrTable:Broker集群信息,存储集群中 所有 Broker 名称;
- brokerLiveTable:Broker状态信息。NameServer每次收到心跳包时会替换该信息;
- filterServerTable:Broker上的 FilterServer 列表,用于类模式消息过滤;
1 | package org.apache.rocketmq.common.protocol.route; |
2)路由注册
机制:通过Broker 与 NameServer 心跳功能实现的。
- Broker 启动时 向集群中 所有NameServer 发送心跳语句;
- Broker 每隔 30s 向集群中 所有NameServer 发送心跳语句;
- NameServer 收到 Broker 心跳包时,会更新 brokerLiveTable 缓存中 BrokerLiveInfo 的 lastUpdateTimestamp;
- NameServer 每隔 10s 扫描 brokerLiveTable,如果连续 120s 没有收到 心跳包,则 NameServer将移除 该Broker 的路由信息,同时关闭 socket 连接;
①、Broker发送心跳包核心代码:
org.apache.rocketmq.broker.BrokerController.start()
1 | public void start() throws Exception { |
遍历 NameServer 列表,Broker 消息服务器 依次向 NameServer 发送心跳包:
org.apache.rocketmq.broker.out.BrokerOuterAPI
1 | /** |
②、NameServer处理心跳包
处理机制:org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor.processRequest() 网络处理 解析请求类型,如果请求类型为 RequestCode.REGISTER_BROKER,则请求最终转发到 RouteInfoManager.registerBroker();
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor;
1 | public RemotingCommand processRequest(ChannelHandlerContext ctx, |
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager.registerBroker()
- 路由注册需要加写锁(ReentrantReadWriteLock.writeLock()), 防止 并发 修改 RouteInfoManager 的路由表;首先判断 Broker 集群 是否存在,如果不存在,则 创建。然后将 broker名 加入到对应集群中;
- 维护 BrokerData信息;
- 如果Broker是Master,并且 broker中的topic 配置信息发生变化 或 首次注册,则创建或更新 Topic的路由元数据,填充 topicQueueTable;
- 更新 BrokerLiveInfo,存活的 Broker信息表;
- 注册Broker的过滤器 Server地址列表,一个Broker 上会关联 多个FilterSever消息过滤器;
- 如果 brokerName 不是Master,则 获取对应的 masterAddr 和 masterAddr 对应的 haServerAddr 作为返回值;
1 | /** |
3) 路由删除
心跳包:{BrokerId,Broker地址,Broker名称,Broker所属集群名称};
剔除失效Broker机制:NameServer 每隔 10s 扫描 brokerLiveTable 状态表,如果 BrokerLive的 lastUpdateTimestamp 的时间戳 距 当前时间超过 120s,则 认为 Broker失效;移除 该 Broker,关闭与Broker连接,并同时更新 topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable;
RocketMQ 有 两个触发点来 触发 路由删除:
- Namaserver 定时扫描brokerLiveTable,检测上次心跳包与当前系统时间差,如果时间间隔 > 120s,则移除该Broker信息;
- Broker 在正常被关闭的情况下,会执行 unregisterBroker 指令;
两种方法的触发方式的公共代码:
1 | org.apache.rocketmq.namesrv.NamesrvController; |
1 | org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager; |
4)路由发现
IntelliJ IDEA 调试 rocketmq 源码
1、启动 NameServer
Step_1:源码路径,使用Idea导入工程
在命令行执行
1 | mvn clean install |
Step_2:展开 namesrv 模块
右键 NamasrvStartup.java,点击 NamaStart.main() 运行,报错
Step_3:在 Debug Configuration 界面设置 环境变量
1 | ROCKETMQ_HOME=D:\java\rocketmq |
Step_4:在 D:\java\rocketmq 下创建 conf、logs、store 三个文件夹
Step_5:将 D:\java\srcode\rocketmq-master\distribution\conf 下的 broker.conf、logback_broker.xml、logback_namesrv.xml、logback_tools.xml 复制到 D:\java\rocketmq\conf 目录下
修改 broker.conf
1 | brokerClusterName = DefaultCluster |
可以修改 logback_namesrv.xml 和 logback_tools.xml 中的log 路径;
Step_6:运行
控制台出现
2、启动 Broker
3、发送消息示例
修改 Producer.java ,指定 Namesrv地址 producer.setNamesrvAddr(“127.0.0.1:9876”):
1 | import org.apache.rocketmq.client.exception.MQClientException; |
运行结果:
4、测试消费示例
修改 Consumer.java ,指定 Namesrv地址 producer.setNamesrvAddr(“127.0.0.1:9876”):
1 | package org.apache.rocketmq.example.quickstart; |
运行结果:
5、RocketMQ 源代码目录结构
工程结构如下图:
RoctketMQ 核心目录说明如下:
- broker:broker 模块(broker 启动进程);
- client:消息客户端,包含 消息生产者、消息消费者 相关类;
- common:公共包;
- dev:开发者信息(非源代码);
- distribution:部署实例文件夹(非源码);
- example:RocketMQ 示例代码;
- filter:消息过滤相关基础类;
- filtersrv:消息过滤服务器 实现相关类(Filter启动进程);
- logappender:日志实现相关类;
- namesrv:NameServer 实现相关类(NameServer 启动进程);
- openmessaging:消息开放标准,正在指定中;
- remoting:远程通信模块,基于Netty;
- srvutil:服务器工具类;
- store:消息存储实现 相关类;
- style:checkstyle 相关实现;
- test:测试 相关类;
- tools:工具类,监控命令 相关实现类;
6、RocketMQ 的设计理念和目标
6.1、设计理念
1、基于 主题(Topic) 发布/订阅 模式
2、核心功能:消息发送,消息存储(Broker),消息消费
3、NameServer: 做元数据管理,舍弃 ZooKeeper,NameServer集群之间互不通信,降低实现复杂度,对网络要求也降低,但性能却 极大提升;
4、IO存储机制:
- 文件组(N个大小固定的文件),引入内存映射机制;
- 所有消息顺序写 —— 极大提升 写性能;
- 引入 消费队列文件 和 索引文件 —— 兼顾 消息消费 和 查找
5、容忍设计缺陷:保证消息至少被消费一次,不保证只消费一次,由 消费者自己保证;
6.2、设计目标
1、架构模式:发布订阅模式,消息发送者,消息Broker,消息消费,路由发现;
2、顺序消息:保证消息 严格有序;
3、消息过滤:Broker过滤, 消费者过滤;
4、消息存储:消息堆积能力 和 消息存储性能;
- 内存映射机制,所有主题消息顺序写入同一个文件中;
- 引入文件过期机制和 文件存储空间报警机制;
5、消息高可用:同步刷盘、异步刷盘,异步复制,双写机制;
6、消息到达低延迟:长轮询模式 实现 准实时的消息推送;
7、确保消息必须被消费一次:无法做到 只被消费一次,会重复消费;
8、回溯消息:消费过的消息,支持按时间 重新消费(可精确到毫秒)向前或向后回溯;
9、消息堆积:文件组无限循环使用。提供了过期机制;
10、定时消息:特定延迟级别的 延迟队列;
11、消息重试机制:通过ack机制,未确认的消息 可 重新消费