1 什么是消息队列
消息队列中间件是分布式系统中重要的组件,主要解决应用 耦合 ,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。通俗的来讲,消息队列就是生产者生产消息,消费者监听到消息做各自的业务操作,也就是消费消息的过程。
2下载与安装
2.1安装 Erlang (由于 RabbitMq 是基于erlang的)
RabbitMQ和Erlang的对应关系 rabbitmq .com/which-erlang.html Erlang下载地址:
安装过程简单粗暴,一直next就行
2.2安装RabbitMQ
下载地址:

安装:点next就行
2.3配置RabbitMq
2.3.1执行以下命令
启用WEB管理插件 rabbitmq-plugins enable rabbitmq_management

2.3.2访问

2.3.3进入首页
用户名密码guest/guest

3 RabbitMQ 的工作原理

1、生产者发送/发布消息到代理; 2、消费者从代理那里接收消息。RabbitMQ扮演代理中间件的角色; 3、当生产者发送消息时,它并不是直接把消息发送到队列里的,而是使用 交换机 来发送; 4、交换机把消息分发到不同的队列里,消费者就能从监听的队列中消费消息。
4六种消息模型
4.1简单模式(simple)

1.消息产生着将消息放入队列; 2.消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被消费后,自动从队列中删除; 3.缺点:这种模式下消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失。 4.应用场景:客户端服务端模式的聊天程序。
5.代码 (1)先定义一个简单的队列存储消息
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:25
* @description:
* @modified By:
* @version: $
*/@Configuration
public class SimpleQueue {
/**
* 创建一个简单的队列,叫hello
* @return
*/ @Bean
public Queue queue() {
return new Queue("hello");
}
}
(2)定义消费者
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:25
* @description:消费者,监听hello队列
* @modified By:
* @version: $
*/@Component
public class Consumer1 {
@RabbitListener(queues = "hello")
@RabbitHandler
public void receive(String msg){
System.out.println("Consumer1收到消息:"+msg);
}
}
(3)定义生产者
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:27
* @description:这里直接用 Rest 接口来做生产者
* @modified By:
* @version: $
*/@RestController
@RequestMapping("/")
public class RabbitRestController {
@Autowired
@Qualifier("fireRabbitTemplate")
private RabbitTemplate rabbitTemplate;
private final Logger log =LoggerFactory.getLogger(getClass());
@RequestMapping("/send")
public String send() {
String context = "hello==========" + new Date();
log.info("发送消息 : " + context);
//生产者,正在往hello这个路由规则中发送,由于没有交换机,所以路由规则就是队列名称
this.rabbitTemplate.convertAndSend("hello", context);
return "success";
}
}
(4)前端发送Rest请求,看控制台效果

4.2工作模式(work)

1.消息产生者将消息放入队列。生产者系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢; 2.消费者A,消费者B,当然可以更多,同时监听同一个队列,消费者共同争抢当前的消息队列内容,谁先拿到谁负责消费消息; 3.缺点:高并发情况下,会产生某一个消息被多个消费者共同消费。 4.应用场景:发红包
5.代码 work模式我们只需要在简单模式的基础上添加一个消费者,也监听hello这个队列 (1)添加消费者2
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:29
* @description:
* @modified By:
* @version: $
*/@Component
public class Consumer2 {
@RabbitHandler
@RabbitListener(queues = "hello")
public void receive(String msg){
System.out.println("Consumer2收到消息:"+msg);
}
}
(2)前端发送Rest请求,看控制台效果 发送四次请求,消费者1和2分别接收到两次

4.3发布订阅模式(publish/subscribe)

