中间件学习--Rabbitmq 消息队列

Huang Zhiwei

RabbitMq消息队列

本节参考自:超详细的RabbitMQ入门,看这篇就够了!-阿里云开发者社区 (aliyun.com)

“消息队列(Message Queue)”是在消息的传输过程中保存消息的容器。在消息队列中,通常有生产者和消费者两个角色。生产者只负责发送数据到消息队列,谁从消息队列中取出数据处理,他不管。消费者只负责从消息队列中取出数据处理,他不管这是谁发送的数据。

主要功能

异步处理:一个客户端请求发送进来,系统A会调用系统B、C、D三个系统,同步请求的话,响应时间就是系统A、B、C、D的总和,也就是800ms。如果使用MQ,系统A发送数据到MQ,然后就可以返回响应给客户端,不需要再等待系统B、C、D的响应,可以大大地提高性能。对于一些非必要的业务,比如发送短信,发送邮件等等,就可以采用MQ。

异步处理

解耦:如图所示。假设有系统B、C、D都需要系统A的数据,于是系统A调用三个方法发送数据到B、C、D。这时,系统D不需要了,那就需要在系统A把相关的代码删掉。假设这时有个新的系统E需要数据,这时系统A又要增加调用系统E的代码。为了降低这种强耦合,就可以使用MQ,系统A只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可

解耦

削峰:这是MQ一个很重要的应用。假设系统A在某一段时间请求数暴增,有5000个请求发送过来,系统A这时就会发送5000条SQL进入MySQL进行执行,MySQL对于如此庞大的请求当然处理不过来,MySQL就会崩溃,导致系统瘫痪。如果使用MQ,系统A不再是直接发送SQL到数据库,而是把数据发送到MQ,MQ短时间积压数据是可以接受的,然后由消费者每次拉取2000条进行处理,防止在请求峰值时期大量的请求直接发送到MySQL导致系统崩溃

数据库削峰

RabbitMQ的特点

  • 可靠性。支持持久化,传输确认,发布确认等保证了MQ的可靠性。
  • 灵活的分发消息策略。这应该是RabbitMQ的一大特点。在消息进入MQ前由Exchange(交换机)进行路由消息。分发消息策略有:简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式。
  • 支持集群。多台RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
  • 多种协议。RabbitMQ支持多种消息队列协议,比如 STOMP、MQTT 等等。
  • 支持多种语言客户端。RabbitMQ几乎支持所有常用编程语言,包括 Java、.NET、Ruby 等等。
  • 可视化管理界面。RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker。
  • 插件机制。RabbitMQ提供了许多插件,可以通过插件进行扩展,也可以编写自己的插件。

入门

阿里云实验室RocketMQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1. 执行如下命令,切换目录至rocketmq-4.9.3/bin下。
cd /root/rocketmq-4.9.3/bin
其目录下已有如下文件:
logs rocketmq-4.9.3 rocketmq-all-4.9.3-bin-release.zip rocketmq-handson-apply spring-mq-bus-demo
2. 启动服务器,和查看日志
nohup sh mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
看到The Name Server boot success. serializeType=JSON 表示启动成功
3. 启动端口映射
nohup sh mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
4. 执行如下命令,创建topic。
cd /root/rocketmq-4.9.3/bin
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendDataType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendOperationType
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t syncSendOrderly
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t sendAndReceive
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t broadcastConsumer
./mqadmin updateTopic -n localhost:9876 -b localhost:10911 -t transactionMessage

1
2
3
4
5
1. 启动java
cd /root/rocketmq-handson-apply
mvn install -Dmaven.test.skip
export local=$RANDOM
java -jar rocketmq-spring/target/rocketmq-spring-1.0.0.jar
1
2
3
4
5
6
7
发送各种请求,如发送对象、消息、集合,同步发送、异步发送、rpc发送、事务消息:
wget -q -O- "http://127.0.0.1:28082/messageSend/sendObject?id=1&name=xiaoming&action=go2&ags=3"
wget -q -O- "http://127.0.0.1:28082/messageSend/sendMessage?id=2&name=xiaoming&action=des&ags=3"
wget -q -O- "http://127.0.0.1:28082/messageSend/sendCollection?id=2&name=xiaoming&action=des&ags=3"
wget -q -O- "http://127.0.0.1:28082/messageSend/syncSend?id=2&name=xiaoming&action=des&ags=3"
wget -q -O- "http://127.0.0.1:28082/messageSend/asyncSend?id=2&name=xiaoming&action=des&ags=3"
wget -q -O- "http://127.0.0.1:28082/messageSend/sendAndReceive?id=2&name=xiaoming&action=des&ags=3"

