RocketMQ 消息发送 Producer



1、漫谈消息发送

消息发送方式:同步(sync)、异步(async)、单向(oneway)

  • 同步(sync):发送消息时,同步等待 broker返回发送结果;
  • 异步(async):发送消息后,通过回调函数 获取发送结果,发送过程不阻塞;
  • 单向(oneway):只管发,不在乎是否发送成功;

3个疑问?

  1. 消息队列(broker)如何进行负载? 主动从NameServer拉取,更新缓存
  2. 消息发送如何实现高可用? 重试 + 规避
  3. 批量消息发送如何实现一致性? 压缩+校验码

重点骨架

1、消息生产者启动流程

MQClientInstance、消息生产者之间的关系

2、消息队列负载机制

生产者在发送消息时,如果本地路由表中未缓存topic的路由信息,向NameServer发送获取路由信息请求,更新本地路由表。并且消息生产者每隔 30s 从NameServer 更新路由表。

3、消息发送异常机制

消息发送高可用两个手段:重试 + Broker规避。

Borker规避:在一次消息发送出错时,在某一段时间内,消息生产者不会选择该Broker上的消息队列,提高消息发送成功率

4、批量消息发送

RocketMQ 支持将同一主题下的多条消息一次性发送到Broker


2、RocketMQ消息结构

消息:org.apache.rocketmq.common.message.Message

类设计如下:

1
2
3
4
5
6
7
8
9
10
11
package org.apache.rocketmq.common.message;

public class Message implements Serializable {
private static final long serialVersionUID = 8445773977080406428L;

private String topic; // 消息所属主题topic
private int flag; // 消息flag
private Map<String, String> properties; // 扩展属性
private byte[] body; // 消息体
private String transactionId; // 事务ID
}

消息flag

org.apache.rocketmq.common.sysflag.MessageSysFlag

1
2
3
4
5
6
7
8
9
10
package org.apache.rocketmq.common.sysflag;

public class MessageSysFlag {
public final static int COMPRESSED_FLAG = 0x1;
public final static int MULTI_TAGS_FLAG = 0x1 << 1;
public final static int TRANSACTION_NOT_TYPE = 0;
public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;
}

扩展属性properties

tag:消息TAG,用于消息过滤

keys:Message 索引键,多个用空格隔开。RocketMQ可以根据这些key快速检索到消息。

waitStoreMsgOK:消息发送时是否等消息存储完成再返回

delayTimeLevel:消息延迟级别,用于定时消息 或 消息重试


3、Producer启动流程

消息生产者代码都在 client 模块中,对于 RocketMQ 来说 它就是客户端。

1、初识DefaultMQProducer

DefaultMQProducer —> 默认 消息生产者实现类,实现 MQAdmin 接口。

1
2
3
4
public interface MQProducer extends MQAdmin {}

package org.apache.rocketmq.client.producer;
public class DefaultMQProducer extends ClientConfig implements MQProducer {}

主要方法

1)创建topic

void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)

  • key: accesskey 目前没有实际作用,可以与 newTopic 相同;
  • newTopic: topic name 主题名称;
  • queueNum: topic’s queue number 队列数量;
  • topicSysFlag: topic system flag 主题系统标签,默认为0;

2)根据时间戳从队列中查找其偏移量

long searchOffset(MessageQueue mq, long timestamp)

3)查找该消息队列中的 最大/最小 物理偏移量

long maxOffset(MessageQueue mq)

long minOffset(MessageQueue mq)

4)根据偏移量查找消息

MessageExt viewMessage(String offsetMsgId)

5)根据条件查找消息

QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)

  • topic:主题
  • key:消息索引字段
  • maxNum:最大取出条数;
  • begin:开始时间错
  • end:结束时间戳

6)同步发送消息。若不指定messageQueue,由负载算法决定发到哪个队列

SendResult send(Message msg[, MessageQueue mq[, long timeout]])

同步批量发送

SendResult send(Collection<Message> msgs[, MessageQueue messageQueue[, long timeout]])

7)异步发送消息

void send(Message msg, MessageQueue mq, SendCallback sendCallback[, long timeout])

8)单向发送消息

void sendOneway(Message msg, MessageQueue mq)

void sendOneway(Message msg, MessageQueueSelector selector, Object arg)


生产者核心属性

1
2
3
4
5
6
7
8
9
10
11
public class DefaultMQProducer extends ClientConfig implements MQProducer {
private String producerGroup;
private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
private volatile int defaultTopicQueueNums = 4; // 一个topic默认队列数量
private int sendMsgTimeout = 3000; // 发送消息超时ms
private int compressMsgBodyOverHowmuch = 1024 * 4; // 消息体超过多大时 压缩
private int retryTimesWhenSendFailed = 2; // 同步发送失败重试次数
private int retryTimesWhenSendAsyncFailed = 2; // 异步发送失败重试次数
private boolean retryAnotherBrokerWhenNotStoreOK = false; // 存储失败时尝试其他Broker
private int maxMessageSize = 1024 * 1024 * 4; // 4M,最大消息大小
}
  • producerGroup:rocketmq聚合角色相同的Producer为一组,在broker回查事务消息的状态时,随机选择该组中的任何一个生产者发起事务回查请求。对事务消息特别重要,对非事务消息,只要同一进程唯一就可以了;

  • createTopicKey:默认Topic Key;


