RabbitMq消息队列 本节参考自:超详细的RabbitMQ入门,看这篇就够了!-阿里云开发者社区 (aliyun.com)
👉 “消息队列(Message Queue)”的作用是在消息的传输过程中保存消息的容器,用于解耦和异步处理。
在以下业务场景下可能需要使用消息队列:
任务异步处理:将任务放入消息队列,由消费者异步处理任务,提高系统吞吐量和处理速度。
流量削峰:将请求放入消息队列,由消费者异步处理请求,避免高并发情况下系统宕机。
应用解耦:通过消息队列进行应用间通信,使得应用间解耦,提高系统的可维护性和可扩展性。
日志处理:将日志放入消息队列,由消费者异步处理日志,避免日志处理阻塞主线程。
消息广播:将消息放入消息队列,由多个消费者消费消息,实现消息广播的功能。
“消息队列(Message Queue)”是在消息的传输过程中保存消息的容器 。在消息队列中,通常有生产者和消费者两个角色。生产者只负责发送数据到消息队列,谁从消息队列中取出数据处理,他不管。消费者只负责从消息队列中取出数据处理,他不管这是谁发送的数据。
主要功能 异步处理 :一个客户端请求发送进来,系统A会调用系统B、C、D三个系统,同步请求的话,响应时间就是系统A、B、C、D的总和,也就是800ms。如果使用MQ,系统A发送数据到MQ,然后就可以返回响应给客户端,不需要再等待系统B、C、D的响应,可以大大地提高性能 。对于一些非必要的业务,比如发送短信,发送邮件等等,就可以采用MQ。
解耦: 如图所示。假设有系统B、C、D都需要系统A的数据,于是系统A调用三个方法发送数据到B、C、D。这时,系统D不需要了,那就需要在系统A把相关的代码删掉。假设这时有个新的系统E需要数据,这时系统A又要增加调用系统E的代码。为了降低这种强耦合,就可以使用MQ,系统A只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可 。
削峰 :这是MQ一个很重要的应用。假设系统A在某一段时间请求数暴增,有5000个请求发送过来,系统A这时就会发送5000条SQL进入MySQL进行执行,MySQL对于如此庞大的请求当然处理不过来,MySQL就会崩溃,导致系统瘫痪。如果使用MQ,系统A不再是直接发送SQL到数据库,而是把数据发送到MQ,MQ短时间积压数据是可以接受的,然后由消费者每次拉取2000条进行处理,防止在请求峰值时期大量的请求直接发送到MySQL导致系统崩溃 。
RabbitMQ的特点
可靠性。支持持久化,传输确认,发布确认等保证了MQ的可靠性。
灵活的分发消息策略。这应该是RabbitMQ的一大特点。在消息进入MQ前由Exchange(交换机)进行路由消息。分发消息策略有:简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式。
支持集群。多台RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
多种协议。RabbitMQ支持多种消息队列协议,比如 STOMP、MQTT 等等。
支持多种语言客户端。RabbitMQ几乎支持所有常用编程语言,包括 Java、.NET、Ruby 等等。
可视化管理界面。RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker。
插件机制。RabbitMQ提供了许多插件,可以通过插件进行扩展,也可以编写自己的插件。
入门 阿里云实验室RocketMQ 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 1. 执行如下命令,切换目录至rocketmq-4.9 .3 /bin下。cd /root/rocketmq-4.9 .3 /bin 其目录下已有如下文件: logs rocketmq-4.9 .3 rocketmq-all-4.9 .3 -bin-release.zip rocketmq-handson-apply spring-mq-bus-demo 2. 启动服务器,和查看日志nohup sh mqnamesrv & tail -f ~/logs/ rocketmqlogs/namesrv.log 看到The Name Server boot success. serializeType=JSON 表示启动成功 3. 启动端口映射nohup sh mqbroker -n localhost :9876 & tail -f ~/logs/ rocketmqlogs/broker.log 4. 执行如下命令,创建topic。cd /root/rocketmq-4.9 .3 /bin ./mqadmin updateTopic -n localhost :9876 -b localhost :10911 -t sendDataType ./mqadmin updateTopic -n localhost :9876 -b localhost :10911 -t sendOperationType ./mqadmin updateTopic -n localhost :9876 -b localhost :10911 -t syncSendOrderly ./mqadmin updateTopic -n localhost :9876 -b localhost :10911 -t sendAndReceive ./mqadmin updateTopic -n localhost :9876 -b localhost :10911 -t broadcastConsumer ./mqadmin updateTopic -n localhost :9876 -b localhost :10911 -t transactionMessage
1 2 3 4 5 1. 启动javacd /root/rocketmq-handson-apply mvn install -Dmaven .test .skip export local=$RANDOMjava -jar rocketmq-spring/target/rocketmq-spring-1.0 .0 .jar
1 2 3 4 5 6 7 发送各种请求,如发送对象、消息、集合,同步发送、异步发送、rpc发送、事务消息: wget -q -O- "http://127.0.0.1:28082/messageSend/sendObject?id=1&name=xiaoming&action=go2&ags=3" wget -q -O- "http://127.0.0.1:28082/messageSend/sendMessage?id=2&name=xiaoming&action=des&ags=3" wget -q -O- "http://127.0.0.1:28082/messageSend/sendCollection?id=2&name=xiaoming&action=des&ags=3" wget -q -O- "http://127.0.0.1:28082/messageSend/syncSend?id=2&name=xiaoming&action=des&ags=3" wget -q -O- "http://127.0.0.1:28082/messageSend/asyncSend?id=2&name=xiaoming&action=des&ags=3" wget -q -O- "http://127.0.0.1:28082/messageSend/sendAndReceive?id=2&name=xiaoming&action=des&ags=3"
windows11安装 使用docker拉取镜像,一开始使用了latest版本,启动management插件也不行,后来换成了指定的版本,自带management插件,springboot是不再报错了,但是有些页面无法访问。
1 2 3 docker pull rabbitmq :3.9 .29 -management docker run -dit --name rabbitmq3.9 .29 -e RABBITMQ_DEFAULT_USER =guest -e RABBITMQ_DEFAULT_PASS =guest -p 15672 :15672 -p 5672 :5672 rabbitmq :3.9 .29 -management
rabbitmq:3.9.29-management
版本在进入Admin页面时报错,此版本出现的问题如下,大意是EJS出问题了:
1 SyntaxError : Unexpected token 'catch' SyntaxError : Unexpected token 'catch' at EJS .Compiler .compile (http :
后换成rabbitmq:management
按照Windows版Docker安装RabbitMq - 掘金 (juejin.cn) 一步步黏贴,该版本可以访问admin页面,但是无法访问队列queue页面。可以自行选择,admin页面其实只管理用户,这个用户管理可以进入docker容器来配置。现将常用指令贴出:
1 2 3 4 5 6 7 8 9 10 11 12 13 docker exec -it a6fd7b077c5f /bin/bash rabbitmqctl list_users user tags admin [administrator] rabbitmqctl add_user test test rabbitmqctl set_user_tags test administrator rabbitmqctl add_vhost /test rabbitmqctl set_permissions -p /test test '.*' '.*' '.*'
在配置过程中,一开始以为端口号是启动时用到的15672,结果连接许久无法访问,有如下报错,都是未连接的错误,后来查询后发现配置的端口需要是5672:
1 2 3 Failed to check/redeclare auto-delete queue (s).org.springframework .amqp .AmqpIOException : java.io .IOException java.net .SocketException : Socket Closed
amqp协议 AMQP所覆盖的内容包含了网络协议 以及服务端服务
AMQP的全称为:Advanced Message Queuing Protocol(高级消息队列协议),发布者(Publisher)发布消息(Message),经由交换机(Exchange)。交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。
一套被称作”高级消息队列协议模型(AMQ Model)“的消息能力定义 。该模型涵盖了Broker服务中用于路由和存储消息的组件,以及把这些组件连在一起的规则。
一个网络层协议AMQP 。能够让客户端程序与实现了AMQ Model的服务端进行通信。
深入理解 1、发布者、交换机、队列、消费者都可以有多个。同时因为 AMQP 是一个网络协议,所以这个过程中的发布者,消费者,消息代理 可以分别存在于不同的设备上。
2、发布者发布消息时可以给消息指定各种消息属性(Message Meta-data)。有些属性有可能会被消息代理(Brokers)使用,然而其他的属性则是完全不透明的,它们只能被接收消息的应用所使用。
3、从安全角度考虑,网络是不可靠的,又或是消费者在处理消息的过程中意外挂掉,这样没有处理成功的消息就会丢失。基于此原因,AMQP 模块包含了一个消息确认(Message Acknowledgements)机制:当一个消息从队列中投递给消费者后,不会立即从队列中删除,直到它收到来自消费者的确认回执(Acknowledgement)后,才完全从队列中删除。
4、在某些情况下,例如当一个消息无法被成功路由时(无法从交换机分发到队列),消息或许会被返回给发布者并被丢弃。或者,如果消息代理执行了延期操作,消息会被放入一个所谓的死信队列中。此时,消息发布者可以选择某些参数来处理这些特殊情况。
组成
Broker:消息队列服务进程。此进程包括两个部分:Exchange和Queue。
Exchange:消息队列交换机。按一定的规则将消息路由转发到某个队列 。
Queue:消息队列,存储消息的队列。
Producer:消息生产者。生产方客户端将消息同交换机路由发送到队列中。
Consumer:消息消费者。消费队列中存储的消息。
Exchange交换机类型有四种,根据不同的类型工作的方式也有所不同。在HelloWord例子中,我们就使用了比较简单的Direct Exchange ,翻译就是直连交换机。其余三种分别是:Fanout exchange、Topic exchange、Headers exchange 。
Direct Exchange,要求该消息与一个特定的路由键完全匹配;
Fanout exchange,这种类型的交换机需要将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。简单点说就是发布订阅。
Topic Exchange,主题交换机,如果从用法上面翻译可能叫通配符交换机会更加贴切。这种交换机是使用通配符去匹配,路由到对应的队列。通配符有两种:”“ 、 “#”。需要注意的是通配符前面必须要加上”.”符号。 *
* 符号:有且只匹配一个词,**#
** 符号:匹配一个或多个词。
Headers Exchange,这种交换机用的相对没这么多。它跟上面三种有点区别,它的路由不是用routingKey进行路由匹配,而是在匹配请求头中所带的键值进行路由。创建队列需要设置绑定的头部信息,有两种模式:全部匹配和部分匹配 。如上图所示,交换机会根据生产者发送过来的头部信息携带的键值去匹配队列绑定的键值,路由到对应的队列。
启动和实验 本节记录实验过程中遇到的一些小坑
创建一个公共的pom配置,即RabbitMqDemo
,该pom使用spring版本为2.7.12,添加amqp依赖
1 2 3 4 5 6 7 8 9 10 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency > <-- 子模块--> <modules > <module > Common</module > <module > Consumer</module > <module > Core</module > </modules >
然后创建三个子模块:Common Consumer Core
三个子模块引用父模块的依赖和common的配置:
1 2 3 4 5 6 7 8 9 10 11 12 <parent> <artifactId>RabbitMqDemo</artifactId> <groupId>com.rabbitmq</groupId> <version>0.0 .1 -SNAPSHOT</version> </parent> <dependency> <groupId>com.common</groupId> <artifactId>Common</artifactId> <version>0.0 .1 -SNAPSHOT</version> <scope>compile</scope> </dependency>
common模块 Common负责配置文件,RabbitMQConfig
是mq的名称配置,DirectRabbitConfig
是连接配置。
RabbitMQConfig
包含队列主题名称、四种Exchange交换机的名称和配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 public static final String RABBITMQ_DEMO_TOPIC = "rabbitmq.demo.topic" ; public static final String RABBITMQ_DEMO_DIRECT_EXCHANGE = "rabbipwetmq.demo.direct.exchange" ; public static final String RABBITMQ_DEMO_DIRECT_ROUTING = "rabbitmq.demo.direct.routing" ; public static final String FANOUT_EXCHANGE_QUEUE_TOPIC_A = "fanout.A" ; public static final String FANOUT_EXCHANGE_QUEUE_TOPIC_B = "fanout.B" ; public static final String FANOUT_EXCHANGE_DEMO_NAME = "fanout.exchange.demo.name" ; public static final String TOPIC_EXCHANGE_DEMO_NAME = "topic.exchange.demo.name" ; public static final String TOPIC_EXCHANGE_QUEUE_A = "topic.queue.a" ; public static final String TOPIC_EXCHANGE_QUEUE_B = "topic.queue.b" ; public static final String TOPIC_EXCHANGE_QUEUE_C = "topic.queue.c" ; public static final String HEADERS_EXCHANGE_DEMO_NAME = "headers.exchange.demo.name" ; public static final String HEADERS_EXCHANGE_QUEUE_A = "headers.queue.a" ; public static final String HEADERS_EXCHANGE_QUEUE_B = "headers.queue.b" ;
DirectRabbitConfig
配置连接、消息队列、队列和交换机的绑定:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 @Component @Configuration public class DirectRabbitConfig implements BeanPostProcessor { @Bean public RabbitAdmin rabbitAdmin (ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin (connectionFactory); rabbitAdmin.setAutoStartup(true ); return rabbitAdmin; } @Bean public Queue rabbitmqDemoDirectQueue () { return new Queue (RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true , false , false ); } @Bean public DirectExchange rabbitmqDemoDirectExchange () { return new DirectExchange (RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true , false ); } @Bean public Queue fanoutExchangeQueueA () { return new Queue (RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A, true , false , false ); } @Bean public Queue fanoutExchangeQueueB () { return new Queue (RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_B, true , false , false ); } @Bean public FanoutExchange rabbitmqDemoFanoutExchange () { return new FanoutExchange (RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, true , false ); } @Bean public Binding bindFanoutA () { return BindingBuilder.bind(fanoutExchangeQueueA()).to(rabbitmqDemoFanoutExchange()); } @Bean public Binding bindFanoutB () { return BindingBuilder.bind(fanoutExchangeQueueB()).to(rabbitmqDemoFanoutExchange()); } @Bean public Binding bindDirect () { return BindingBuilder .bind(rabbitmqDemoDirectQueue()) .to(rabbitmqDemoDirectExchange()) .with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING); } @Bean public TopicExchange rabbitmqDemoTopicExchange () { return new TopicExchange (RabbitMQConfig.TOPIC_EXCHANGE_DEMO_NAME, true , false ); } @Bean public Queue topicExchangeQueueA () { return new Queue (RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_A, true , false , false ); } @Bean public Queue topicExchangeQueueB () { return new Queue (RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_B, true , false , false ); } @Bean public Queue topicExchangeQueueC () { return new Queue (RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_C, true , false , false ); } @Bean public Binding bindTopicA () { return BindingBuilder.bind(topicExchangeQueueB()) .to(rabbitmqDemoTopicExchange()) .with("a.*" ); } @Bean public Binding bindTopicB () { return BindingBuilder.bind(topicExchangeQueueC()) .to(rabbitmqDemoTopicExchange()) .with("a.*" ); } @Bean public Binding bindTopicC () { return BindingBuilder.bind(topicExchangeQueueA()) .to(rabbitmqDemoTopicExchange()) .with("rabbit.#" ); } @Bean public Queue headersQueueA () { return new Queue (RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A, true , false , false ); } @Bean public Queue headersQueueB () { return new Queue (RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B, true , false , false ); } @Bean public HeadersExchange rabbitmqDemoHeadersExchange () { return new HeadersExchange (RabbitMQConfig.HEADERS_EXCHANGE_DEMO_NAME, true , false ); } @Bean public Binding bindHeadersA () { Map<String, Object> map = new HashMap <>(); map.put("key_one" , "java" ); map.put("key_two" , "rabbit" ); return BindingBuilder.bind(headersQueueA()) .to(rabbitmqDemoHeadersExchange()) .whereAll(map).match(); } @Bean public Binding bindHeadersB () { Map<String, Object> map = new HashMap <>(); map.put("headers_A" , "coke" ); map.put("headers_B" , "sky" ); return BindingBuilder.bind(headersQueueB()) .to(rabbitmqDemoHeadersExchange()) .whereAny(map).match(); } }
和教程中的不同,在操作时报错出现循环依赖的问题,不使用rabbitAdmin
也没事。
Core模块 Core模块向消息队列写入,用若干个不同的controller控制向不同的queue写入。包含controller、service(impl),下面从Impl来看四种交换机方式。
1 2 3 4 String sendMsg (String msg) throws Exception; String sendMsgByFanoutExchange (String msg) throws Exception; String sendMsgByTopicExchange (String msg, String routingKey) throws Exception; String sendMsgByHeadersExchange (String msg, Map<String, Object> map) throws Exception;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Override public String sendMsg (String msg) throws Exception { try { String msgId = UUID.randomUUID().toString().replace("-" , "" ).substring(0 , 32 ); String sendTime = sdf.format(new Date ()); Map<String, Object> map = new HashMap <>(); map.put("msgId" , msgId); map.put("sendTime" , sendTime); map.put("msg" , msg); rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, map); return "ok" ; } catch (Exception e) { e.printStackTrace(); return "error" ; } }
1 2 3 4 5 6 7 8 9 10 11 12 @Override public String sendMsgByFanoutExchange (String msg) throws Exception { Map<String, Object> message = getMessage(msg); try { rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, "" , message); return "ok" ; } catch (Exception e) { e.printStackTrace(); return "error" ; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 @Override public String sendMsgByTopicExchange (String msg, String routingKey) throws Exception { Map<String, Object> message = getMessage(msg); try { rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_DEMO_NAME, routingKey, message); return "ok" ; } catch (Exception e) { e.printStackTrace(); return "error" ; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Override public String sendMsgByHeadersExchange (String msg, Map<String, Object> map) throws Exception { try { MessageProperties messageProperties = new MessageProperties (); messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); messageProperties.setContentType("UTF-8" ); messageProperties.getHeaders().putAll(map); Message message = new Message (msg.getBytes(), messageProperties); rabbitTemplate.convertAndSend(RabbitMQConfig.HEADERS_EXCHANGE_DEMO_NAME, null , message); return "ok" ; } catch (Exception e) { e.printStackTrace(); return "error" ; } }
Consumer模块 消费者模块用来获取对应监听队列的消息,主要有以下两个注意点:
1 2 3 1. @RabbitListener 注解声明监听的队列2. @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC)) 其中queuesToDeclare 表示如果没有这个队列则创建一个队列,防止报错;
启动后可以看到各条线路的监听消息:
1 2 3 4 5 队列A收到消息:{msg=广播发布,UUID, msgId=1fe4602e0c054b878f9d21f8fbf25840, sendTime=2023 -06 -09 13 :09:39 } 队列B收到消息:{msg=广播发布,UUID, msgId=1fe4602e0c054b878f9d21f8fbf25840, sendTime=2023 -06 -09 13 :09:39 } 队列[headers.queue.a]收到消息:UUID 队列[headers.queue.b]收到消息:UUID 消费者RabbitMQDemo从消息队列中消费信息: {msg=直连写入,UUID, msgId=b58befd9034c495290540640abd8c0a4, sendTime=2023 -06 -09 13 :04 :03 }
死信队列 本节参考自RabbitMQ - 死信队列(Dead-Letter-Exchange ) - 知乎 (zhihu.com)
DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。
消息变成死信,可能是由于以下的原因:
DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Bean public Queue ttlDirectQueue () { HashMap<String,Object> args = new HashMap <>(); args.put("x-message-ttl" ,5000 ); args.put("x-max-length" ,8 ); args.put("x-dead-letter-exchange" ,"dead_direct_exchange" ); args.put("x-dead-letter-routing-key" ,"dead" ); return new Queue ("ttl.direct.queue" ,true ,false ,false ,args); }