windows11安装

使用docker拉取镜像,一开始使用了latest版本,启动management插件也不行,后来换成了指定的版本,自带management插件,springboot是不再报错了,但是有些页面无法访问。

1
2
3
docker pull rabbitmq:3.9.29-management
// 指定启动的端口和用户名密码
docker run -dit --name rabbitmq3.9.29 -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest -p 15672:15672 -p 5672:5672 rabbitmq:3.9.29-management

rabbitmq:3.9.29-management版本在进入Admin页面时报错,此版本出现的问题如下,大意是EJS出问题了:

1
SyntaxError: Unexpected token 'catch' SyntaxError: Unexpected token 'catch' at EJS.Compiler.compile (http://localhost:15672/js/ejs-1.0.min.js:1:6659) at new EJS (http://localhost:15672/js/ejs-1.0.min.js:1:1625) at format (http://localhost:15672/js/main.js:1164:20) at http://localhost:15672/js/main.js:486:24 at with_reqs (http://localhost:15672/js/main.js:1147:9) at http://localhost:15672/js/main.js:1143:17 at req.onreadystatechange (http://localhost:15672/js/main.js:1238:17)

安装异常

后换成rabbitmq:management 按照Windows版Docker安装RabbitMq - 掘金 (juejin.cn) 一步步黏贴,该版本可以访问admin页面,但是无法访问队列queue页面。可以自行选择,admin页面其实只管理用户,这个用户管理可以进入docker容器来配置。现将常用指令贴出:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 进入容器
docker exec -it a6fd7b077c5f /bin/bash
// 查看当前用户列表
rabbitmqctl list_users
// 查询结果
user tags
admin [administrator]

// 添加用户指令和配置virtual host指令
rabbitmqctl add_user test test
rabbitmqctl set_user_tags test administrator
rabbitmqctl add_vhost /test
rabbitmqctl set_permissions -p /test test '.*' '.*' '.*'

在配置过程中,一开始以为端口号是启动时用到的15672,结果连接许久无法访问,有如下报错,都是未连接的错误,后来查询后发现配置的端口需要是5672:

1
2
3
Failed to check/redeclare auto-delete queue(s).
org.springframework.amqp.AmqpIOException: java.io.IOException
java.net.SocketException: Socket Closed

amqp协议

AMQP所覆盖的内容包含了网络协议以及服务端服务

AMQP的全称为:Advanced Message Queuing Protocol(高级消息队列协议),发布者(Publisher)发布消息(Message),经由交换机(Exchange)。交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

https://pic3.zhimg.com/80/v2-15033c8d6081f2b600802945620807a2_720w.webp

  • 一套被称作”高级消息队列协议模型(AMQ Model)“的消息能力定义。该模型涵盖了Broker服务中用于路由和存储消息的组件,以及把这些组件连在一起的规则。
  • 一个网络层协议AMQP。能够让客户端程序与实现了AMQ Model的服务端进行通信。

深入理解

1、发布者、交换机、队列、消费者都可以有多个。同时因为 AMQP 是一个网络协议,所以这个过程中的发布者,消费者,消息代理 可以分别存在于不同的设备上。

2、发布者发布消息时可以给消息指定各种消息属性(Message Meta-data)。有些属性有可能会被消息代理(Brokers)使用,然而其他的属性则是完全不透明的,它们只能被接收消息的应用所使用。

3、从安全角度考虑,网络是不可靠的,又或是消费者在处理消息的过程中意外挂掉,这样没有处理成功的消息就会丢失。基于此原因,AMQP 模块包含了一个消息确认(Message Acknowledgements)机制:当一个消息从队列中投递给消费者后,不会立即从队列中删除,直到它收到来自消费者的确认回执(Acknowledgement)后,才完全从队列中删除。

4、在某些情况下,例如当一个消息无法被成功路由时(无法从交换机分发到队列),消息或许会被返回给发布者并被丢弃。或者,如果消息代理执行了延期操作,消息会被放入一个所谓的死信队列中。此时,消息发布者可以选择某些参数来处理这些特殊情况。

组成

  • Broker:消息队列服务进程。此进程包括两个部分:Exchange和Queue。
  • Exchange:消息队列交换机。按一定的规则将消息路由转发到某个队列
  • Queue:消息队列,存储消息的队列。
  • Producer:消息生产者。生产方客户端将消息同交换机路由发送到队列中。
  • Consumer:消息消费者。消费队列中存储的消息。

Exchange交换机类型有四种,根据不同的类型工作的方式也有所不同。在HelloWord例子中,我们就使用了比较简单的Direct Exchange,翻译就是直连交换机。其余三种分别是:Fanout exchange、Topic exchange、Headers exchange

  • Direct Exchange,要求该消息与一个特定的路由键完全匹配;

  • Fanout exchange,这种类型的交换机需要将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。简单点说就是发布订阅。

  • Topic Exchange,主题交换机,如果从用法上面翻译可能叫通配符交换机会更加贴切。这种交换机是使用通配符去匹配,路由到对应的队列。通配符有两种:”“ 、 “#”。需要注意的是通配符前面必须要加上”.”符号。** 符号:有且只匹配一个词,**#** 符号:匹配一个或多个词。

  • Headers Exchange,这种交换机用的相对没这么多。它跟上面三种有点区别,它的路由不是用routingKey进行路由匹配,而是在匹配请求头中所带的键值进行路由。创建队列需要设置绑定的头部信息,有两种模式:全部匹配和部分匹配。如上图所示,交换机会根据生产者发送过来的头部信息携带的键值去匹配队列绑定的键值,路由到对应的队列。

启动和实验

本节记录实验过程中遇到的一些小坑

创建一个公共的pom配置,即RabbitMqDemo ,该pom使用spring版本为2.7.12,添加amqp依赖

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<-- 子模块-->
<modules>
<module>Common</module>
<module>Consumer</module>
<module>Core</module>
</modules>

然后创建三个子模块:Common Consumer Core 三个子模块引用父模块的依赖和common的配置:

1
2
3
4
5
6
7
8
9
10
11
12
<parent>
<artifactId>RabbitMqDemo</artifactId>
<groupId>com.rabbitmq</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>

<dependency>
<groupId>com.common</groupId>
<artifactId>Common</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>

common模块

Common负责配置文件,RabbitMQConfig是mq的名称配置,DirectRabbitConfig 是连接配置。

RabbitMQConfig 包含队列主题名称、四种Exchange交换机的名称和配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* RabbitMQ的队列主题名称
*/
public static final String RABBITMQ_DEMO_TOPIC = "rabbitmq.demo.topic";

/**
* RabbitMQ的DIRECT交换机名称
*/
public static final String RABBITMQ_DEMO_DIRECT_EXCHANGE = "rabbipwetmq.demo.direct.exchange";

/**
* RabbitMQ的DIRECT交换机和队列绑定的匹配键 DirectRouting
*/
public static final String RABBITMQ_DEMO_DIRECT_ROUTING = "rabbitmq.demo.direct.routing";

/**
* RabbitMQ的FANOUT_EXCHANG交换机类型的队列 A 的名称
*/
public static final String FANOUT_EXCHANGE_QUEUE_TOPIC_A = "fanout.A";

/**
* RabbitMQ的FANOUT_EXCHANG交换机类型的队列 B 的名称
*/
public static final String FANOUT_EXCHANGE_QUEUE_TOPIC_B = "fanout.B";

/**
* RabbitMQ的FANOUT_EXCHANG交换机类型的名称
*/
public static final String FANOUT_EXCHANGE_DEMO_NAME = "fanout.exchange.demo.name";

/**
* RabbitMQ的TOPIC_EXCHANGE交换机名称
*/
public static final String TOPIC_EXCHANGE_DEMO_NAME = "topic.exchange.demo.name";

/**
* RabbitMQ的TOPIC_EXCHANGE交换机的队列A的名称
*/
public static final String TOPIC_EXCHANGE_QUEUE_A = "topic.queue.a";

/**
* RabbitMQ的TOPIC_EXCHANGE交换机的队列B的名称
*/
public static final String TOPIC_EXCHANGE_QUEUE_B = "topic.queue.b";

/**
* RabbitMQ的TOPIC_EXCHANGE交换机的队列C的名称
*/
public static final String TOPIC_EXCHANGE_QUEUE_C = "topic.queue.c";

/**
* HEADERS_EXCHANGE交换机名称
*/
public static final String HEADERS_EXCHANGE_DEMO_NAME = "headers.exchange.demo.name";

/**
* RabbitMQ的HEADERS_EXCHANGE交换机的队列A的名称
*/
public static final String HEADERS_EXCHANGE_QUEUE_A = "headers.queue.a";

/**
* RabbitMQ的HEADERS_EXCHANGE交换机的队列B的名称
*/
public static final String HEADERS_EXCHANGE_QUEUE_B = "headers.queue.b";

DirectRabbitConfig 配置连接、消息队列、队列和交换机的绑定:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
@Component
@Configuration
public class DirectRabbitConfig implements BeanPostProcessor {

// @Resource
// private RabbitAdmin rabbitAdmin;

@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}

@Bean
public Queue rabbitmqDemoDirectQueue() {
//第一个参数:队列主题名称 第二个参数:是否持久化
return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false);
}