1.X代表交换机rabbitMQ内部组件,消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中; 2.消费者监听队列,对应消息队列的消费者拿到消息进行消费; 3.相关场景:邮件群发,群聊天。
4.代码 (1)定义交换机与队列
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:47
* @description:交换机-发布订阅模式
* @modified By:
* @version: $
*/@Configuration
public class QueueExchange {
@Bean
public Queue queueA() {
return new Queue("queueA", true);
}
@Bean
public Queue queueB() {
return new Queue("queueB", true);
}
/**
* 创建一个fanoutExchange交换机
*/ @Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
/**
* 将queueA队列绑定到fanoutExchange交换机上面
*/ @Bean
Binding bindingExchangeMessageFanoutA(Queue queueA, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueA).to(fanoutExchange);
}
/**
* 将queueB队列绑定到fanoutExchange交换机上面
*/ @Bean
Binding bindingExchangeMessageFanoutB(Queue queueB, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueB).to(fanoutExchange);
}
}
(2)定义消费者
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:15
* @description:消费者3
* @modified By:
* @version: $
*/@Component
public class Consumer3 {
@RabbitHandler
@RabbitListener(queues = "queueA")
public void receive(String msg){
System.out.println("Consumer3收到消息:"+msg);
}
}
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:15
* @description:消费者4
* @modified By:
* @version: $
*/@Component
public class Consumer4 {
@RabbitHandler
@RabbitListener(queues = "queueB")
public void receive(String msg){
System.out.println("Consumer4收到消息:"+msg);
}
}
(3)定义生产者
@RequestMapping("/sendExchange")
public String sendToExchange(){
String context = "exchange=======" + new Date();
log.info("发送消息 : " + context);
//生产者,正在往交换机发送消息,交换机会根据绑定的队列来发送(如果多个客户端监听同一个队列,只有一个能收到消息)
this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
return "success";
}
(4)前端发送Rest请求,看控制台效果 这里消费者3和4都收到了消息,因为他们分别监听不同的两个队列

4.4 路由模式 (routing)
1.消息生产者将消息发送给交换机按照路由判断,路由是字符串,交换机根据路由的key去匹配,只有匹配上路由key对应的消息队列,对应的消费者才能消费消息; 2.根据业务功能定义路由字符串; 3.从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中; 4.业务场景:统一门户和子系统交互,每个子系统对应不同的业务处理,通过路由key分发不同的消息到对应子系统队列,完成消息消费。 5.代码 (1)定义交换机与队列,绑定路由key
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 17:06
* @description:路由模式
* @modified By:
* @version: $
*/@Configuration
public class QueueRouter {
public static final String DIRECT_EXCHANGE = "directExchange";
public static final String QUEUE_DIRECT_A = "direct.A";
public static final String QUEUE_DIRECT_B = "direct.B";
/**
* 创建一个direct交换机
* @return
*/ @Bean
DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE);
}
@Bean
Queue queueDirectNameA() {
return new Queue(QUEUE_DIRECT_A);
}
/**
* 创建队列
* @return
*/ @Bean
Queue queueDirectNameB() {
return new Queue(QUEUE_DIRECT_B);
}
/**
* 将direct.A队列绑定到directExchange交换机中,使用a.key作为路由规则
* @param queueDirectNameA
* @param directExchange
* @return
*/ @Bean
Binding bindingExchangeMessageDirectA(Queue queueDirectNameA, DirectExchange directExchange) {
return BindingBuilder.bind(queueDirectNameA).to(directExchange).with("a.key");
}
/**
* 将direct.B队列绑定到directExchange交换机中,使用b.key作为路由规则
* @param queueDirectNameB
* @param directExchange
* @return
*/ @Bean
Binding bindingExchangeMessageDirectB(Queue queueDirectNameB, DirectExchange directExchange) {
return BindingBuilder.bind(queueDirectNameB).to(directExchange).with("b.key");
}
(2)定义消费者
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:15
* @description:
* @modified By:
* @version: $
*/@Component
public class Consumer5 {
@RabbitListener(queues = QueueRouter.QUEUE_DIRECT_A)
@RabbitHandler
public void receiveA(String msg){
System.out.println("Consumer5-direct-A收到路由消息:"+msg);
}
@RabbitListener(queues = QueueRouter.QUEUE_DIRECT_B)
@RabbitHandler
public void receiveB(String msg){
System.out.println("Consumer5-direct-B收到路由消息:"+msg);
}
}
(3)定义生产者
@RequestMapping("/sendRouter")
public String sendToExchangeByRouter(){
String context = "exchange=======" + new Date();
log.info("发送路由消息 : " + context);
//生产者,正在往交换机发送消息,队列绑定了不同路由规则,交换机会使用a.key作为路由规则来发送
this.rabbitTemplate.convertAndSend(QueueRouter.DIRECT_EXCHANGE,"a.key", context);
return "success";
}
(4)发送前端请求,看效果 可以看到只有监听了QUEUE DIRECT A的消费者能收到消息,因为队列A使用的路由key为a.key