2、Producer启动流程

DefaultMQProducer#start()

--> DefaultMQProducerImpl#start(final boolean startFactory)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// step1:检查 producerGroup 是否合法,并改变生产者的instanceName为pid
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(
MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}

// step2: 创建 MQClientInstance 实例。
// 整个JVM中只存在一个 MQClientManager实例,维护一个 MQClientInstance 缓存表 factoryTable。也就是同一个 clientId 只会创建一个 MQClientInstance
// clientId 格式:客户端IP@instanceName(@unitName)
this.mQClientFactory = MQClientManager.getInstance()
.getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

// step3:向 MQClientInstance 注册,将当前生产者 加入到 MQClientInstance 的管理中。方便后续调用网络请求、进行心跳检测
boolean registerOK = mQClientFactory.registerProducer(
this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
}

this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
// step4;启动 MQClientInstance
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
default:
break;
}

this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
  • step1:检查 producerGroup 是否合法,并改变生产者的instanceName为pid;
  • step2: 创建 MQClientInstance 实例。
     整个JVM中只存在一个 MQClientManager实例,维护一个 MQClientInstance 缓存表 factoryTable。也就是同一个 clientId 只会创建一个 MQClientInstance
    clientId 格式:客户端IP@instanceName(@unitName)
  • step3:向 MQClientInstance 注册,将当前生产者 加入到 MQClientInstance 的管理中。方便后续调用网络请求、进行心跳检测;
  • step4;启动 MQClientInstance;

MQClientInstance:封装了 网络处理API,是Producer、Consumer 与 NameServer、Broker 打交道的网络通道。


4、消息发送流程

消息发送步骤:

  1. 验证消息;
  2. 查找路由;
  3. 消息发送(包括异常处理);

同步发送消息方法 调用链如下:

org.apache.rocketmq.client.producer.DefaultMQProducer

1
2
3
public SendResult send(Message msg) {
return this.defaultMQProducerImpl.send(msg);
}

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public SendResult send(Message msg) {
return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
public SendResult send(Message msg, long timeout) {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
/**
* 发送消息方法 的 默认实现
* @param msg 消息
* @param communicationMode 同步、异步、单向
* @param sendCallback 异步发送的回调接口
* @param timeout 超时时间
* @return
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) {
...
// 查找 topic 对应的 路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
// 重试机制,最多重试 timesTotal 次
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// 选择一个消息队列
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
// 实际发送消息
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}

return sendResult;
default:
break;
}
} catch (...) {
...
}
} // end-if
} // end-for
}
...
}

下面主要分析 sendDefaultImpl(...) 方法的实现

1、消息长度验证

发送之前校验:

  1. 生产者处于运行状态
  2. 消息是否符合相应的规范(topic、消息体 不能为空,消息长度≠0,不超过允许的最大长度4M(1024*1024*4))

2、查找topic路由信息

消息发送之前,首先需要获取 topic的路由信息,然后才能知道要发往哪个 Broker 节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 查找 topic 对应的路由信息
* @param topic 主题
* @return
*/
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
// 如果 producer 的 topicPublishInfoTable 中没有缓存 topic,则向 NameServer 查询topic的路由信息
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 从 NameServer 拉取 topic 的路由信息,并更新 topicRouteTable 缓存
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}

// 如果producer中缓存了 topic 路由信息,则直接返回
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(
topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}

关键类【Topic路由信息】: TopicPublishInfo:

1
2
3
4
5
6
7
8
9
public class TopicPublishInfo {
private boolean orderTopic = false; // 消息是否顺序消息
private boolean haveTopicRouterInfo = false; // 是否包含 topic 路由信息
// 该 topic 下的消息队列
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
// 发送到哪个queue,[1, Integer.MAX_VALUE]
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private TopicRouteData topicRouteData; // topic 的 路由信息
}
1
2
3
4
5
6
7
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf; //
private List<QueueData> queueDatas; // topic 队列元数据
private List<BrokerData> brokerDatas; // topic 分布的 broker 元数据
// broker 上 过滤服务器地址列表
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

3、选择消息队列

​ 返回一个 MessageQueue 用于发送消息,同时引入规避机制【规避上一次发送失败的Broker,提高发送效率】

两种选择方式:

1、默认机制,不启用故障延迟机制 sendLatencyFaultEnable = false

2、启用故障延时机制 sendLatencyFaultEnable = true

API 调用链

DefaultMQProducerImpl#selectOneMessageQueue

--> MQFaultStrategy#selectOneMessageQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 从路由信息中选择一个消息队列去发送消息
* @param tpInfo 路由信息
* @param lastBrokerName 上次发送消息出错的 broker,优先排除
* @return
*/
public MessageQueue selectOneMessageQueue(
final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
// 如果启用了故障延迟机制 (sendLatencyFaultEnable = true)
// 1、【在健康的Broker中】遍历 messageQueueList,找一个 lastBrokerName 之外的 消息队列
tpInfo.getMessageQueueList()
// 2、尝试从 【被 规避的Broker中(被关进小黑屋的Broker)】选择一个可用的Broker。如果没找到,返回 null
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
// 3、如果前两步都没找到,找一个托底的 MessageQueue
return tpInfo.selectOneMessageQueue();
}

// 如果没有启用故障延迟机制(sendLatencyFaultEnable = false)
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
1、默认机制

sendLatencyFaultEnable = false 不启用故障延迟机制

选择队列核心API: TopicPublishInfo#selectOneMessageQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 不启用故障延迟机制,获取消息队列
* @param lastBrokerName 上一次消息发送失败的 brokerName
* @return MessageQueue
*/
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
// 规避上次 发送失败的 MessageQueue 所在 Broker
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
// 上述机制找不到可用的MessageQueue时,找一个托底的 MessageQueue返回
return selectOneMessageQueue();
}
}
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}