@Bean
public DirectExchange rabbitmqDemoDirectExchange() {
//Direct交换机
return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
}

@Bean
public Queue fanoutExchangeQueueA() {
//队列A
return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A, true, false, false);
}

@Bean
public Queue fanoutExchangeQueueB() {
//队列B
return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_B, true, false, false);
}

@Bean
public FanoutExchange rabbitmqDemoFanoutExchange() {
//创建FanoutExchange类型交换机
return new FanoutExchange(RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, true, false);
}

@Bean
public Binding bindFanoutA() {
//队列A绑定到FanoutExchange交换机
return BindingBuilder.bind(fanoutExchangeQueueA()).to(rabbitmqDemoFanoutExchange());
}

@Bean
public Binding bindFanoutB() {
//队列B绑定到FanoutExchange交换机
return BindingBuilder.bind(fanoutExchangeQueueB()).to(rabbitmqDemoFanoutExchange());
}

@Bean
public Binding bindDirect() {
//链式写法,绑定交换机和队列,并设置匹配键
return BindingBuilder
//绑定队列
.bind(rabbitmqDemoDirectQueue())
//到交换机
.to(rabbitmqDemoDirectExchange())
//并设置匹配键
.with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING);
}

@Bean
public TopicExchange rabbitmqDemoTopicExchange() {
//配置TopicExchange交换机
return new TopicExchange(RabbitMQConfig.TOPIC_EXCHANGE_DEMO_NAME, true, false);
}

