1、漫谈消息发送
消息发送方式:同步(sync)、异步(async)、单向(oneway)
- 同步(sync):发送消息时,同步等待 broker返回发送结果;
- 异步(async):发送消息后,通过回调函数 获取发送结果,发送过程不阻塞;
- 单向(oneway):只管发,不在乎是否发送成功;
3个疑问?
- 消息队列(broker)如何进行负载? 主动从NameServer拉取,更新缓存
- 消息发送如何实现高可用? 重试 + 规避
- 批量消息发送如何实现一致性? 压缩+校验码
重点骨架
1、消息生产者启动流程
MQClientInstance、消息生产者之间的关系
2、消息队列负载机制
生产者在发送消息时,如果本地路由表中未缓存topic的路由信息,向NameServer发送获取路由信息请求,更新本地路由表。并且消息生产者每隔 30s 从NameServer 更新路由表。
3、消息发送异常机制
消息发送高可用两个手段:重试 + Broker规避。
Borker规避:在一次消息发送出错时,在某一段时间内,消息生产者不会选择该Broker上的消息队列,提高消息发送成功率
4、批量消息发送
RocketMQ 支持将同一主题下的多条消息一次性发送到Broker
2、RocketMQ消息结构
消息:org.apache.rocketmq.common.message.Message
类设计如下:
1 | package org.apache.rocketmq.common.message; |
消息flag
org.apache.rocketmq.common.sysflag.MessageSysFlag
1 | package org.apache.rocketmq.common.sysflag; |
扩展属性properties
tag
:消息TAG,用于消息过滤
keys
:Message 索引键,多个用空格隔开。RocketMQ可以根据这些key快速检索到消息。
waitStoreMsgOK
:消息发送时是否等消息存储完成再返回。
delayTimeLevel
:消息延迟级别,用于定时消息 或 消息重试。
3、Producer启动流程
消息生产者代码都在 client
模块中,对于 RocketMQ
来说 它就是客户端。
1、初识DefaultMQProducer
DefaultMQProducer
—> 默认 消息生产者实现类,实现 MQAdmin
接口。
1 | public interface MQProducer extends MQAdmin {} |
主要方法
1)创建topic
void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
- key: accesskey 目前没有实际作用,可以与 newTopic 相同;
- newTopic: topic name 主题名称;
- queueNum: topic’s queue number 队列数量;
- topicSysFlag: topic system flag 主题系统标签,默认为0;
2)根据时间戳从队列中查找其偏移量
long searchOffset(MessageQueue mq, long timestamp)
3)查找该消息队列中的 最大/最小 物理偏移量
long maxOffset(MessageQueue mq)
long minOffset(MessageQueue mq)
4)根据偏移量查找消息
MessageExt viewMessage(String offsetMsgId)
5)根据条件查找消息
QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
- topic:主题
- key:消息索引字段
- maxNum:最大取出条数;
- begin:开始时间错
- end:结束时间戳
6)同步发送消息。若不指定messageQueue,由负载算法决定发到哪个队列
SendResult send(Message msg[, MessageQueue mq[, long timeout]])
同步批量发送
SendResult send(Collection<Message> msgs[, MessageQueue messageQueue[, long timeout]])
7)异步发送消息
void send(Message msg, MessageQueue mq, SendCallback sendCallback[, long timeout])
8)单向发送消息
void sendOneway(Message msg, MessageQueue mq)
void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
生产者核心属性
1 | public class DefaultMQProducer extends ClientConfig implements MQProducer { |
producerGroup
:rocketmq聚合角色相同的Producer为一组,在broker回查事务消息的状态时,随机选择该组中的任何一个生产者发起事务回查请求。对事务消息特别重要,对非事务消息,只要同一进程唯一就可以了;createTopicKey
:默认Topic Key;
2、Producer启动流程
DefaultMQProducer#start()
--> DefaultMQProducerImpl#start(final boolean startFactory)
1 | public void start(final boolean startFactory) throws MQClientException { |
- step1:检查 producerGroup 是否合法,并改变生产者的instanceName为pid;
- step2: 创建 MQClientInstance 实例。
整个JVM中只存在一个 MQClientManager实例,维护一个 MQClientInstance 缓存表 factoryTable。也就是同一个 clientId 只会创建一个 MQClientInstance clientId 格式:客户端IP@instanceName(@unitName)
- step3:向 MQClientInstance 注册,将当前生产者 加入到 MQClientInstance 的管理中。方便后续调用网络请求、进行心跳检测;
- step4;启动 MQClientInstance;
MQClientInstance:封装了 网络处理API,是Producer、Consumer 与 NameServer、Broker 打交道的网络通道。
4、消息发送流程
消息发送步骤:
- 验证消息;
- 查找路由;
- 消息发送(包括异常处理);
同步发送消息方法 调用链如下:
org.apache.rocketmq.client.producer.DefaultMQProducer
1 | public SendResult send(Message msg) { |
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl
1 | public SendResult send(Message msg) { |
下面主要分析 sendDefaultImpl(...)
方法的实现
1、消息长度验证
发送之前校验:
- 生产者处于运行状态
- 消息是否符合相应的规范(topic、消息体 不能为空,消息长度≠0,不超过允许的最大长度4M(1024*1024*4))
2、查找topic路由信息
消息发送之前,首先需要获取 topic的路由信息,然后才能知道要发往哪个 Broker 节点。
1 | /** |
关键类【Topic路由信息】: TopicPublishInfo
:
1 | public class TopicPublishInfo { |
1 | public class TopicRouteData extends RemotingSerializable { |
3、选择消息队列
返回一个
MessageQueue
用于发送消息,同时引入规避机制【规避上一次发送失败的Broker,提高发送效率】两种选择方式:
1、默认机制,不启用故障延迟机制
sendLatencyFaultEnable = false
2、启用故障延时机制
sendLatencyFaultEnable = true
API 调用链
DefaultMQProducerImpl#selectOneMessageQueue
--> MQFaultStrategy#selectOneMessageQueue
1 | /** |
1、默认机制
sendLatencyFaultEnable = false
不启用故障延迟机制
选择队列核心API: TopicPublishInfo#selectOneMessageQueue
1 | /** |
该机制的缺陷:虽然可以规避上一次发送失败的Broker,但每次查询都要走这个流程,太耗费性能;
为什么Broker不可用之后,路由信息中还包含该Broker的路由信息呢?
- 首先,
NameServer
检测 Broker 是有延迟的,最短的一次心跳为 10s;- 其次,
NameServer
检测到 Broker 故障后,不会马上推给Producer,而是 Producer 每隔 30s 拉取一次路由信息;- 所以, Producer 最快感知道Broker故障信息也需要 30s;
下面引入一种机制,如果一次消息发送失败之后,可以将该 Broker 暂时排除在消息队列的选择范围外,过一段时间再加入 可选择范围内。
2、Broker故障延迟机制
1)
sendLatencyFaultEnable = true
启用故障延迟机制2)【故障Broker存储】实现类
LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
选择队列核心API:MQFaultStrategy#selectOneMessageQueue
1 | /** |
重点【latencyFaultTolerance
】
故障延迟机制 存储、更新、判断:LatencyFaultToleranceImpl
4、消息发送
消息发送核心API入口:DefaultMQProducerImpl#sendKernelImpl
1 | /** |
发送步骤:
1、根据MessageQueue获取Broker网络地址。
如果mQClientFactory.brokerAddrTable
还没有缓存该地址,则从NameServer
主动更新一下topic路由信息。
如果还是找不到Broker信息,则抛出异常 MQClientException。
2、为消息分配全局唯一ID。
如果 消息提超过4K
,会对 消息体采用 zip 压缩,并将消息标记为 MessageSysFlag.COMPRESSED_FLAG
。
如果消息是事务Prepare消息,则设置消息的系统标记为MessageSysFlag.TRANSACTION_PREPARED_TYPE
。
3、如果注册了消息发送钩子函数,则执行消息发送之前的增强逻辑。通过DefaultMQProducerImpl#registerSendMessageHook
注册钩子处理类
4、构建消息发送请求包。主要包含如下重要信息:
生产者组、主题名称、默认创建主题Key、该主题在单个Broker默认队列数、队列ID、
消息系统标记(MessageSysFlag)、消息发送时间、消息标记flag、消息扩展性、重试次数、是否批量消息
5、根据消息发送方式:同步、异步、单向 方式进行网络传输
发送消息核心函数 MQClientAPIImpl#sendMessage
:
真正网络发送函数:
1 | // 单向发送 |
5、批量消息发送
DefaultMQProducer.java#send(Collection<Message> msgs)
在 Producer,调用 batch方法,将一批消息封装成 MessageBatch
对象。
MessageBatch
继承自Message 对象,MessageBatch
内部持有 List<Message> messages
。
批量消息封装格式
批量封装:MessageDecoder#encodeMessages(List<Message> messages)
#####
单条封装MessageDecoder#encodeMessage(Message message)