rabbitmq-设计和功能



1、什么是消息中间件

消息队列中间件 (Message Queue Middleware,简称为 MQ) 是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。提供了以松散藕合的灵活方式集成应用程序的一种机制。它们提供了基于存储和转发的应用程序之间的异步数据发送,即应用程序彼此不直接通信,而是与作为中介的消息中间件通信。消息中间件提供了有保证的消息发送,应用程序开发人员无须了解远程过程调用 ( RPC) 和网络通信协议的细节。


2、消息中间件的作用

  • 异步(解决不必要的阻塞):消息中间件提供了异步处理机制,允许应用把一些消息放入消息中间件中,但并不立即处理它,在之后需要的时候再慢慢处理 。
  • 解耦(降低模块间的耦合关系):消息中间件在处理过程中间插入了一个隐含的、基于数据的接口层。
  • 削峰(峰值任务的平滑处理):在访问量剧增的情况下,应用仍然需要继续发挥作用。
  • 冗余(存储、补偿机制):有些情况下,处理数据的过程会失败。消息中间件可以把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。
  • 顺序保证:数据的处理顺序

3、AMQP协议

AMQP协议是一套开放标准,支持不同语言的不同产品。

AMQP组件

  • 生产者:消息的创建者,将消息发送到消息中间件

  • 消费者:连接到消息中间件上,订阅在队列上,进行消息的消费。

  • 消息:包括有效载荷与标签。有效载荷:要传输的数据;标签:描述有效载荷的属性;RabbitMQ通过标签决定谁获得该消息,消费者只能得到有效载荷。

  • 信道:可理解为一个虚拟的连接,建立在真实的TCP/IP连接之上。所有AMQP上的消息都通过信道传输,TCP/IP连接的建立和释放 对服务器有很大的消耗、昂贵的资源。信道的创建没有数量限制,保护资源的利用。

  • 交换器、队列、绑定、路由键:队列通过路由键(routing key)绑定到交换器,生产者把消息发送到交换器,交换器根据绑定的路由键把消息路由到特定的队列中,再由订阅该队列的消费者进行消息的消费。

image-20200904170935073

img


5、客户端开发

5.1、连接rabbitmq

connection

channel

1
2
3
$connection = new AMQPStreamConnection(Yii::$app->params['AMQP']['host'],Yii::$app->params['AMQP']['port'], Yii::$app->params['AMQP']['user'],Yii::$app->params['AMQP']['password']);

$channel = $connection->channel();

5.2、声明exchange和queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

public function exchange_declare(
$exchange, // string, 交换器名
$type, // string, 例如 'direct', 'fanout', 'topic'
$passive = false, // 如果已经存在,是否报错
$durable = false, // 是否持久化
$auto_delete = true, // 所有与该交换器绑定的队列都解绑以后,自动删除
$internal = false, // 是否内置交换器,客户端消息无法发送到内置交换器
$nowait = false, // 是否不需要broker返回结果
$arguments = null, // 参数,如死信队列,ttl等
$ticket = null
) {}
public function queue_declare(
$queue = '',
$passive = false,
$durable = false,
$exclusive = false, // 是否排他,排他队列仅对首次声明它的连接可见,断开时自动删除
$auto_delete = true, // 自动删除,所有与这个队列连接的消费者都断开时自动删除
$nowait = false,
$arguments = null,
$ticket = null
){}

public function queue_bind($queue, $exchange, $routing_key = '', $nowait = false, $arguments = null, $ticket = null){}

举例

1
2
3
4
5
6
$channel->exchange_declare(self::FXC_EXCHANGE, 'direct', false, true, false);
$argsAMQ = new AMQPTable();
$argsAMQ->set('x-dead-letter-exchange', self::FXC_EXCHANGE);
$argsAMQ->set('x-dead-letter-routing-key', self::FXC_ROUTING_ERROR);
$channel->queue_declare(self::FXC_MSG_QUEUE, false, true, false, false, false, $argsAMQ);
$channel->queue_bind(self::FXC_MSG_QUEUE, self::FXC_EXCHANGE, self::FXC_ROUTING_NORMAL);

5.3、发送消息