@Bean
public Queue topicExchangeQueueA() {
//创建队列1
return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_A, true, false, false);
}

@Bean
public Queue topicExchangeQueueB() {
//创建队列2
return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_B, true, false, false);
}

@Bean
public Queue topicExchangeQueueC() {
//创建队列3
return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_C, true, false, false);
}

@Bean
public Binding bindTopicA() {
//队列A绑定到FanoutExchange交换机
return BindingBuilder.bind(topicExchangeQueueB())
.to(rabbitmqDemoTopicExchange())
.with("a.*");
}

@Bean
public Binding bindTopicB() {
//队列A绑定到FanoutExchange交换机
return BindingBuilder.bind(topicExchangeQueueC())
.to(rabbitmqDemoTopicExchange())
.with("a.*");
}

@Bean
public Binding bindTopicC() {
//队列A绑定到FanoutExchange交换机
return BindingBuilder.bind(topicExchangeQueueA())
.to(rabbitmqDemoTopicExchange())
.with("rabbit.#");
}

@Bean
public Queue headersQueueA() {
return new Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A, true, false, false);
}

@Bean
public Queue headersQueueB() {
return new Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B, true, false, false);
}

@Bean
public HeadersExchange rabbitmqDemoHeadersExchange() {
return new HeadersExchange(RabbitMQConfig.HEADERS_EXCHANGE_DEMO_NAME, true, false);
}

@Bean
public Binding bindHeadersA() {
Map<String, Object> map = new HashMap<>();
map.put("key_one", "java");
map.put("key_two", "rabbit");
//全匹配
return BindingBuilder.bind(headersQueueA())
.to(rabbitmqDemoHeadersExchange())
.whereAll(map).match();
}

@Bean
public Binding bindHeadersB() {
Map<String, Object> map = new HashMap<>();
map.put("headers_A", "coke");
map.put("headers_B", "sky");
//部分匹配
return BindingBuilder.bind(headersQueueB())
.to(rabbitmqDemoHeadersExchange())
.whereAny(map).match();
}
// @Override
// public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
// rabbitAdmin.declareExchange(rabbitmqDemoDirectExchange());
// rabbitAdmin.declareQueue(rabbitmqDemoDirectQueue());
// return null;
// }
}