4.5主题模式(topic)
注意:与路由模式的区别就是路由key可以是通配符,模糊匹配。交换机类型为topic 1.星号井号代表通配符; 2.星号代表多个单词, 井号 代表一个单词; 3.路由功能添加模糊匹配; 4.消息产生者产生消息,把消息交给交换机; 5.交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费。 5.代码 (1)定义队列与topic交换机
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 17:06
* @description:主题模式
* @modified By:
* @version: $
*/@Configuration
public class QueueTopic {
public static final String TOPIC_EXCHANGE = "topicExchange";
public static final String DIRECT_REGXA = "nr.topic.#";
public static final String DIRECT_REGXB = "nr.topic.b";
public static final String DIRECT_REGXC = "nr.topic.c";
public static final String QUEUE_TOPIC_A = "topic.A";
public static final String QUEUE_TOPIC_B = "topic.B";
public static final String QUEUE_TOPIC_C = "topic.C";
/**
* 创建一个topic交换机
* @return
*/ @Bean
TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}
/**
* 创建队列
* @return
*/ @Bean
Queue queueTopicNameA() {
return new Queue(QUEUE_TOPIC_A);
}
@Bean
Queue queueTopicNameB() {
return new Queue(QUEUE_TOPIC_B);
}
@Bean
Queue queueTopicNameC() {
return new Queue(QUEUE_TOPIC_C);
}
/**
* 将direct.A队列绑定到topicExchange交换机中,使用nr.topic.#作为路由规则
* @param queueTopicNameA
* @param topicExchange
* @return
*/ @Bean
Binding bindingExchangeMessageTopicA(Queue queueTopicNameA, TopicExchange topicExchange) {
return BindingBuilder.bind(queueTopicNameA).to(topicExchange).with(DIRECT_REGXA);
}
/**
* 将direct.B队列绑定到topicExchange交换机中,使用nr.topic.b作为路由规则
* @param queueTopicNameB
* @param topicExchange
* @return
*/ @Bean
Binding bindingExchangeMessageTopicB(Queue queueTopicNameB, TopicExchange topicExchange) {
return BindingBuilder.bind(queueTopicNameB).to(topicExchange).with(DIRECT_REGXB);
}
/**
* 将direct.B队列绑定到topicExchange交换机中,使用nr.topic.c作为路由规则
* @param queueTopicNameC
* @param topicExchange
* @return
*/ @Bean
Binding bindingExchangeMessageTopicC(Queue queueTopicNameC, TopicExchange topicExchange) {
return BindingBuilder.bind(queueTopicNameC).to(topicExchange).with(DIRECT_REGXC);
}
}
(2)定义消费者
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:15
* @description:
* @modified By:
* @version: $
*/@Component
public class Consumer6 {
@RabbitListener(queues = QueueTopic.QUEUE_TOPIC_A)
@RabbitHandler
public void receiveA(String msg){
System.out.println("Consumer6-topic-A收到路由消息:"+msg);
}
@RabbitListener(queues = QueueTopic.QUEUE_TOPIC_B)
@RabbitHandler
public void receiveB(String msg){
System.out.println("Consumer6-topic-B收到路由消息:"+msg);
}
@RabbitListener(queues = QueueTopic.QUEUE_TOPIC_C)
@RabbitHandler
public void receiveC(String msg){
System.out.println("Consumer6-topic-C收到路由消息:"+msg);
}
}
(3)定义生产者
@RequestMapping("/sendTopic")
public String sendToExchangeByTopic(){
String context = "topic=======" + new Date();
log.info("发送topic消息 : " + context);
//生产者,正在往topic交换机发送消息,队列绑定了不同路由规则,交换机会使用nr.topic.b作为路由规则来发送
// 用nr.topic.b和nr.topic.#作为路由key的队列都能收到消息
this.rabbitTemplate.convertAndSend(QueueTopic.TOPIC_EXCHANGE,"nr.topic.b", context);
return "success";
}
(4)前端发送Rest请求,看效果 和预期一样,A和B都收到了消息