1
2
3
4
5
6
7
8
public function basic_publish(
$msg,
$exchange = '',
$routing_key = '', // 路由键
$mandatory = false, // 交换器无法根据 exchange和路由键找到对应的队列时,$mandatory 为 true,则会basic.return给生产者;如果为false,则直接丢弃
$immediate = false, // true-如果消息路由到队列以后,发现队列上没有任何消费者则返回给生产者,有消费者则立刻投递给消费者
$ticket = null
) {}


5.4、消费消息

1)推模式

broker向消费者持续不断的推消息

1
2
3
4
5
6
7
8
9
10
11
12
public function basic_consume(
$queue = '', // 队列名称
$consumer_tag = '', // 消费者标签,用于区分多个消费者
$no_local = false, // 设为true表示,不能见同一个connection中生产者发送的消息传送给这个connection的消费者
$no_ack = false, // 是否自动确认
$exclusive = false, // 是否排他
$nowait = false,
$callback = null,
$ticket = null,
$arguments = array()
) {
}

2)拉模式

消费者一条条从broker拉取消息

1
public function basic_get($queue = '', $no_ack = false, $ticket = null)

举例

1
2
3
4
5
6
7
while (true) {
$getResponse = $channel->basic_get(self::FXC_MSG_QUEUE, false);
if (empty($getResponse)) {
break;
}
$msg = json_decode($getResponse->body, true);
}

3) 幂等性

1
f(x) = f(f(x))

5.5、消费端的确认与拒绝

确认消息已消费

1
public function basic_ack($delivery_tag, $multiple = false){}

拒绝消息

1
2
3
4
5
6
// 拒绝单条信息
// requeue true-重新放回队列供其他消费者消费,false-从队列中彻底移除(有死信队列会放入死信队列)
public function basic_reject($delivery_tag, $requeue){}

// 拒绝多条信息
public function basic_nack($delivery_tag, $multiple = false, $requeue = false){}

5.6、关闭连接

1
2
$channel->close();
$connection->close();

6、rabbitmq使用进阶

6.1、消息何去何从

只有交换器没有绑定队列的消息会发送到哪?

1)mandatory

交换器无法根据 exchange和路由键找到对应的队列时,$mandatory 为 true,则会basic.return给生产者;如果为false,则直接丢弃

2)immediate

true-如果消息路由到队列以后,发现队列上没有任何消费者则返回给生产者,有消费者则立刻投递给消费者


6.2、过期时间TTL

1)设置消息的TTL

1
x-message-ttl 6000

2)设置队列的TTL

1
x-expires 180000

当队列上没有任何消费者,且过期时间内没有调用basic.get 则会被删除

如果broker重启了,持久化的队列的过期时间会被重新计算


6.3、死信队列

1
2
x-dead-letter-exchange
x-dead-letter-routing-key

6.4、延迟队列

rabbitmq本身并没有实现延迟队列,不过可以用 TTL + 死信队列 达到延迟队列的效果


6.5、优先级队列

1
2
3
4
# 1. 将队列设置成优先级队列
x-max-priority 10
# 2. 对消息设置优先级
priority

6.6、RPC实现

文档:https://www.rabbitmq.com/tutorials/tutorial-six-python.html

RPC处理流程如下:

  1. 客户端启动时,创建一个匿名回调队列(由rabbitmq自动创建);
  2. 客户端为RPC请求设置2个属性:replyTo用来告知RPC服务端回复请求时的目的队列,即回调队列;correlationId 用来标记一个请求;
  3. 请求被发送到 rpc_queue队列中;
  4. RPC 服务端 监听 rpc_queue 队列中的请求,当请求到来时,服务端会处理并且把带有结果的消息发送给客户端。接收的队列就是 replyTo设定的回调队列;
  5. 客户端监听回调队列,当有消息时,检查correlationId属性,如果与请求匹配,那就是结果了;

RPCClient.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient implements AutoCloseable {

private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";

public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

connection = factory.newConnection();
channel = connection.createChannel();
}

public static void main(String[] argv) {
try (RPCClient fibonacciRpc = new RPCClient()) {
for (int i = 0; i < 32; i++) {
String i_str = Integer.toString(i);
System.out.println(" [x] Requesting fib(" + i_str + ")");
String response = fibonacciRpc.call(i_str);
System.out.println(" [.] Got '" + response + "'");
}
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
}
}

public String call(String message) throws IOException, InterruptedException {
final String corrId = UUID.randomUUID().toString();

String replyQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();

channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response.offer(new String(delivery.getBody(), "UTF-8"));
}
}, consumerTag -> {
});