和教程中的不同,在操作时报错出现循环依赖的问题,不使用rabbitAdmin 也没事。

Core模块

Core模块向消息队列写入,用若干个不同的controller控制向不同的queue写入。包含controller、service(impl),下面从Impl来看四种交换机方式。

1
2
3
4
String sendMsg(String msg) throws Exception;
String sendMsgByFanoutExchange(String msg) throws Exception;
String sendMsgByTopicExchange(String msg, String routingKey) throws Exception;
String sendMsgByHeadersExchange(String msg, Map<String, Object> map) throws Exception;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//直连
@Override
public String sendMsg(String msg) throws Exception {
try {
String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
String sendTime = sdf.format(new Date());
Map<String, Object> map = new HashMap<>();
map.put("msgId", msgId);
map.put("sendTime", sendTime);
map.put("msg", msg);
rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, map);
return "ok";
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
//Fanout发布,给到fanout交换机,交换机发布到与之绑定的fanoutqueueA和fanoutqueueB上
@Override
public String sendMsgByFanoutExchange(String msg) throws Exception {
Map<String, Object> message = getMessage(msg);
try {
rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, "", message);
return "ok";
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
//topic发布,参考config中的队列和交换机配置,看到A队列和B队列都是绑定到"a.*"的,C队列绑定到"rabbit.#"
@Override
public String sendMsgByTopicExchange(String msg, String routingKey) throws Exception {
Map<String, Object> message = getMessage(msg);
try {
//发送消息
rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_DEMO_NAME, routingKey, message);
return "ok";
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//headers发布,参考config中的队列和交换机配置,.whereAll(map).match()和whereAny(map).match()的绑定区别
@Override
public String sendMsgByHeadersExchange(String msg, Map<String, Object> map) throws Exception {
try {
MessageProperties messageProperties = new MessageProperties();
//消息持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setContentType("UTF-8");
//添加消息
messageProperties.getHeaders().putAll(map);
Message message = new Message(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend(RabbitMQConfig.HEADERS_EXCHANGE_DEMO_NAME, null, message);
return "ok";
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}

Consumer模块

消费者模块用来获取对应监听队列的消息,主要有以下两个注意点:

1
2
3
1.@RabbitListener注解声明监听的队列
2.@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
其中queuesToDeclare 表示如果没有这个队列则创建一个队列,防止报错;

启动后可以看到各条线路的监听消息:

1
2
3
4
5
队列A收到消息:{msg=广播发布,UUID, msgId=1fe4602e0c054b878f9d21f8fbf25840, sendTime=2023-06-09 13:09:39}
队列B收到消息:{msg=广播发布,UUID, msgId=1fe4602e0c054b878f9d21f8fbf25840, sendTime=2023-06-09 13:09:39}
队列[headers.queue.a]收到消息:UUID
队列[headers.queue.b]收到消息:UUID
消费者RabbitMQDemo从消息队列中消费信息: {msg=直连写入,UUID, msgId=b58befd9034c495290540640abd8c0a4, sendTime=2023-06-09 13:04:03}

死信队列

本节参考自RabbitMQ - 死信队列(Dead-Letter-Exchange ) - 知乎 (zhihu.com)

DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。

消息变成死信,可能是由于以下的原因:

  • 消息被拒绝
  • 消息过期
  • 队列达到最大长度

DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//设置死信队列
@Bean
public Queue ttlDirectQueue(){
HashMap<String,Object> args = new HashMap<>();
//消息过期时间
args.put("x-message-ttl",5000);
//消息队列最大长度、
args.put("x-max-length",8);
//绑定交换机
args.put("x-dead-letter-exchange","dead_direct_exchange");
//死信队列路由
args.put("x-dead-letter-routing-key","dead");
return new Queue("ttl.direct.queue",true,false,false,args);
}
  • 标题: 中间件学习--Rabbitmq 消息队列
  • 作者: Huang Zhiwei
  • 创建于: 2023-06-09 09:53:23
  • 更新于: 2023-09-02 23:06:15
  • 链接: https://huangzhw0221.github.io/2023/06/09/Middleware-Rabbitmq/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
 评论