该机制的缺陷:虽然可以规避上一次发送失败的Broker,但每次查询都要走这个流程,太耗费性能;

为什么Broker不可用之后,路由信息中还包含该Broker的路由信息呢?

  • 首先, NameServer 检测 Broker 是有延迟的,最短的一次心跳为 10s;
  • 其次,NameServer 检测到 Broker 故障后,不会马上推给Producer,而是 Producer 每隔 30s 拉取一次路由信息;
  • 所以, Producer 最快感知道Broker故障信息也需要 30s;

​ 下面引入一种机制,如果一次消息发送失败之后,可以将该 Broker 暂时排除在消息队列的选择范围外,过一段时间再加入 可选择范围内。

2、Broker故障延迟机制

1)sendLatencyFaultEnable = true 启用故障延迟机制

2)【故障Broker存储】实现类

LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

选择队列核心API:MQFaultStrategy#selectOneMessageQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* 从路由信息中选择一个消息队列去发送消息
* @param tpInfo 路由信息
* @param lastBrokerName 上次发送消息出错的 broker,优先排除
* @return
*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
// 如果启用了故障延迟机制 (sendLatencyFaultEnable = true)
try {
// 1、【在健康的Broker中】遍历 messageQueueList,找一个 lastBrokerName 之外的 消息队列
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 根据 入狱时间(startTimestamp) 判断是否小黑屋期满,期满 则 重新可用
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName
|| !mq.getBrokerName().equals(lastBrokerName)) // bug
return mq;
}
}

// 2、尝试从 【被 规避的Broker中(被关进小黑屋的Broker)】选择一个可用的Broker
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue()
.getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}

// 3、如果前两步都没找到,找一个托底的 MessageQueue
return tpInfo.selectOneMessageQueue();
}

// 如果没有启用故障延迟机制(sendLatencyFaultEnable = false)
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
重点【latencyFaultTolerance

故障延迟机制 存储、更新、判断:LatencyFaultToleranceImpl


4、消息发送

消息发送核心API入口:DefaultMQProducerImpl#sendKernelImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 消息发送API 核心入口
* @param msg 待发送消息
* @param mq MessageQueue【topic,brokerName,queueId】
* @param communicationMode 发送模式, 同步,异步,Oneway
* @param sendCallback 异步发送 回调函数
* @param topicPublishInfo 路由信息
* @param timeout 消息发送超时时间
* @return SendResult 发送结果
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
private SendResult sendKernelImpl(
final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout)

发送步骤:

1、根据MessageQueue获取Broker网络地址。
如果mQClientFactory.brokerAddrTable还没有缓存该地址,则从NameServer主动更新一下topic路由信息。
如果还是找不到Broker信息,则抛出异常 MQClientException。

2、为消息分配全局唯一ID。
如果 消息提超过4K,会对 消息体采用 zip 压缩,并将消息标记为 MessageSysFlag.COMPRESSED_FLAG
如果消息是事务Prepare消息,则设置消息的系统标记为MessageSysFlag.TRANSACTION_PREPARED_TYPE

3、如果注册了消息发送钩子函数,则执行消息发送之前的增强逻辑。通过DefaultMQProducerImpl#registerSendMessageHook 注册钩子处理类

4、构建消息发送请求包。主要包含如下重要信息:
生产者组、主题名称、默认创建主题Key、该主题在单个Broker默认队列数、队列ID、
消息系统标记(MessageSysFlag)、消息发送时间、消息标记flag、消息扩展性、重试次数、是否批量消息

5、根据消息发送方式:同步、异步、单向 方式进行网络传输

发送消息核心函数 MQClientAPIImpl#sendMessage

真正网络发送函数:

1
2
3
4
5
6
// 单向发送
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
// 同步发送
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
// 异步发送
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback(){...});

5、批量消息发送

DefaultMQProducer.java#send(Collection<Message> msgs)

在 Producer,调用 batch方法,将一批消息封装成 MessageBatch 对象。

MessageBatch 继承自Message 对象,MessageBatch 内部持有 List<Message> messages

批量消息封装格式

批量封装:MessageDecoder#encodeMessages(List<Message> messages)

#####

单条封装MessageDecoder#encodeMessage(Message message)