4.6 RPC模式
注:图片来源于官网
- 客户端启动时,它将创建一个匿名排他回调队列。
- 对于RPC请求,客户端发送一条消息,该消息具有两个属性: reply to(设置为回 调队列)和correlation id(设置为每个请求的唯一值)。
- 求被发送到rpc_queue队列。
- RPC工作程序(又名:服务器)正在等待该队列上的请求。出现请求时,它将使用reply_to字段中的队列来完成工作,并将消息和结果发送回客户端。
- 客户端等待回调队列上的数据。当出现一条消息时,它将检查correlation_id属性。如果它与请求中的值匹配,则将响应返回给应用程序。 注:此处来源于官网,这里只做简单介绍,详情可看官网
5消息确认机制(ACK)
业务系统中,消息丢了怎么办,消息发送到哪了?我们通常需要一些消息补偿机制去处理这些问题。
消息确认分为两种,发送确认和接收确认
5.1消息发送确认
确认生产者将消息发送给交换机,交换机传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换机,二是确认是否到达队列 (1)通过实现ConfirmCallBack接口确认消息发送到交换机 代码:
/**
* 如果消息到达交换机, 则 confirm 回调, ack = true
* 如果消息不到达交换机, 则 confirm 回调, ack = false
* 需要设置spring.rabbitmq.publisher-confirm-type=correlated
*/ rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{
log.info("收到回调:{}", ack == true ? "消息成功到达交换机" : "消息到达交换机失败");
if (!ack) {
log.info("correlationData:{}", correlationData.getId());
log.info("消息到达交换机失败原因:{}", cause);
// 根据业务逻辑实现消息补偿机制
}
});
(2)通过实现ReturnCallback接口确认消息从交换机发送到队列 代码:
/**
* 消息从交换机到达队列成功, 则 returnedMessage 不回调
* 消息从交换机到达队列失败, 则 returnedMessage 回调
* 需要设置spring.rabbitmq.publisher-returns=true
*/ rabbitTemplate.setReturnsCallback(returnedMessage->{
log.info("消息未到达队列,setReturnsCallback回调");
log.info("消息报文:{}", new String(returnedMessage.getMessage().getBody()));
log.info("消息编号:{}", returnedMessage.getReplyCode());
log.info("描述:{}", returnedMessage.getReplyText());
log.info("交换机名称:{}", returnedMessage.getExchange());
log.info("路由名称:{}", returnedMessage.getRoutingKey());
// 根据业务逻辑实现消息补偿机制
});
5.1消息接收确认
(1)确认模式
AcknowledgeMode.NONE:不确认 AcknowledgeMode.AUTO:自动确认 AcknowledgeMode.MANUAL:手动确认 需要配置:
spring.rabbitmq.listener.simple.acknowledge-mode = manual
(2)代码: 消费者确认
/**
* @author :Mr.Fire
* @date :Created in 2021/4/25 16:15
* @description:
* @modified By:
* @version: $
*/@Component
public class Consumer8 {
@RabbitListener(queues = QueueRouter.QUEUE_DIRECT_A)
@RabbitHandler
public void receiveA(Message msg, Channel channel) throws IOException {
try {
//消息确认机制还可以起到限流作用,比如在接收到此条消息时休眠几秒钟
Thread.sleep(3000);
// 确认收到消息,消息将被队列移除
// false只确认当前consumer一个消息收到,true确认所有consumer获得的消息。
channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Consumer8-direct-A收到确认消息:"+msg);
}
}
生产者
@RequestMapping("/sendAck")
@ResponseBody
public String sendAck() {
String context = "exchange=======" + new Date();
log.info("发送确认消息 : " + context);
//生产者,正在往交换机发送消息,队列绑定了不同路由规则,交换机会使用a.key作为路由规则来发送
this.rabbitTemplate.convertAndSend(QueueRouter.DIRECT_EXCHANGE,"a.key", context);
return "success";
}
前端请求看效果:

当然还有其他,比如失败确认basicNack,拒绝basicReject,basicPublish重新发布等,以后有机会在讲解…
6 Spring Boot 整合 RabbitMQ
1.新建一个Maven工程

勾选Spring Web和RabbitMq的依赖,也可以建好工程自己添加

创建完成,来看看POM文件依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2.配置yml或者properties,我这里使用properties
server.port= 8090
spring.rabbitmq.addresses=localhost:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#消息确认需要配置
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
#手动确认消息
spring.rabbitmq.listener.direct.acknowledge-mode=manual
3.启动项目

4.到此,已经可以开始开发你自己的业务逻辑了
7扩展面试思考题
1.消息基于什么传输? 2.如何避免消息重复投递或重复消费? 3.如何保证消息不丢失? 4.手动确认模式中,消息手动拒绝中如果requeue为true会重新放入队列,消费者处理过程中一直有异常情况下会导致入队-拒绝-入队的死循环,怎么处理?