NameServer 是 整个 RocketMQ 的 ”大脑“;
路由管理、服务注册、服务发现 机制;
本章重点内容:
- NameServer 整体架构设计;
- NameServer 动态路由 发现与剔除机制;
1、NameServer 架构设计
RocketMQ 逻辑部署图:
Broker 启动时,向 所有NameServer 注册;
Producer在 发送消息 之前,先从 NameServer 获取 Broker 服务器地址列表,然后 根据负载均衡 算法 从 列表中选择一台服务器进行消息发送;
NameServer 与 每台 Broker 保持 长链接,并且间隔 30s 检测 Broker是否存活。如果Broker宕机,则从 路由注册表中将其移除;
NameServer 本身的 高可用 通过 部署多台NameServer 服务器来实现;但彼此之间不通信;
2、NameServer 启动流程
NamaServer 启动类:org.apache.rocketmq.namesrv.NamesrvStartup
1 | public static NamesrvController main0(String[] args) { |
2.1、初始化 NamesrvController
NamesrvController controller = createNamesrvController(args);
创建:
org.apache.rocketmq.namesrv.NamesrvStartup.createNamesrvController
1 | public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException { |
代码功能:
- 需要先加载 命令行中-c configFile 指定的配置文件 和 命令行中的其他参数 初始化 NamesrvConfig 和 NettyServerConfig;
- 使用 NamesrvConfig 和 NettyServerConfig 初始化 NamesrvController;
- 然后将 所有参数 持久化在 configFile 中;
org.apache.rocketmq.namesrv.NamesrvController
1 | public NamesrvController( |
初始化:
- kvConfigManager:使用 configTable(HashMap)存储 所有配置;
- routeInfoManager:存储路由信息;
- brokerHousekeepingService:
2.2、启动 NamesrvController
start(controller);
1 | org.apache.rocketmq.namesrv.NamesrvController; |
初始化 controller
boolean initResult = controller.initialize();
- 从 configFile 中 加在配置 放在 kvConfigManager 的 configTable(HashMap) 中;
- 初始化 NettyRemotingServer;
- 创建 业务线程池 remotingExecutor;
- 把 remotingExecutor 作为 remotingServer 的 默认处理线程池;
- 每 10s 扫描一次不活跃的 Broker,从 brokerLiveTable 中移除;
- 每 10s 打印一次 KV配置;
- 创建一个文件监控服务,当 sslContext 变更时,重新加载 sslContext;
1 | org.apache.rocketmq.namesrv.NamesrvController; |
controller 的 关闭和启动
controller.shutdown();
controller.start();
1 | public void start() throws Exception { |
2.3、NameServer 路由注册、故障剔除
NameServer的主要作用: 为Producer 和 Consumer 提供 Topic的 路由信息;
NameServer功能:存储路由信息 、管理Broker节点;
1)路由元信息
1 | HashMap<String/* topic */, List<QueueData>> topicQueueTable; |
- topicQueueTable:Topic消息队列 路由信息,Producer 根据 路由表 进行负载均衡;
- brokerAddrTable:Broker基础信息,包含 brokerName、所属集群名称、主备 broker地址;
- clusterAddrTable:Broker集群信息,存储集群中 所有 Broker 名称;
- brokerLiveTable:Broker状态信息。NameServer每次收到心跳包时会替换该信息;
- filterServerTable:Broker上的 FilterServer 列表,用于类模式消息过滤;
1 | package org.apache.rocketmq.common.protocol.route; |
2)路由注册
机制:通过Broker 与 NameServer 心跳功能实现的。
- Broker 启动时 向集群中 所有NameServer 发送心跳语句;
- Broker 每隔 30s 向集群中 所有NameServer 发送心跳语句;
- NameServer 收到 Broker 心跳包时,会更新 brokerLiveTable 缓存中 BrokerLiveInfo 的 lastUpdateTimestamp;
- NameServer 每隔 10s 扫描 brokerLiveTable,如果连续 120s 没有收到 心跳包,则 NameServer将移除 该Broker 的路由信息,同时关闭 socket 连接;
①、Broker发送心跳包核心代码:
org.apache.rocketmq.broker.BrokerController.start()
1 | public void start() throws Exception { |
遍历 NameServer 列表,Broker 消息服务器 依次向 NameServer 发送心跳包:
org.apache.rocketmq.broker.out.BrokerOuterAPI
1 | /** |
②、NameServer处理心跳包
处理机制:org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor.processRequest() 网络处理 解析请求类型,如果请求类型为 RequestCode.REGISTER_BROKER,则请求最终转发到 RouteInfoManager.registerBroker();
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor;
1 | public RemotingCommand processRequest(ChannelHandlerContext ctx, |
org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager.registerBroker()
- 路由注册需要加写锁(ReentrantReadWriteLock.writeLock()), 防止 并发 修改 RouteInfoManager 的路由表;首先判断 Broker 集群 是否存在,如果不存在,则 创建。然后将 broker名 加入到对应集群中;
- 维护 BrokerData信息;
- 如果Broker是Master,并且 broker中的topic 配置信息发生变化 或 首次注册,则创建或更新 Topic的路由元数据,填充 topicQueueTable;
- 更新 BrokerLiveInfo,存活的 Broker信息表;
- 注册Broker的过滤器 Server地址列表,一个Broker 上会关联 多个FilterSever消息过滤器;
- 如果 brokerName 不是Master,则 获取对应的 masterAddr 和 masterAddr 对应的 haServerAddr 作为返回值;
1 | /** |
3) 路由删除
心跳包:{BrokerId,Broker地址,Broker名称,Broker所属集群名称};
剔除失效Broker机制:NameServer 每隔 10s 扫描 brokerLiveTable 状态表,如果 BrokerLive的 lastUpdateTimestamp 的时间戳 距 当前时间超过 120s,则 认为 Broker失效;移除 该 Broker,关闭与Broker连接,并同时更新 topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable;
RocketMQ 有 两个触发点来 触发 路由删除:
- Namaserver 定时扫描brokerLiveTable,检测上次心跳包与当前系统时间差,如果时间间隔 > 120s,则移除该Broker信息;
- Broker 在正常被关闭的情况下,会执行 unregisterBroker 指令;
两种方法的触发方式的公共代码:
1 | org.apache.rocketmq.namesrv.NamesrvController; |
1 | org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager; |