简介
RabbitMQ是开源的高级消息队列协议,它是用Erlang语言进行开发的,支持多种客户端。
RabbitMQ是目前应用广泛的消息中间件,当然目前的使用趋势逐步的偏向RocketMQ和Kafka。
在企业级应用,电商应用,微服务等等应用中,消息队列都担任着相当重要的角色,例如在业务服务模块中的解耦,异步通信,削峰,限流,超时业务等。
但值得注意的是RabbitMQ和单机吞吐量远不如RocketMQ和Kafka,社区活跃度目前处于不冷不热的状态,由于它是基于Erlang开发,所以它的并发能力很强,性能极好,延时很低。
使用场景
推送通知
这里也就是我们常说的”发布\订阅“功能,它也是RabbitMQ中的重要功能。我们可以用”发布\订阅“功能来实现通知等。消费者(consumer)监听RabbitMQ队列中的数据。如果队列中有数据,则消费者会按照”先进先出“的原则逐条进行消费。而生产者(producer)只需要将数据存入队列中即可。这样降低了不同系统之间的耦合度,也确保了消息通知的及时性,且不影响系统的性能。
”发布\订阅“中支持三种模式:一对一、一对多、广播。着三种模式都可以根据规则选择分发的对象,同时,消费者(consumer)也可以制定规则来选择是否接受这些订阅数据。
异步任务
后端系统街道任务之后,将其分解成多个小任务,只要完成这些小任务,那么整个流程任务就可以完成。但是如果某一个或者某几个小任务是耗时任务,且对核心任务影响不大,则可以将这些任务放入消息队列中去处理,加快主要业务的处理速度。
跨应用通信
RabbitMQ中提供两种事务模式:
- AMQP事务模式
- Confirm事务模式
RabbitMQ可以用于不同开发语言应用之间的通信(如Java和C++进行通信),实现企业应用的集成。由于消息队列是和平台和语言无关的,并且语义上也不是函数调用,因此,RabbitMQ适合作为多个应用之间的中间件或者说是接口,且不需要发送方和接收方同时在线。
不同语言的解耦,可以最大幅度的减少此程序之间的相互依赖,提高系统的可用性以及可扩展性。
消息延迟
这个场景常出现在电商系统中,利用RabbitMQ消息队列延迟,可以实现订单、支付过期定时过期定时取消等功能。延迟队列存储延时消息,所以当消息被发送以后,消费者不是立即拿到消息,而是等待指定时间之后才能拿到这个消息进行消费。
当然,这个功能也可以利用其它第三方库或者定时任务进行开发。
但值得注意的是,单用RabbitMQ来实现的话,需要确认版本在3.5.8及以上,同时一般需要官方提供的依赖插件rabbitmq_delayed_message_excahnge来实现。
常见的实现途径是通过定时任务来完成的。
远程过程调用
在实际的应用场景中,有时候需要一些同步处理,以等待服务器端将消息处理完成后再执行下一步操作,这相当于RPC(远程过程调用)。RabbitMQ也支持RPC。
基本概念及特性
特性
- 信息确认:自动应答和手动应答
- 队列持久化
- 信息持久化
- 消息拒收
- ...
生产者、消费者和代理
- 生产者: 消息的创建方,负责创建和推送数据到消息服务器。
- 消费者: 消息的接收方,用于处理数据和确认消息。
- 代理: RabbitMQ本身,本身并不产生数据,扮演搬运工的角色。
消息队列
MQ的全称是Message Queue,意为消息队列之意。Queue是RabbitMQ的内部对象,用于存储生产者的消息知道发送给消费者进行消费,也是消费者接受消息的地方。RabbitMQ的消息也都之恶能存储在Queue中,多个消费者可以订阅同一个Queue。
Queue有以下一些重要的属性:
- 持久性: 如果启用,则队列将会在消息协商器(broker)重启之前都有效。
- 自动删除: 如果启用,则队列将会在所有消费者停止使用之后自动删除掉。
- 惰性: 如果没有声明队列,则应用程序调用队列时导致的异常不会主动报警。
- 排他性: 如果启用,则声明它的消费者才能使用。
交换机(Exchange)
交换机用于接收、分配消息。生产者需要预先指定一个routing key,然后将消息发送到交换机,这个routing key需要与Exchange Type及binding key联合使用才能最终生效,然后交换机将消息路由到一个或者多个队列中。
在虚拟主机的消息协商器(broker)中,每个Exchange都有唯一的名字。
Exchange包含4中类型:
- direct
- topic
- fanout
- headers
以上不同的类型代表着绑定的队列的行为的不同。
direct
此类型的行为是先匹配,再投送,在绑定队列时会设定一个routing key,只有在消息的routing key与队列匹配时,消息才会被交换机投送到绑定的额队列中。允许一个队列通过一个固定的routing key进行绑定。
此模式是RabbitMQ的默认交换机模式,也是最简单的模式,它是根据routing key进行全文匹配去寻找队列的。
topic
此类型是按规则转发消息。意为主题交换机(topic exchange),转发效益主要根据通配符。队列和交换机的绑定会定义一种路由模式,通配符就要在这种路由模式和路由键之间匹配后,交换机才会转发消息。
在这种交换机模式下,路由键必须包含一个*
号,主要用于匹配路由键指定位置的一个单词。
topic模式还支持消息的routing key,用*
或者#
的模式进行绑定,*
匹配一个单词,#
匹配0个或者多个单词。
headers
它根据应用程序消息的特定属性进行匹配,可以在binding key中标记消息为可选或者是必选。在队列与交换机绑定时,会设定一组键值对规则,消息中也包括一组键值对(headers属性),当这些键值对中有一对或者全部匹配时,消息则会被投递到对应的队列中去。
fanout
消息广播的模式,即将消息广播到所有绑定到它的队列中,不考虑routing key所设定的值。此模式下routing key会被忽略掉。
通道
有些应用需要与AMQP代理建立多个连接,但是同时开启多个TCP连接会消耗过多的系统资源,并使得防火墙的配置变得更加困难,所以使用AMQP中的通道(channel)有时也会被称为频道,来处理多连接,可以理解成一个TCP连接的多个子连接。
一个特定的通道上的通信和其他通道上的通信是完全隔离的,因此,每个AMQP方法都需要携带一个通道号,这样客户端就可以指定方法连接的通道。
消息确认 (message acknowledgement)
当一个消息从消息队列中投递给消费者之后,消费者会通知消息代理,这个过程可以是自动的,也可以由处理消息的应用的开发者手动执行,当消息确认启动时,消息代理需要接受到来自消费者的确认回执才完全将消息从队列中删除,这个可以理解成TCP双向连接,当双方确认连接后,则完成一次消息投递。
如果消息无法被成功路由,或被返回给发送者并且被丢弃,或者是消息代理执行了延期操作,则消息会被放入到一个死信队列中,此时,消息的发送者可以选择某些参数来处理这些情况。
工作模式
- 简单模式:生产者,一个消费者
- 工作队列模式:一个生产者,多个消费者,每个消费者获取到的消息唯一。
- 订阅模式:一个生产者发送的消息会被多个消费者获取。
- 路由模式:发送消息到交换机,并且要指定路由key(routing key),消费者在将队列绑定到交换机时需要指定路由key。
- topic模式:根据主题进行匹配,此时队列血药绑定在一个模式上,并且通过通配符来匹配。
AmqpTemplate
在Spring中也提供了AMQP协议的模板操作类,用于发送和接受消息,它定义发送和接受消息等操作,还提供了RabbitTemplate
用于实现AmqpTemplate
接口,而且还提供了错误抛出类AmqpException
。RabbitTemplate
支持消息的确认和返回。
send()
提供三个重载方法
- void send(Message message) throws AmqpException
- void send(String routingKey, Message message) throws AmqpException
- void send(String exchange, String routingKey, Message message) throws AmqpException
covertAndSend()
AmqpTemplate模板类还提供了这个方法来发送消息。此方法相当于是简化的send()
,可以自动处理消息的序列化。
@Test
public void send(){
Message message = MessageBuilder.withBody("content".getBytes())
.setContentType(...)
.setMessageId(..)
.setHeader(..)
.build();
amqpTemplate.send("test",message);
}
@Test
public void covertAndSend(){
amqpTemplate.convertAndSend("test","content");
}
这就是这两者的区别,但是实现的效果是一致的。
接受消息
接受消息可以有两种方式
- 一种为调用receive方法,如果该方法没有获得消息,则直接返回null,此方法不阻塞。
- 异步接收,通过注册一个Listener来实现消息的接收,接收消息需要指定队列,或者是设置默认的队列。
Spring Boot 集成
首先下载,安装,启动,配置账号密码,这些就略过了。
我们在pom文件中添加starter依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
本文所使用的SpringBoot版本为2.3.3发布版,之后我们编写接收者和发送者来进行测试。
在目录下新建receiver
和sender
包。
之后在其下新建ReceiverA
,ReceiverB
和QueueSender
类。
package com.imsle.rbmqtest.receiver;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "test") //监听队列
public class ReceiverA {
/**
* @方法名: QueueReceiver
* @说明:
* @param queue
* @return: void
* @作者: Seale
* @时间: 2020/08/25 12:51
*/
@RabbitHandler
public void QueueReceiver(String queue){
System.out.println("消息接收者A:\t"+queue);
}
}
/************************************************************************************/
package com.imsle.rbmqtest.receiver;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "test")
public class ReceiverB {
@RabbitHandler
/**
* @方法名: QueueReceiver
* @说明:
* @param queue
* @return: void
* @作者: Seale
* @时间: 2020/08/25 12:51
*/
public void QueueReceiver(String queue){
System.out.println("消息接收者B:\t"+queue);
}
}
/**********************************************************************************/
package com.imsle.rbmqtest.sender;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class QueueSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String context){
System.out.println("发送者:\t"+context);
//将消息发送到队列中
rabbitTemplate.convertAndSend("test",context);
}
}
并且新建config包,在其下创建RabbitConfig
配置类
package com.imsle.rbmqtest.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
@Bean
public Queue queue(){
System.out.println("开启队列test");
return new Queue("test");
}
}
最后编写测试类
package com.imsle.rbmqtest.sender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Date;
import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest
class QueueSenderTest {
@Autowired
QueueSender queueSender;
@Test
public void sendTest(){
int time = 2 ;
for (int i = 0 ; i < time ; i++){
String msg = "发送信息\t第"+(i+1)+"次\t"+new Date();
queueSender.send(msg);
}
}
}
如果报错,那么去检查你的配置文件是否填写正确,在这里我修改过我的rabbitMQ的用户。
spring:
rabbitmq:
host: localhost
port: 5672
username: root
password: 123456
成功运行后的结果你将会看到
发送者: 发送信息 第1次 Tue Aug 25 13:10:11 CST 2020
发送者: 发送信息 第2次 Tue Aug 25 13:10:11 CST 2020
消息接收者B: 发送信息 第2次 Tue Aug 25 13:10:11 CST 2020
消息接收者A: 发送信息 第1次 Tue Aug 25 13:10:11 CST 2020