String result = response.take();
channel.basicCancel(ctag);
return result;
}

public void close() throws IOException {
connection.close();
}
}

RPCServer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import com.rabbitmq.client.*;

public class RPCServer {

private static final String RPC_QUEUE_NAME = "rpc_queue";

private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.queuePurge(RPC_QUEUE_NAME);

channel.basicQos(1);

System.out.println(" [x] Awaiting RPC requests");

Object monitor = new Object();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();

String response = "";

try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);

System.out.println(" [.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e.toString());
} finally {
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC server owner thread
synchronized (monitor) {
monitor.notify();
}
}
};

channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
// Wait and be prepared to consume the message from RPC client.
while (true) {
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}

6.7、持久化

  • 交换器持久化
  • 队列持久化
  • 消息持久化

只有 queue 和 msg 都持久化,broker重启之后才会存在;

queue不持久化,msg持久化,broker重启之后消息也会丢失;

  • 重要的消息要考虑持久化,但持久化会严重影响rabbitmq的性能。在 可靠性和吞吐量之间要做取舍;
  • 持久化并不能保证消息不会丢失。还可以考虑镜像队列、发送方确认、事务机制

6.8、生产者确认

生产者将消息发送出去以后,消息是否到达了 broker呢?

1)事务机制

1
2
3
4
5
6
7
try {
channel.txSelect();
channel.basicPublish();
channel.txCommit();
} cacth (Exception e) {
channel.txRollback();
}

不足:事务机制会吸干rabbitmq的性能。所以有些场景考虑用发送方确认机制来达到相同的目的

2)发送方确认机制

​ 生产者将新到设置为confirm模式,一旦进入confirm模式,所有在该信到上发布的消息都会被指派一个唯一ID,一旦消息被投递到匹配的队列,rabbitmq就会发送一个确认(basic.Ack)给生产者(包含唯一ID);

​ 如果消息和队列是持久化的,则会在消息写入磁盘后发出。

对比

  • 事务机制在一条消息发送之后使发送端阻塞,以等待rabbitmq的回应,之后才能发送下一条消息;
  • 发送方确认最大的好处是它是异步的

同步确认

1
2
3
4
5
6
7
8
9
try {
channel.confirmSelect();
channel.basicPublic();
if (!channel.waitForConfirms()) {
println("send msg fail.");
}
} catch (Exception e) {
e.printStackTrace();
}

批量确认

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
try {
channel.confirmSelect();
int msgCount = 0;
while(true) {
channel.basicPublish();
// TODO:将发送出去的消息放如缓存(List 或 BlockingQueue)
if (++msgCount >= BATCH_COUNT) {
msgCount = 0;
try {
if (channel.waitForConfirms()) {
// TODO: 将缓存中的消息清空
continue;
}
// TODO: 将缓存中的消息重新发送
} catch (InterruptedException e) {
// TODO: 将缓存中的消息重新发送
}
}
}
} catch (Exception e) {
e.printStackTrace();
}

异步确认

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
public void handleNack(long deliveryTag, boolean multiple) {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
// TODO:注意这里需要添加 消息重发的 场景
}
});

while(true) {
long nextSeqNo = channel.getnnnextPublishSeqNo();
channel.basicPublish(...., ConfirmConfig.msg_10B.getBytes());
confirmSet.add(nextSeqNo);
}

6.9、消费端要点确认

1)消息分发

​ 默认清空下,通过轮询将消息发送给每个消费者,但如果有些消费者负载已经满了,另一些消费者空闲,就会造成整体吞吐量下降。

​ 可以通过 channel.basicQos(n) 设置信道上最大未确认消息数量,如果未确认消息数量已达到n,则不会再向这个消费者发送消息;

注意:channel.BasicQos 对 拉模式无效

1
2
3
channel.basicQos(int perfetchCount);
channel.basicQos(int perfetchCount, boolean global);
channel.basicQos(int prefetchSize, int perfetchCount, boolean global);
  • perfetchCount:预取个数,0表示无限;大于0,则信道跟队列需要协调发送的消息没有超过限定的值;
  • prefetchSize:未确认消息个数,0表示无限;大于0,则需要协调
global参数 AMQP 0-9-1 rabbitmq
false 信道上所有消费者都要遵循 prefetchCount 限制 信道上新的消费者需要遵循prefetchCount限定
true 当前链路(Connection)上所有消费者都要遵循 信道上所有消费者都要遵循 prefetchCount 限制

