IntelliJ IDEA 调试 rocketmq 源码



1、启动 NameServer

Step_1:源码路径,使用Idea导入工程

在命令行执行

1
mvn clean install

Step_2:展开 namesrv 模块

右键 NamasrvStartup.java,点击 NamaStart.main() 运行,报错


Step_3:在 Debug Configuration 界面设置 环境变量

1
ROCKETMQ_HOME=D:\java\rocketmq


Step_4:在 D:\java\rocketmq 下创建 conf、logs、store 三个文件夹

Step_5:将 D:\java\srcode\rocketmq-master\distribution\conf 下的 broker.conf、logback_broker.xml、logback_namesrv.xml、logback_tools.xml 复制到 D:\java\rocketmq\conf 目录下

修改 broker.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

namesrvAddr = 127.0.0.1:9876
storePathRoot = D:\\java\\rocketmq\\store
storePathCommitLog = D:\\java\\rocketmq\\store\\commitlog
storePathConsumeQueue = D:\\java\\rocketmq\\store\\consumequeue
storePathIndex = D:\\java\\rocketmq\\store\\index
storeCheckpoint = D:\\java\\rocketmq\\store\\checkpoint
abortFile = D:\\java\\rocketmq\\store\\abort

可以修改 logback_namesrv.xml 和 logback_tools.xml 中的log 路径;


Step_6:运行

控制台出现


2、启动 Broker


3、发送消息示例

修改 Producer.java ,指定 Namesrv地址 producer.setNamesrvAddr(“127.0.0.1:9876”):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
public static void main(String[] args)
throws MQClientException, InterruptedException {

DefaultMQProducer producer =
new DefaultMQProducer("please_rename_unique_group_name");

producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();

for (int i = 0; i < 1000; i++) {
try {
Message msg = new Message("TopicTest", "TagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);

SendResult sendResult = producer.send(msg);

System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}

运行结果:


4、测试消费示例

修改 Consumer.java ,指定 Namesrv地址 producer.setNamesrvAddr(“127.0.0.1:9876”):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package org.apache.rocketmq.example.quickstart;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {

public static void main(String[] args)
throws MQClientException {
DefaultMQPushConsumer consumer =
new DefaultMQPushConsumer("please_rename_unique_group_name_4");

consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

consumer.start();
System.out.printf("Consumer Started.%n");
}
}

运行结果:


5、RocketMQ 源代码目录结构

工程结构如下图:

RoctketMQ 核心目录说明如下

  1. broker:broker 模块(broker 启动进程);
  2. client:消息客户端,包含 消息生产者、消息消费者 相关类;
  3. common:公共包;
  4. dev:开发者信息(非源代码);
  5. distribution:部署实例文件夹(非源码);
  6. example:RocketMQ 示例代码;
  7. filter:消息过滤相关基础类;
  8. filtersrv:消息过滤服务器 实现相关类(Filter启动进程);
  9. logappender:日志实现相关类;
  10. namesrv:NameServer 实现相关类(NameServer 启动进程);
  11. openmessaging:消息开放标准,正在指定中;
  12. remoting:远程通信模块,基于Netty;
  13. srvutil:服务器工具类;
  14. store:消息存储实现 相关类;
  15. style:checkstyle 相关实现;
  16. test:测试 相关类;
  17. tools:工具类,监控命令 相关实现类;

6、RocketMQ 的设计理念和目标

6.1、设计理念

1、基于 主题(Topic) 发布/订阅 模式

2、核心功能:消息发送,消息存储(Broker),消息消费

3、NameServer: 做元数据管理,舍弃 ZooKeeper,NameServer集群之间互不通信,降低实现复杂度,对网络要求也降低,但性能却 极大提升;

4、IO存储机制:

  • 文件组(N个大小固定的文件),引入内存映射机制;
  • 所有消息顺序写 —— 极大提升 写性能;
  • 引入 消费队列文件 和 索引文件 —— 兼顾 消息消费 和 查找

5、容忍设计缺陷:保证消息至少被消费一次,不保证只消费一次,由 消费者自己保证;


6.2、设计目标

1、架构模式:发布订阅模式,消息发送者,消息Broker,消息消费,路由发现;

2、顺序消息:保证消息 严格有序;

3、消息过滤:Broker过滤, 消费者过滤;

4、消息存储:消息堆积能力 和 消息存储性能;

  • 内存映射机制,所有主题消息顺序写入同一个文件中;
  • 引入文件过期机制和 文件存储空间报警机制;

5、消息高可用:同步刷盘、异步刷盘,异步复制,双写机制;

6、消息到达低延迟:长轮询模式 实现 准实时的消息推送;

7、确保消息必须被消费一次:无法做到 只被消费一次,会重复消费;

8、回溯消息:消费过的消息,支持按时间 重新消费(可精确到毫秒)向前或向后回溯;

9、消息堆积:文件组无限循环使用。提供了过期机制;

10、定时消息:特定延迟级别的 延迟队列;

11、消息重试机制:通过ack机制,未确认的消息 可 重新消费