1、启动 NameServer
Step_1:源码路径,使用Idea导入工程
在命令行执行
1 | mvn clean install |
Step_2:展开 namesrv 模块
右键 NamasrvStartup.java,点击 NamaStart.main() 运行,报错
Step_3:在 Debug Configuration 界面设置 环境变量
1 | ROCKETMQ_HOME=D:\java\rocketmq |
Step_4:在 D:\java\rocketmq 下创建 conf、logs、store 三个文件夹
Step_5:将 D:\java\srcode\rocketmq-master\distribution\conf 下的 broker.conf、logback_broker.xml、logback_namesrv.xml、logback_tools.xml 复制到 D:\java\rocketmq\conf 目录下
修改 broker.conf
1 | brokerClusterName = DefaultCluster |
可以修改 logback_namesrv.xml 和 logback_tools.xml 中的log 路径;
Step_6:运行
控制台出现
2、启动 Broker
3、发送消息示例
修改 Producer.java ,指定 Namesrv地址 producer.setNamesrvAddr(“127.0.0.1:9876”):
1 | import org.apache.rocketmq.client.exception.MQClientException; |
运行结果:
4、测试消费示例
修改 Consumer.java ,指定 Namesrv地址 producer.setNamesrvAddr(“127.0.0.1:9876”):
1 | package org.apache.rocketmq.example.quickstart; |
运行结果:
5、RocketMQ 源代码目录结构
工程结构如下图:
RoctketMQ 核心目录说明如下:
- broker:broker 模块(broker 启动进程);
- client:消息客户端,包含 消息生产者、消息消费者 相关类;
- common:公共包;
- dev:开发者信息(非源代码);
- distribution:部署实例文件夹(非源码);
- example:RocketMQ 示例代码;
- filter:消息过滤相关基础类;
- filtersrv:消息过滤服务器 实现相关类(Filter启动进程);
- logappender:日志实现相关类;
- namesrv:NameServer 实现相关类(NameServer 启动进程);
- openmessaging:消息开放标准,正在指定中;
- remoting:远程通信模块,基于Netty;
- srvutil:服务器工具类;
- store:消息存储实现 相关类;
- style:checkstyle 相关实现;
- test:测试 相关类;
- tools:工具类,监控命令 相关实现类;
6、RocketMQ 的设计理念和目标
6.1、设计理念
1、基于 主题(Topic) 发布/订阅 模式
2、核心功能:消息发送,消息存储(Broker),消息消费
3、NameServer: 做元数据管理,舍弃 ZooKeeper,NameServer集群之间互不通信,降低实现复杂度,对网络要求也降低,但性能却 极大提升;
4、IO存储机制:
- 文件组(N个大小固定的文件),引入内存映射机制;
- 所有消息顺序写 —— 极大提升 写性能;
- 引入 消费队列文件 和 索引文件 —— 兼顾 消息消费 和 查找
5、容忍设计缺陷:保证消息至少被消费一次,不保证只消费一次,由 消费者自己保证;
6.2、设计目标
1、架构模式:发布订阅模式,消息发送者,消息Broker,消息消费,路由发现;
2、顺序消息:保证消息 严格有序;
3、消息过滤:Broker过滤, 消费者过滤;
4、消息存储:消息堆积能力 和 消息存储性能;
- 内存映射机制,所有主题消息顺序写入同一个文件中;
- 引入文件过期机制和 文件存储空间报警机制;
5、消息高可用:同步刷盘、异步刷盘,异步复制,双写机制;
6、消息到达低延迟:长轮询模式 实现 准实时的消息推送;
7、确保消息必须被消费一次:无法做到 只被消费一次,会重复消费;
8、回溯消息:消费过的消息,支持按时间 重新消费(可精确到毫秒)向前或向后回溯;
9、消息堆积:文件组无限循环使用。提供了过期机制;
10、定时消息:特定延迟级别的 延迟队列;
11、消息重试机制:通过ack机制,未确认的消息 可 重新消费