2)消息顺序性

在只有一个生产者一个消费者的情况下,且不使用任何rabbitmq的高级特性的情况下,消息是有序的。否则需要自己保证消息有序


3)弃用QueueingConsumer

  • 有内存溢出问题
  • 会拖累一个connection下的所有信道
  • 同步递归调用会产生死锁
  • 不是时间驱动的

6.10、消息传输保障

  • At most once:最多一次
  • At least once:至少一次
  • Exactly once:恰好一次

7、rabbitmq核心设计

7.1、存储机制

1
2
3
4
5
6
7
8
9
bash-5.0# pwd
/var/lib/rabbitmq/mnesia/rabbit@rabbitmq001/msg_stores/vhosts/628WB79CIFDYO9LJI6DKMI09L
bash-5.0# ls -l
total 20
drwxr-xr-x 2 rabbitmq rabbitmq 4096 Jan 7 06:29 msg_store_persistent
drwxr-xr-x 2 rabbitmq rabbitmq 4096 Jan 7 06:29 msg_store_transient
drwxr-xr-x 3 rabbitmq rabbitmq 4096 Apr 1 04:09 queues
-rw-r--r-- 1 rabbitmq rabbitmq 5464 Jan 7 06:29 recovery.dets
bash-5.0#

1)队列的结构

rabbitmq 中的消息可能处于一下4中状态:

  • alpha:消息内容、索引都存在内存中;
  • beta:消息内容保存在磁盘中,索引保存在内存中;
  • gamma:消息内容保存在磁盘中,消息索引保存在磁盘和内存中;
  • delta:消息内容和索引都在磁盘中;

这4种状态的主要作用是满足不同的内存和CPI需求。 alpha最耗内存,最少耗cpu;delta基本不耗内存,却最耗cpu和I/O;

  • Q1 和 Q4 只包含 alpha 状态的消息
  • Q2 和 Q3 包含 beta 和 gamma 状态的消息
  • Delta 只包含 delta 状态的消息

2)惰性队列

惰性队列会尽可能的 将消息存入磁盘,减少内存的小号,增加 I/O 的使用

队列具备两种模式:default 和 lazy

1
x-queue-mode lazy

7.2、内存和磁盘告警

1)内存告警

Connection 处于 blocking 或 blocked 状态下

可以通过 rabbitmq.conf 来设置

1
2
vm_memory_high_watermark
vm_memory_high_watermark_paging_ratio

因为erlang虚拟机的垃圾回收可能占用 2被的内存,所以 vm_memory_high_watermark 一般不超过 70%, 取值范围为 [0.4, 0.66]

vm_memory_high_watermark_paging_ratio 表示 当 vm_memory_high_watermark 范围内的内存占用超过 vm_memory_high_watermark_paging_ratio时,就会从内存swap到磁盘上


2)磁盘告警

默认情况下,当系统磁盘占用只剩 50MB时,就会报警。但50MB可能不够用,一个谨慎的做法是 设置为和操作系统的内存一样大

1
2
# 建议范围 1.0 ~ 2.0 之间
disk_free_limit.mem_relative = 1.0

rabbitmq 每 10s 会进行一次检测


7.3、流控

1)流控的原理

信用证算法(credit-based algorithm):当消息堆积到一定程度时,就会阻塞 接收上游新消息

当 connection 、channel、queue 处于 flow 状态时,表示触发了限流,在 blocked 和 unblock 之间切换

  • rabbit_reader:connection 的 处理进程,负责接收、解析 AMQP 协议数据包;
  • rabbit_channel: channel 处理进程,负责处理 AMQP 协议的各种方法,进而进行路由解析;
  • rabbit_amqqueue_process:队列的处理进程,负责实现队列的所有逻辑;
  • rabbit_msg_store:负责实现消息的持久化;

2)打破队列的瓶颈

向一个队列中推送消息时,往往会在 rabbit_amqqueue_process 中 产生瓶颈。

提升性能方案:

  1. 使用HiPE功能,保守估计可以提升 30%~40% 的 性能(不过erlang的版本至少为 18.X)
  2. 使用多个rabbit_amqqueue_process, 负载均衡的方案(创建多个队列,不同的队列绑定不同的路由键)


7.4、镜像队列