RabbitMQ
# 介绍
RabbitMQ 是个中间件 , 负责 接收/存储/转发 消息数据 . 类似 平时快递派送的过程
是什么?
MQ 本质是个队列 , 先进先出 消息推送 , 是种跨进程通信机制的上下游传递消息 . 主要解决不同对象通信的序列
为什么用?
- 流量消锋 : 如果系统最大多处理1W条请求 , 且还是高峰期的时候 , 很有可能会突破1W导致宕机 . MQ可以队列缓冲 , 防止宕机 (高峰期体验差 , 但能保障了不会宕机)
- 应用解耦 : 系统有多个子系统 , 在业务涉及多个子系统完成时 , 当中的一个子系统宕机了 , 导致该业务异常无法运作 . MQ可以将要处理的业务缓存到消息队列中 , 由消息队列进行访问子系统执行业务 , 防止不一致运作问题 , 提高可用性
- 异步处理 : 假如 A调用B , 但B需要执行很长一段时间 , 但A想知道B执行的进度 , 以往 会通过 A调用API查B进度 , 显然不优雅 . MQ可以使用消息总线 , A调用B后 , MQ会监控B进度(A实时得到B进度) , B处理完后 会发消息给MQ , 由MQ转发至A .
# MQ特性
# 可靠性
消息丢失一般有3情况 :
- 生产者丢失 通过 [发布确认](#发布确认模式 Publisher Confirms) , 确保 生产者 发送消息到MQ (SpringBoot应用
- MQ丢失 通过 持久化/死信队列/备用交换机/回退消息 , 确保消息在MQ中不会丢失
- 消费者丢失 通过 消息应答 , 确保 消费者 消费成功ack响应
# 顺序性
- 1个消费者消费1个队列是没有顺序问题的
- 多个消费者消费同一个队列时就出现消费顺序的问题 . 可以考虑将一个队列分为多个队列 , 将需要保证顺序的消息发到一个队列里 , 一个队列对应一个消费者
- 当消息在消费者端用多线程处理时 , 也会出现顺序问题 . 可以考虑在内存中维护多个队列 , 将MQ发来的需要保证顺序的消息放在同一个内存队列里 , 然后一个线程处理一个队列里的消息
# 幂等性
保证一条消息不会被重复消费 , 也不会对数据库产生影响
场景 :
- 手机验证码 , 只能使用一次 , 再次发送验证码 , 则会刷新原旧的验证码
- 订单支付 , 每个订单只能支付一次
解决方案 :
- 消息使用 全局ID (可 通过时间戳/UUID等方式) , 确保唯一性 , 当写入数据时先判断是否存在 , 存在就没必要插入了 , 保证了不会重复插入现象
- 采用 Redis 自带的天然幂等性
setnx
参考文章 : https://blog.csdn.net/zw791029369/article/details/109561457 (opens new window)
# MQ分类
# ActiveMQ
优点 : 单机 万级吞吐量 , 时效性s级 , 可用性高 , 基于主从架构实现高可用性 , 消息可靠性较低的概率丢失数据 缺点 : 官方社区对 ActiveMQ5.x 维护越来越少 , 高吞吐量场景较少使用
# Kafka
Kafka是大数据消息的中间件 , 满受大厂的采纳
优点 : 单机 百万级吞吐量 , 时效性ms级 , 运作稳定 ; 分布式 , 少数宕机 , 也不会造成影响 . 消息有序 , 有UI管理页面 , 日志实时更新
缺点 : 单机超过64个队列 , 消息队列多 , 响应长 (轮询) , 实时性取决轮询间隔 , 业务失败不能重试 , 社区更新慢
# RocketMQ
自 阿里巴巴 开源产品 , Java实现 , 参考了 Kafka设计 的改进版
优点 : 单机 十万级吞吐量 , 可用性高 , 分布式架构 , 消息0丢失 , 支持 10亿级别的消息堆积 , 数据堆积不会影响性能
缺点 : 语言拓展少 , 现阶段Java/C++实现 , 社区活跃一般
# RabbitMQ (学习)
是当前主流的消息中间件之一
优点 : 高并发 , 性能高 , 单机万级吞吐量 , 跨平台 , 多语言支持 , 文档齐全 , 社区活跃高 , 更新频繁
缺点 : 商业收费 , 学习成本高
# 安装
官方 : https://www.rabbitmq.com/download.html (opens new window)
下载
- MQ : RabbitMQ 下载地址 (opens new window) 选择以
noarch.rpm
结尾的安装包 - Erlang : Erlang 下载地址 (opens new window) ,
Erlang
和RabbitMQ
版本对照 (opens new window) (MQ采用Erang语言开发 , 因此需要安装环境)
注意Linux版本支持
**安装步骤 : **
Linux上传文件 , 创建目录文件放置里面 ==mkdir /usr/local/rabbitmq==
安装 Erlang , RabbitMQ , socat(MQ依赖插件)
# Erlang rpm -ivh erlang-21.3-1.el7.x86_64.rpm # 检查版本 quit退出 erl -v # socat 依赖插件 yum install socat -y # RabbitMQ rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm # 启动服务 systemctl start rabbitmq-server # 查看服务状态 (active绿色表示成功) systemctl status rabbitmq-server
# Web管理
方便 查阅/操作 MQ
# 安装
- 执行指令安装 ==rabbitmq-plugins enable rabbitmq_management==
- 重启MQ服务
- Web访问 http://ip:15672 (IP为 Linux地址)
- 账号密码为
guest
(账号密码相同) - 在终端中添加账号 , 并且给予权限
注意 :
- 安装前提关闭MQ服务
- Linux防火墙开放 15672端口(Web管理) , 5672端口(API连接)
# 用户管理
创建用户 ==rabbitmqctl add_user <用户名> <密码>==
查看用户 ==rabbitmqctl list_users==
修改密码 ==rabbitmqctl change_password <用户名> <新密码>==
删除用户
==rabbitmqctl delete_user <用户名>==
设置用户 ==rabbitmqctl set_user_tags <用户名> <角色>==
角色 | 说明 |
---|---|
administrator | 可以登录控制台、查看所有信息、并对rabbitmq进行管理 |
monToring | 监控者;登录控制台,查看所有信息 |
policymaker | 策略制定者;登录控制台指定策略 |
managment | 普通管理员;登录控制 |
权限分配
# 为用户添加资源权限,添加配置、写、读权限
# rabbitmqctl set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" bozhu ".*" ".*" ".*"
# 核心
官方API : https://rabbitmq.github.io/rabbitmq-java-client/api/current/ (opens new window)
以下几大模式通过 Java API实现 , 模式的发送过程 , 在项目中需要引入以下依赖
点击查看 Maven配置
<dependencies>
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
<!--指定 jdk 编译版本-->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
# 简单模式 Hello World
Java API实现 , 模拟发送接收过程
基本方法
返回 | 方法 | 说明 |
---|---|---|
- | ==queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)== | 发送消息 |
String | ==basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)== | 接收消息 , 返回消息序列号 |
实现步骤 :
消息生产者
点击查看代码
public class Producer { // 队列名称 public static final String QUERY_NAME = "hello"; // 发消息 public static void main(String[] args) throws Exception { // 连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 基础信息 factory.setHost("192.168.186.128"); factory.setUsername("bozhu"); factory.setPassword("123123"); Connection connection = factory.newConnection(); // 频道 Channel channel = connection.createChannel(); /** * 生成队列 * 参数 : * 1. 队列名称 * 2. 队列消息是否持久化(是否磁盘存储) * 3. 队列是否进行消息共享(多个消费者共享) * 4. 是否自动删除(最后消费者开端连接后) * 5. 其他参数 */ channel.queueDeclare(QUERY_NAME, false, false, false, null); // 发消息 String msg = "hello world"; /** * 发送消息 * 1. 指定交换机 * 2. 路由key值(本次队列名称) * 3. 其他参数信息 * 4. 发送消息的消息体 */ channel.basicPublish("", QUERY_NAME, null, msg.getBytes()); System.out.println("send Success"); } }
消息消费者
点击查看代码
public class Consumer { public static final String QUERY_NAME = "hello"; public static void main(String[] args) throws Exception { // 连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 基础信息 factory.setHost("192.168.186.128"); factory.setUsername("bozhu"); factory.setPassword("123123"); Connection connection = factory.newConnection(); // 频道 Channel channel = connection.createChannel(); /** * 接收消息 * 1. 指定队列 * 2. 成功后是否自动应答 * 3. 未成功回调信息 * 4. 取消回调信息 */ channel.basicConsume(QUERY_NAME, true, (consumerTag, message) -> { System.out.println(new String(message.getBody())); }, consumerTag -> { System.out.println("消息被中断"); } ); } }
测试 , 运行 生产者 => Web查看业务队列 => 运行 消费者 => Web查看业务队列
以上步骤中看出 生产者在运行后 , Web管理中可以看到队列新增了条 消息 (需要等待消费者消费) . 当消费者运行后会消耗掉该 消息
消费者类 不能用 junit测试 接口写 , 否则没有监听的效果
# 工具类优化
实现复用 , 减少代码重写
public class RabbitMqUtils {
public static Channel getChanel() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.186.128");
factory.setUsername("bozhu");
factory.setPassword("123123");
Channel channel;
try {
Connection connection = factory.newConnection();
channel = connection.createChannel();
} catch (IOException | TimeoutException e) {
throw new RuntimeException(e);
}
return channel;
}
}
# 工作模式 Work queues
工作模式 主要思想是为了避免消息密集型的形式堆积 , 工作模式可以在多线程消费者中进行分发任务
# 轮询分发
轮询消费队列中的数据 , 消费者们会轮询进行消费消息 (每个消息只能被消费一次)
大致实现 :
- 批发生产者
- 多线程消费者
- 运行测试 , 查看消费者的消费情况
利用上面的 工具类 获取频道 (复用代码)
生产者
/**
* 生产者
*/
public class Producer {
// 队列名称
public static final String QUERY_NAME = "hello";
// 发消息
public static void main(String[] args) throws Exception {
Channel chanel = RabbitMqUtils.getChanel();
String msg = "hello world";
chanel.queueDeclare(QUERY_NAME, false, false, false, null);
for (int i = 0; i < 10; i++) {
// 发消息
String str = msg + i;
chanel.basicPublish("", QUERY_NAME, null, str.getBytes());
System.out.println("发送成功 => " + str);
}
}
}
消费者
服务形式多线程运行 , 可以区分出 消费者分别为 C1
/C2
/C3
, 在当中配置字符 , 来区分消费者
/**
* 消费者
*/
public class Consumer {
public static final String QUERY_NAME = "hello";
public static void main(String[] args) throws IOException {
System.out.println(args[0]+" 已运行!");
Channel chanel = RabbitMqUtils.getChanel();
chanel.basicConsume(QUERY_NAME, false, (consumerTag, message) -> {
System.out.println("成功 =>"+consumerTag+" : "+new String(message.getBody()));
}, consumerTag -> {
System.out.println("失败");
});
}
}
运行测试
先运行所有 生产者(此时消费者会处于等待消费的状态) , 后运行 消费者 , 运行后可以看出消费者消费方式是轮询形式的
main方法运行会传递 args
参数 , 我们可以在以下进行传参运行
- 配置好 , 填充参数 (如果多参数需要空格分开)
- 打开服务(Alt+8) , 批量运行
# 消息应答
为了保证发送过程不丢失信息 , MQ引入了消息应答机制 . 例如 : 注册账号 , 填写表单信息 , 确认提交的过程
应答机制 : 在消费者消耗处理后 , 才会告诉 MQ 进行删除消息
MQ有两种应答机制 :
- 自动应答(默认)
- 手动应答
可以在 ==Channel.basicConsume()== 方法的 autoAck参数 进行控制 手动/自动
一般情况建议选择手动应答 , 防止数据丢失问题
# 自动应答
自动应答 是为了解决 高吞吐/安全传输 方面做出了权衡 .
MQ不在乎消费者是否处理完成 , 都会告诉MQ删除队列 .
情况 :
- 消费者 处理失败也没有异常 , 会 自动补偿 , MQ会重新向消费者投递消息
- 消费者 异常了 , MQ会认为消费成功 , 会对消息进行删除 , 导致数据丢失
**重新入队机制 : **
消费者处理消息过程 , 突然宕机 , 没有ack确认 , MQ得知消息未完全处理 , 会将其消息重新排队列 , 由其他消费者处理
# 手动应答
手动应答 , 消费者处理后 , 两种可能 :
- 消费者手动
ack
(确认应答) , 告诉MQ消息完成进行删除 - 消费者自动
nack
(拒绝应答) , 告诉MQ处理失败 , 消息不会删除
应答方法
返回 | 方法 | 说明 |
---|---|---|
void | ==basicAck(long deliveryTag, boolean multiple)== | 确认收到 |
void | ==basicReject(long deliveryTag, boolean requeue)== | 拒绝消息 |
void | ==basicNack(long deliveryTag, boolean multiple, boolean requeue)== | 拒绝收到 |
手动应答 和 重入队列机制 代码实例
生产者
/**
* 消息在手动应答是不丢失、放回队列中重新消费
* @author Sans
*/
public class Task {
public static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel chanel = RabbitMqUtils.getChanel();
// 声明对队列
chanel.queueDeclare(QUEUE_NAME, false, false, false, null);
Scanner sc = new Scanner(System.in);
while (sc.hasNext()) {
String msg = sc.next();
chanel.basicPublish("", QUEUE_NAME,null, msg.getBytes(StandardCharsets.UTF_8));
System.out.println("发送消息 => "+msg);
}
}
}
消费者
说明 : 配置两个消费者 , 一个消费者等待1s ; 另一个消费者等待10s . 参数分别是 :
- ==c1 1000==
- ==c2 10000==
/**
* 消费者
* @author Sans
*/
public class Consumer {
public static final String QUEUE_NAME = "ack_queue";
/**
* @param args [线程名 , 休眠执行时长]
* @throws Exception
*/
public static void main(String[] args) throws Exception {
Channel chanel = RabbitMqUtils.getChanel();
System.out.println(args[0]+" : 运行");
chanel.basicConsume(QUEUE_NAME, false, (consumerTag, message)->{
// 等待
try {
Thread.sleep(Long.parseLong(args[1]));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("接受到的消息:"+new String(message.getBody(), StandardCharsets.UTF_8));
/*
* 手动应答
* 1.消息的标记Tag
* 2.是否批量应答 false表示不批量应答信道中的消息
*/
chanel.basicAck(message.getEnvelope().getDeliveryTag(),false);
},(consumerTag)->{
});
}
}
宕机模拟测试
先运行 生产者 , 后运行2个消费者 , 最后生产者输入消息 , 以时间线来分析运作过程
时间 | 生产者 | 消费者1(1s) | 消费者2(10s) |
---|---|---|---|
5s | 发送 11 ; 发送22 | 收到11->ack | |
10s | - | 收到22->ack | |
15s | 发送 33 : 发送44 | 收到33->ack | 收到44->...(关闭) |
16s | 收到44->ack |
# 持久化
持久化是将队列数据存储到磁盘中 , 并非在内存中 . 哪怕宕机停掉 , 不至于数据丢失的情况
# 队列持久化
在队列声明queueDeclare
方法 中的第二个参数设为 true
, 启动 队列持久化
==queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)==
注意 :
- 原先队列中有非持久化 , 且队列名相同 , 那么会抛出错误 , 需要删除原先队列 , 并重新声明
- 队列持久化 , 并不能进行对队列中的消息进行持久化
# 消息持久化
在消息发送的basicPublish
方法 中的第二个参数设为 MessageProperties.PERSISTENT_TEXT_PLAIN
, 启动 队列持久化
==basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)==
代码复用上面的即可 , 无需展示!
# 信道堆积
信道堆积 是指MQ发送到消费者的信道消息堆积数(缓冲区) . 缓冲区中默认情况是可以无限堆积的 , 因此需要自行控制堆积数 , 以防不必要的消息等待处理
信道堆积主要通过 ==basicQos(prefetchCount)==方法 控制信道堆积数(默认0->无限)
情况测试
当生产者大量生产消息 , 且又有多个消费者(效率不同)时 :
- 未指定堆积数 : 处理慢的消费者会堆积多个消息等待处理 , 处理快的可能会处于闲置状态
- 指定堆积数 : 处理慢的消费者 , 当消息堆积到达指定数值 , 轮询分发消息会跳过该消费者
轮询分发 : 会将所有消息平均分发被每个消费者中 , 等待消费
默认情况下每条信道能够堆积无数条
注意 :
- 应答需要设为 手动应答 , 否则qos , 不会生效
# 发布确认模式 Publisher Confirms
发布确认 是保证了消息完好的推送到MQ队列中 , 确保数据不会丢失 , 以便消费者消费使用
大致步骤 :
- 获取信道
- 信道启动 发布确认模式 ==confirmSelect()==方法
- 向MQ推送消息 ==basicPublish()==方法
- 向MQ发送确认 ==waitForConfirms()==方法
异步发布 走以下步骤
- MQ返回状态 至 监听器 进行回调
ack
/nack
(成功/失败) - 失败重新推送 (回至步骤3操作)
主要方法
返回 | 方法 | 说明 |
---|---|---|
void | ==confirmSelect()== | 启动 发布确认模式 |
boolean | ==waitForConfirms()== | 向MQ发送 发布确认 |
ConfirmListener | ==addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback)== | 监听器 , 监听发布状态 成功/失败 , 以lambda形式 回调它们 |
确认发布 可分为以下类型 :
- 单体 同步等待确认 , 简单 , 吞吐量有限
- 批量 批量同步等待确认 , 简单 , 一旦出问题难以判断
- 异步
高性能 , 采用 监听器监听发布状态 和
ConcurrentSkipListMap
哈希表 多线程管理 , 能够准确异常
耗时 : 单体 > 批量 > 异步
三种模式代码示例 :
秒表工具类
/**
* 秒表计时工具
*/
public class StopWatchUtils {
public static long start;
public static long totalTime;
public static void start(){
start = System.currentTimeMillis();
}
public static void stop(){
totalTime = System.currentTimeMillis() - start;
}
public static long getTotalTime() {
return totalTime;
}
}
测试耗时
点击查看代码
/**
* 1000条消息发布测试
*/
public class Producer {
public static long MSG_COUNT = 1000;
public static void main(String[] args) throws Exception {
// 单体发布测试 (耗时 => 1391ms
//publishMesIndividually();
// 批量发布测试 (耗时 => 85ms
//publishMesBatch();
// 异步发布测试 (耗时 => 28ms
publishMesAsync();
}
/**
* 异步发布
* 只管发 , 成功/失败 由监听器管
*/
private static void publishMesAsync() throws IOException {
Channel chanel = RabbitMqUtils.getChanel();
String queueName = UUID.randomUUID().toString();
// 队列声明 (持久化
chanel.queueDeclare(queueName, true, false, false, null);
// 启动确认发布
chanel.confirmSelect();
/*
线程安全有序的哈希表 , 使用高并发情况
- 轻松记录 序号与消息 的关联
- 通过序号轻松批量删除条目
- 支持多线程
*/
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
/*
监听器 (监听消息是否成功)
参数1: 消息确认回调
参数2: 消息失败回调
- deliveryTag 消息标记
- multiple 是否批量
*/
chanel.addConfirmListener(
(deliveryTag, multiple) -> {
// 是否批量
if (multiple) {
// 获取已经确认的视图
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag);
// 清除视图内容
confirmed.clear();
} else {
// 直接删除序号
outstandingConfirms.remove(deliveryTag);
}
System.out.println("确认消息 => " + deliveryTag);
},
(deliveryTag, multiple) -> {
String msg = outstandingConfirms.remove(deliveryTag);
System.out.println("失败消息[" + msg + "] => " + deliveryTag);
});
StopWatchUtils.start();
// 发消息
for (int i = 1; i <= MSG_COUNT; i++) {
String msg = "msg => " + i;
chanel.basicPublish("", queueName, null, msg.getBytes(StandardCharsets.UTF_8));
// 存下所有 生产者发送的消息 K(消息序列号):V(消息内容)
outstandingConfirms.put(chanel.getNextPublishSeqNo(), msg);
}
StopWatchUtils.stop();
System.out.println("耗时 => " + StopWatchUtils.getTotalTime() + "ms");
}
/**
* 批量发布
* 发布多个确认一次
*/
private static void publishMesBatch() throws Exception {
Channel chanel = RabbitMqUtils.getChanel();
String queueName = UUID.randomUUID().toString();
// 队列声明 (持久化
chanel.queueDeclare(queueName, true, false, false, null);
// 启动确认发布
chanel.confirmSelect();
// 批单位
int batchSize = 100;
StopWatchUtils.start();
for (int i = 1; i <= MSG_COUNT; i++) {
String msg = "msg => " + i;
chanel.basicPublish("", queueName, null, msg.getBytes(StandardCharsets.UTF_8));
if (i % batchSize == 0) {
if (chanel.waitForConfirms()) System.out.println("第" + i / batchSize + "批发送成功" + msg);
}
}
StopWatchUtils.stop();
System.out.println("耗时 => " + StopWatchUtils.getTotalTime() + "ms");
}
/**
* 单体发布
* 发布一次确认一次
*/
public static void publishMesIndividually() throws Exception {
Channel chanel = RabbitMqUtils.getChanel();
String queueName = UUID.randomUUID().toString();
// 队列声明 (持久化
chanel.queueDeclare(queueName, true, false, false, null);
// 启动确认发布
chanel.confirmSelect();
StopWatchUtils.start();
for (int i = 0; i < MSG_COUNT; i++) {
String msg = "msg => " + i;
chanel.basicPublish("", queueName, null, msg.getBytes(StandardCharsets.UTF_8));
if (chanel.waitForConfirms()) {
System.out.println("发送成功 " + msg);
}
}
StopWatchUtils.stop();
System.out.println("耗时 => " + StopWatchUtils.getTotalTime() + "ms");
}
}
# 发布/订阅 模式 Publish/Subscribe
发布订阅模式是生产者推送的消息 , 其他消费者都均可收到该消息
大致流程
- 声明交换机 , 并设置
fanout
类型 - 生产者发送消息 到交换机
- 消费者队列绑定交换机 , 并配置
RoutingKey
路由规则 (类似订阅) - 消费者接收消息
# 交换机 Exchange
生产者生产的消息不会直接发送到队列中的 , 而是发送到交换机 , 由交换机推入队列
交换机类型 : (点击跳转代码示例)
- direct(点对点) (默认)
交换机会匹配 生产者发送的
RoutingKey
与 消费者队列绑定交换机的RoutingKey
. 相同才能实现点对点发送 . 如果没有匹配到一个 , 很有可能会丢失数据 - topic(发布订阅)
交换机会 通配符匹配 生产者发送的
RoutingKey
与 消费者队列绑定交换机的RoutingKey
, 符合条件的队列都会收到分发的消息 - fnout(广播)
只要消费者绑定有该类型交换机 , 不管
RoutingKey
是否匹配 , 都会接收广播消息 - ...
交换机 声明方法
==Channel.exchangeDeclare(String exchange, String type)==
- exchange : 交换机名称
- type : 交换类型
# 临时队列
临时队列 , 字面意思暂时使用的队列
队列特性 :
- 随机名称
- 断开消费者连接队列自动删除
Web管理页中 , 可以看到队列状态是 AD
Excl
(自动删除)
临时队列创建
==queueDeclare()==
一般去会获取队列标识进行食用 ==chanel.queueDeclare().getQueue();==
# 绑定 binding
Exchange交换机 创建后需要绑定队列才会进行推送消息至消费者 . 可以绑定多个队列 , 消息推送是根据 Routing Kye
(路由规则) 来确定指定队列
点击查看代码
交换机只负责转发消息 , 并没有存储消息的能力 , 因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列 , 那么消息会丢失
绑定方法
==queueBind(String queue, String exchange, String routingKey)==
- queue : 队列名
- exchange : 交换机
- routing Key : 路由Key
重载方法 , 可在最后面携带参数 , 详细自行API
Web管理页 绑定
Exchanges -> 指定Exchange交换机 -> Bindings
2
# direct
交换机会匹配 生产者发送的 RoutingKey
与 消费者队列绑定交换机的 RoutingKey
. 相同才能实现点对点发送 . 如果没有匹配到一个 , 很有可能会丢失数据
代码示例 : 点击跳转
# topic
交换机会根据 通配符匹配 生产者发送的 RoutingKey
与 消费者队列绑定交换机的 RoutingKey
, 符合条件的队列都会收到分发的消息
通配符说明 :
*
代替一个单词#
代替0个/多个单词
特殊情况 :
- 如果只有一个
#
那么将会接收通道的所有数据- 如果没有
#
/*
出现 , 默认采用direct
匹配案例 :
RoutingKey | 通配值 | 说明 |
---|---|---|
com.sans.color | *.sans.* | 匹配3个单词中的中间单词 sans |
com.sans.color.red | #.red | 匹配最后为 red |
com.sans.color.blue | con.# | 匹配开头为 com |
代码示例 : 点击跳转
# fnout
只要消费者绑定有该类型交换机 , 不管RoutingKey
是否匹配 , 都会接收广播消息
代码示例 : [点击跳转](#广播 fnout)
# 代码实战
通用代码 , 参数自控
食用说明 :
生产者
/**
* 广播发送
* @author Sans
*/
public class Producer {
/**
* @param args [交换机名, 交换机类型, 路由key]
* @throws Exception
*/
public static void main(String[] args) throws Exception {
String exchangeName = args[0];
String exchangeTypeParam = args[1];
// 如果未赋予值 , 默认为 ""
String routingKey = args.length == 2 ? "" : args[2];
System.out.println("[交换机名, 交换机类型, 路由key]");
System.out.println(Arrays.toString(args));
// 枚举验证
BuiltinExchangeType exchangeType = null;
for (BuiltinExchangeType value : BuiltinExchangeType.values()) {
if (value.getType().equals(exchangeTypeParam)) exchangeType = value;
}
if (exchangeType == null) return;
Channel channel = RabbitMqUtils.getChanel();
/*
声明一个exchange
1.exchange的名称
2.exchange的类型
*/
channel.exchangeDeclare(exchangeName, exchangeType);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
// 发布消息
channel.basicPublish(exchangeName, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发出消息:" + message);
}
}
}
消费者
/**
* 消费者
* @author Sans
*/
public class Consumer {
/**
* @param args [交换机名, 路由key]
* @throws Exception
*/
public static void main(String[] args) throws Exception {
String exchangeName = args[0];
String routingKey = args[1];
System.out.println("[交换机名, 路由key]");
System.out.println(Arrays.toString(args));
Channel chanel = RabbitMqUtils.getChanel();
// 临时队列
String queueName = chanel.queueDeclare().getQueue();
// 绑定交换机
chanel.queueBind(queueName, exchangeName, routingKey);
// 接收消息
chanel.basicConsume(queueName, true,(consumerTag,message)->{
System.out.println("收到消息 => "+new String(message.getBody(), StandardCharsets.UTF_8));
},consumerTag->{});
}
}
以上Main中接收参数分别说明
身份 | 接收参数 |
---|---|
生产者 | [交换机名, 交换机类型, 路由key] |
消费者 | [交换机名, 路由key] |
# direct点对点测试
运行顺序 | 应用程序服务 | 参数传递(直接复制即可) |
---|---|---|
1 | 生产者 P1 | ==color direct blue== |
2 | 消费者 C1 | ==color blue== |
3 | 消费者 C2 | ==color black== |
结果 : C1消费了 , C2无消费 . 只有 RoutingKey
匹配的消费者消费消息
# topic发布订阅
运行顺序 | 应用程序服务 | 传递参数(直接复制即可) |
---|---|---|
1 | 生产者 P1 | ==color topic com.sans.red== |
2 | 消费者 C1 | ==color #.red== |
3 | 消费者 C2 | ==color com.#== |
4 | 消费者 C3 | ==color com.*== |
5 | 消费者 C4 | ==color *.red== |
6 | 消费者 C5 | ==color #== |
7 | 消费者 C6 | ==color *== |
结果 : C1 , C2 , C5 消费者消费了 , 其余未消费 . 只有 RoutingKey
通配符匹配 的消费者消费消息
# 广播 fnout
运行顺序 | 应用程序服务 | 传递参数(直接复制即可) |
---|---|---|
1 | 生产者 P1 | ==color fnout red== |
2 | 消费者 C1 | ==color black== |
3 | 消费者 C2 | ==color blue== |
4 | 消费者 C3 | ==color yellow== |
结果 : 所有消费者都消费了 . 消费者收到消息不会受到 RoutingKey
的影响 , 只需绑定就可以收到通知
# 进阶
# 死信队列
死信 是无法被消费的消息
情况 : 生产者 发送消息 MQ , 消费者 从 队列 取出 , 由于某些原因导致 队列 中的某些消息无法被消费 , 这样的消息没有得到处理 , 称之为死信
场景 : 用户下订单时 , 点击支付 , 但又未在指定时间支付 , 死信队列机制会误认为异常消息 , 消息将会投入死信队列中
触发机制 :
- 消息 TTL 过期
- 队列到达最大长度
- 消息被拒 , 使用
channel.basicNack
/channel.basicReject
应答 , 并且参数requeue
为false(不回流队列)
交换机 和 队列 中的配置 , 一旦有修改 需要删除 重新运行
Web管理页中 , 可以看到队列状态是 DLX
(死信交换机) , DLK
(死信routingKey)
**代码示例 : **
初始化构架声明
点击查看代码
/**
* 初始化架构
* 初始化 交换机 , 队列 结构信息
* @author Sans
*/
public class InitialArchitecture {
// 普通交换机的名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交换机的名称
public static final String DEAD_EXCHANGE = "dead_exchange";
// 普通队列的名称
public static final String NORMAL_QUEUE = "normal_queue";
// 死信队列的名称
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel chanel = RabbitMqUtils.getChanel();
String normalRoutingKey = "sans";
String deadRoutingKey = "dead";
// 重新初始化 , 删除以往相同名称的 交换机和队列
chanel.exchangeDelete(NORMAL_EXCHANGE);
chanel.exchangeDelete(DEAD_EXCHANGE);
chanel.queueDelete(NORMAL_QUEUE);
chanel.queueDelete(DEAD_QUEUE);
// 声明 普通,死信 交换机
chanel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
chanel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
Map<String, Object> arguments = new HashMap<>();
// 正常的队列设置死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 设置死信routingKey
arguments.put("x-dead-letter-routing-key", deadRoutingKey);
/* 超出长度 测试
设置队列最大长度
*/
//arguments.put("x-max-length", 6);
// 声明 普通队列 , 死信队列
chanel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
chanel.queueDeclare(DEAD_QUEUE, false, false, false, null);
// 普通队列 绑定 普通交换机
chanel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, normalRoutingKey);
// 死信队列 绑定 死信交换机
chanel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, deadRoutingKey);
}
}
生产者
点击查看代码
/**
* 生产者
* @author Sans
*/
public class Producer {
// 普通交换机的名称
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel chanel = RabbitMqUtils.getChanel();
String routingKey = "sans";
for (int i = 0; i < 10; i++) {
String msg = "msg " + i;
/** TTL 超时测试
参数3 : 构造者构建参数 ttl时间参数
*/
chanel.basicPublish(NORMAL_EXCHANGE,
routingKey,
//new AMQP.BasicProperties().builder().expiration("10000").build(),
null,
msg.getBytes(StandardCharsets.UTF_8));
System.out.println("发送成功 => " + msg);
}
}
}
消费者1
点击查看代码
/**
* 消费者1
* @author Sans
*/
public class Consumer1 {
// 普通队列的名称
public static final String NORMAL_QUEUE = "normal_queue";
public static void main(String[] args) throws Exception {
Channel chanel = RabbitMqUtils.getChanel();
// 处理消息
chanel.basicConsume(NORMAL_QUEUE, true, (consumerTag, message) -> {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("接收消息 => " + msg);
// 消息拒绝 测试
//long deliveryTag = message.getEnvelope().getDeliveryTag();
//if ("msg 4".equals(msg)) {
// /* 拒绝 消息/收到
// 拒绝收到 (参数3: false防止回流队列): chanel.basicNack(deliveryTag, false, false);
// 拒绝消息 (参数2: false防止回流队列): chanel.basicReject(deliveryTag, false);
// */
// //chanel.basicNack(deliveryTag, false, false);
// chanel.basicReject(deliveryTag, false);
// System.out.println("拒绝 => " + msg);
//}else{
// chanel.basicAck(deliveryTag, false);
// System.out.println("接收消息 => " + msg);
//}
}, consumerTag -> {
});
}
}
消费者2 (死信消费)
点击查看代码
/**
* 消费者2
* @author Sans
*/
public class Consumer2 {
// 死信队列的名称
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel chanel = RabbitMqUtils.getChanel();
// 处理消息
chanel.basicConsume(DEAD_QUEUE, true, (consumerTag, message) -> {
System.out.println("接收消息 => " + new String(message.getBody(), StandardCharsets.UTF_8));
}, consumerTag -> {
});
}
}
# 测试
触发死信机制 : 有3种触发条件 , 分别围绕它们的条件进行测试
过期触发TTL
- 配置 生产者发送方法 ==basicPublish()== 中的第三参数 添加消息属性 (采用构造者模式构造类) ==new AMQP.BasicProperties().builder().expiration("10000").build()== (TTL设置10s过期)
- 运行 初始化架构
- 运行 生产者
- 观察 Web管理页 , 普通队列中的所有消息 过期推送至 死信队列中
- 运行 消费者2 , 清除死信队列中的消息
队列到达最大长度
- 配置 初始化架构 , 在 声明普通队列 ==queueDeclare()==方法 中的第五个参数 编辑队列属性 Map集合新增 : ==arguments.put("x-max-length", 6);== (队列设置最大长度为6条消息)
- 运行 初始化架构
- 运行 生产者
- 运行 消费者1
- 观察 Web管理页 , 普通队列中的消息 , 有部分消息会被排挤到 死信队列中
- 运行 消费者2 , 清除死信队列中的消息
注意 :
- 初始化架构类中的配置 一旦修改了 , 则需要删除掉原旧的 交换机/队列 , 在运行 (配置更改了会冲突)
- 为了尽可能的展现消息进入 死信队列中 , 要确保上一次所修改的配置是否还原 , 以防上次的配置影响数据混乱
消息被拒绝
配置 消费者1 , 手动应答请求 , 指定部分拒绝接收
chanel.basicConsume(NORMAL_QUEUE, false, (consumerTag, message) -> { String msg = new String(message.getBody(), StandardCharsets.UTF_8); long deliveryTag = message.getEnvelope().getDeliveryTag(); if ("msg 4".equals(msg)) { /* 拒绝 消息/收到 拒绝收到 (参数3: false防止回流队列): chanel.basicNack(deliveryTag, false, false); 拒绝消息 (参数2: false防止回流队列): chanel.basicReject(deliveryTag, false); */ //chanel.basicNack(deliveryTag, false, false); chanel.basicReject(deliveryTag, false); System.out.println("拒绝 => " + msg); }else{ chanel.basicAck(deliveryTag, false); System.out.println("接收消息 => " + msg); } }, consumerTag -> { });
运行 初始化架构
运行 生产者1
运行 消费者1
观察 Web管理页 , 普通队列中的消息 , 有一条
msg4
消息 被拒绝 至死信队列中运行 消费者2 , 清除死信队列中的消息
# 整合SpringBoot
Springboot文档 : https://docs.spring.io/spring-amqp/reference/html/amqp.html
# 快速入门
引入依赖
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application配置
server:
port: 8088
spring:
rabbitmq:
host: 8.130.47.114
port: 5672
username: bozhu
password: 123123
端口容易冲突 , 建议修改
RabbitmqConfig配置类
交换机和队列 , 以及绑定 , 等操作是通过实例化形式进行的 , 以下由配置类形式展示
简单结构
点击查看代码
@Configuration
public class RabbitmqConfig {
// 交换机
String aExchange = "a_exchange";
String deadExchange = "dead_exchange";
// 队列
String aQueue = "a_queue";
String deadQueue = "dead_queue";
String routingKeyA = "RKA"
String routingKeyB = "RKB"
/**
* 交换机
*/
@Bean
public DirectExchange aExchange() {
return new DirectExchange(aExchange);
}
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(deadExchange);
}
/**
* 队列
*/
@Bean
public Queue aQueue() {
// 配置 死信交换机, 死信RoutingKey, 过期TTL
return QueueBuilder.durable(aExchange)
.deadLetterExchange(deadExchange)
.deadLetterRoutingKey(routingKeyB)
.ttl(10*1000)
.build();
}
@Bean
public Queue deadQueue() {
return QueueBuilder.durable(deadQueue).build();
}
/**
* 绑定
*/
@Bean
public Binding bindingAqueueToAexchange(Queue aQueue , DirectExchange aExchange) {
return BindingBuilder.bind(aQueue).to(aExchange).with(routingKeyA);
}
@Bean
public Binding queueBbindX(Queue deadQueue , DirectExchange deadExchange) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with(routingKeyB);
}
}
# 操作对象
类 | 说明 |
---|---|
RabbitTemplate | 简化的 发送/接收 消息类 |
AmqpAdmin | 携带式对AMQP管理操作类 |
生产者发送消息 : ==RabbitTemplate.convertAndSend()==
# 消费者
@Component
public class QueueConsumer {
// 队列处理消息
// 注解指定队列名
@RabbitListener(queues = "QA")
public void receiveDead(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("处理消息 => " + msg);
}
}
# 延迟队列
延迟队列 是用来存放到指定时间才被释放消息的队列
延迟队列机制 和 死信 TTL过期 相似 , 但延迟队列对时间的控制较灵活 , 应用应用场景广泛
场景 :
- 下单十分钟未支付自动取消订单
- 新建店铺 , 如果十天未上传商品 , 则自动发送消息提醒
- 新用户注册后 , 三天未登录则进行短信提醒
- 用户发起退款 , 如果三天未处理则通知相关运营人员
- 会议预定 , 指定时间的前十分钟提醒参加会议
可以控制任意时间点发送通知
# 应用方式
延迟配置
在 队列 中配置延迟属性
/* 变量说明
NORMAL_QUEUE_A : 队列名称
DEAD_EXCHANGE_Y : 死信交换机名称
ROUTING_DEAD_QUEUE_KEY : 死信RoutingKey
*/
@Bean
public Queue aQueue() {
// 配置 死信交换机, 死信RoutingKey, 过期TTL
return QueueBuilder.durable(NORMAL_QUEUE_A)
.deadLetterExchange(DEAD_EXCHANGE_Y)
.deadLetterRoutingKey(ROUTING_DEAD_QUEUE_KEY)
.ttl(10*1000)
.build();
}
在 生产者 消息发送方法 中配置延迟
指定消息TTL在队列中超时时长 : ==message.getMessageProperties().setExpiration(ttl);==
// 发送消息方法
rabbitTemplate.convertAndSend(NORMAL_EXCHANGE_X, ROUTING_KEY_QUEUE_C, msg,
message -> {
message.getMessageProperties().setExpiration(ttl);
return message;
}
);
# 代码示例
架构 :
初始化架构 RabbitmqConfig
点击查看代码
@Configuration
public class RabbitmqConfig {
// 普通/死信 交换机
public static final String NORMAL_EXCHANGE_X = "EX";
public static final String DEAD_EXCHANGE_Y = "EY";
// 死信 队列
public static final String DEAD_QUEUE = "QD";
// 普通 队列
public static final String NORMAL_QUEUE_A = "QA";
public static final String NORMAL_QUEUE_B = "QB";
public static final String NORMAL_QUEUE_C = "QC";
// RoutingKey
public static final String ROUTING_KEY_QUEUE_A = "RQA";
public static final String ROUTING_KEY_QUEUE_B = "RQB";
public static final String ROUTING_KEY_QUEUE_C = "RQC";
// 死信 RoutingKey
public static final String ROUTING_DEAD_QUEUE_KEY = "RQD";
/**
* 交换机
*/
@Bean
public DirectExchange xExchange() {
return new DirectExchange(NORMAL_EXCHANGE_X);
}
@Bean
public DirectExchange yExchange() {
return new DirectExchange(DEAD_EXCHANGE_Y);
}
/**
* 队列
*/
@Bean
public Queue aQueue() {
// 配置 死信交换机, 死信RoutingKey, 过期TTL
return QueueBuilder.durable(NORMAL_QUEUE_A)
.deadLetterExchange(DEAD_EXCHANGE_Y)
.deadLetterRoutingKey(ROUTING_DEAD_QUEUE_KEY)
.ttl(10*1000)
.build();
}
@Bean
public Queue bQueue() {
// 配置 死信交换机, 死信RoutingKey, 过期TTL
return QueueBuilder.durable(NORMAL_QUEUE_B)
.deadLetterExchange(DEAD_EXCHANGE_Y)
.deadLetterRoutingKey(ROUTING_DEAD_QUEUE_KEY)
.ttl(40*1000)
.build();
}
@Bean
public Queue cQueue() {
// 配置 死信交换机, 死信RoutingKey
return QueueBuilder.durable(NORMAL_QUEUE_C)
.deadLetterExchange(DEAD_EXCHANGE_Y)
.deadLetterRoutingKey(ROUTING_DEAD_QUEUE_KEY)
.build();
}
@Bean
public Queue deadQueue() {
return QueueBuilder.durable(DEAD_QUEUE).build();
}
/**
* 绑定
* 比较示例 : 普通队列绑定 普通队列 --- 普通交换机
* chanel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, normalRoutingKey);
*/
@Bean
public Binding queueAbindingX(Queue aQueue , DirectExchange xExchange) {
return BindingBuilder.bind(aQueue).to(xExchange).with(ROUTING_KEY_QUEUE_A);
}
@Bean
public Binding queueBbindingX(Queue bQueue , DirectExchange xExchange) {
return BindingBuilder.bind(bQueue).to(xExchange).with(ROUTING_KEY_QUEUE_B);
}
@Bean
public Binding queueCbindingX(Queue cQueue, DirectExchange xExchange) {
return BindingBuilder.bind(cQueue).to(xExchange).with(ROUTING_KEY_QUEUE_C);
}
@Bean
public Binding queueDbindingY(Queue deadQueue , DirectExchange yExchange) {
return BindingBuilder.bind(deadQueue).to(yExchange).with(ROUTING_DEAD_QUEUE_KEY);
}
}
生产者 ProducerSendMessageController
点击查看代码
@RestController
@RequestMapping("/producer")
public class ProducerSendMessageController {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/send/{msg}")
public void sendMessage(@PathVariable String msg) {
System.out.println("收到请求 => " + msg);
// 发送消息
rabbitTemplate.convertAndSend(NORMAL_EXCHANGE_X, ROUTING_KEY_QUEUE_B, "B队列(40 000ms) =>" + msg);
rabbitTemplate.convertAndSend(NORMAL_EXCHANGE_X, ROUTING_KEY_QUEUE_A, "A队列(10 000ms) =>" + msg);
}
/**
* QC 发送消息(自定义时长)
* @param msg 消息
* @param ttl 过期时长 ms
*/
@GetMapping("/qcSend/{msg}/{ttl}")
public void sendMessage(@PathVariable String msg, @PathVariable String ttl) {
System.out.printf("收到请求 => [%s, %s]%n", msg, ttl);
rabbitTemplate.convertAndSend(NORMAL_EXCHANGE_X, ROUTING_KEY_QUEUE_C, msg,
message -> {
message.getMessageProperties().setExpiration(ttl);
return message;
}
);
}
}
死信消费者 DeadQueueConsumer
@Component
public class DeadQueueConsumer {
// 死信处理
@RabbitListener(queues = DEAD_QUEUE)
public void receiveDead(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("死信消费 => " + new Date() + " : " + msg);
}
}
# 代码测试
测试 QA 和 QB 队列 TTL过期测试
- 访问 http://localhost:8088/producer/send/lisi
- 观察 控制台 , 死信队列打印的消息(等待10s/40s)
他们队列是分开分发一条消息 互不干扰 , 等待时间分别是 10s/40s
测试 QC 队列多消息
连续访问 http://localhost:8088/producer/qcSend/GOGO/10000
连续访问 http://localhost:8088/producer/qcSend/GOGOGO/15000
连续访问 http://localhost:8088/producer/qcSend/GO/5000
观察 控制台 , 消息是共同在队列中进行等待时间
收到请求 => [GOGO, 10000] 收到请求 => [GOGOGO, 15000] 收到请求 => [GO, 5000] 死信消费 => Mon Mar 06 11:25:59 CST 2023 : GOGO 死信消费 => Mon Mar 06 11:26:05 CST 2023 : GOGOGO 死信消费 => Mon Mar 06 11:26:05 CST 2023 : GO
在 多消息同一队列中 , 消息的TTL过期时间是同时加载的 , 并且是有序的 .
问题不难发现 , 同一时间发送两条消息 , 如果 第一个消息的TTL 大于 第二条消息的TTL , 即使 第二条消息的TTL已超时 , 也必须等待 第一条消息的TTL过期 , 最后也是按照先后顺序处理消息
# 插件拓展
该插件正是解决上面的问题 , 采用交换机的插件类型 , 能够实现 消息TTL等待是在交换机中等待执行 , 避免了在队列中排队的问题
插件官网 : https://www.rabbitmq.com/community-plugins.html
插件GitHub : https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
安装 :
进入MQ插件目录 , 并将下载好的插件放进去
.ez
格式 (选择自己的版本号) ==cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins==安装插件 (不需 填写版本和文件后缀) ==rabbitmq-plugins enable rabbitmq_delayed_message_exchange==
重启RabbitMQ ==systemctl restart rabbitmq-server==
观察 Web管理页 , 新增交换机 , 查看类型是否多出了
x-delayed-message
类型
下载注意版本兼容问题
# 应用方式
交换机声明
采用 自定义交换机 进行实例化声明
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type", "direct");
return new CustomExchange(
DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
}
消费者延迟发送
往消息添加 x-delay
头 属性 , 延迟功能 ==message.getMessageProperties().setDelay(ttl);==
/* 变量说明
DELAYED_EXCHANGE_NAME : 交换机名称
DELAYED_ROUTING_KEY : RoutingKey
msg : 发送消息
ttl : 消息延迟时长
*/
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY ,msg , message -> {
// 为消息属性添加延迟功能
message.getMessageProperties().setDelay(ttl);
return message;
});
# 代码示例
例图就不展示了 , 一个交换机: delayed.exchange , 一个队列: delayed.queue , 生产者 , 消费者
主要测试 队列TTL过期循序问题
初始化架构 DelayedQueueConfig
@Configuration
public class DelayedQueueConfig {
// 交换机, 队列, routingKey
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
/** 自定义交换机
* 由于是自定义类型
*/
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type", "direct");
return new CustomExchange(
DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
}
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
@Bean
public Binding delayedQueueBindingDelayedExchange(Queue delayedQueue, CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
生产者
@RestController
@RequestMapping("/producer")
public class ProducerSendMessageController {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/delayed/{msg}/{ttl}")
public void sendDelayedMessage(@PathVariable String msg, @PathVariable Integer ttl) {
System.out.printf("收到请求 => [%s, %s]%n", msg, ttl);
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY,msg , message -> {
// 设置方式和以往不同 , 这个是设置延迟
// message.getMessageProperties().setExpiration(ttl + "");
message.getMessageProperties().setDelay(ttl);
return message;
});
}
}
消费者
/**
* 消费者
* @author Sans
*/
@Component
public class QueueConsumer {
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void delayedConsumer(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("消费消息 => " + new Date() + " : " + msg);
}
}
# 代码测试
队列多消息测试
连续访问 http://localhost:8088/producer/delayed/GOGO/10000
连续访问 http://localhost:8088/producer/delayed/GOGOGO/15000
连续访问 http://localhost:8088/producer/delayed/GO/5000
观察 控制台 , 消息的延迟时间 是否按照有小到大的顺序 进行处理
收到请求 => [GOGO, 10000] 收到请求 => [GOGOGO, 15000] 收到请求 => [GO, 5000] 消费消息 => Mon Mar 06 11:29:26 CST 2023 : GO 消费消息 => Mon Mar 06 11:29:29 CST 2023 : GOGO 消费消息 => Mon Mar 06 11:29:35 CST 2023 : GOGOGO
在Web管理页中 , 是在交换机查看 延迟的消息 , 并非在队列中查看 , 消息直到延迟时间到期才会释放
# 发布确认高级
生产者发送消息后会进行备份到缓存中 , 如果成功则从缓存删除该备份的消息 , 否则在缓存中执行定时任务 , 重新从缓存中重新发布至 交换机 , 直到成功为止
大致流程
配置 ==spring.rabbitmq.publisher-confirm-type: correlated==
声明基本架构 (交换机/队列/绑定)
生产者正常发送消息
消费者正常接收消息
编写 回调类 , 实现
RabbitTemplate.ConfirmCallback
回调接口 (交换机)重写
confirm()
回调方法 (成功/失败 都会走该方法)内部类接口注入 (由于是内部类不能直接拿去使用 , 不过可以通过以下形式注入其中)
@Resource RabbitTemplate rabbitTemplate; /* @PostConstruct注解 在配置类 执行的构造函数 和 自动注入 后执行初始化的方法(类似servlet的init()方法) */ @PostConstruct public void init() { // 发布确认 rabbitTemplate.setConfirmCallback(this); }
测试
配置
生产者的类型确认使用 spring.rabbitmq.publisher-confirm-type
配置值 | 说明 |
---|---|
none(默认) | 不做任何确认操作 |
correlated | 消息到交换机触发 回调 |
simple | 通过手动 waitForConfirms() 返回结果回答(少用) |
# 代码示例
例图就不展示了 , 一个交换机: confirm.exchange , 一个队列: confirm.queue , 生产者 , 消费者
配置
==spring.rabbitmq.publisher-confirm-type: correlated==
基本架构
@Configuration
public class ConfirmConfig {
// 交换机, 队列, routingKey
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
public static final String CONFIRM_ROUTING_KEY = "confirm.routingkey";
@Bean
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
@Bean
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
@Bean
public Binding confirmQueueBindingConfirmExchange(Queue confirmQueue, DirectExchange confirmExchange) {
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
}
生产者
@RestController
@RequestMapping("/producer")
public class ProducerSendMessageController {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/confirm/{msg}")
public void confirmSendMessage(@PathVariable String msg) {
System.out.println("confirmSend => " + msg);
// 回调相关数据对象
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, CONFIRM_ROUTING_KEY, msg, correlationData);
}
}
消费者
@Component
public class QueueConsumer {
@RabbitListener(queues = CONFIRM_QUEUE_NAME)
public void confirmConsumer(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("confirmConsumer => " + new Date() + " : " + msg);
}
}
回调类
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@Resource
RabbitTemplate rabbitTemplate;
// 注入内部类接口
@PostConstruct
public void init() {
// 发布确认
rabbitTemplate.setConfirmCallback(this);
}
/**
* 交换机确认回调方法
* @param correlationData 回调的相关数据
* @param ack ack为true,nack为false
* @param cause 原因,对于nack,如果可用,否则为null
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData == null ? "" : correlationData.getId();
if (ack) {
System.out.println("Success => " + id);
} else {
System.out.println("Failure => " + id + " [" + cause + "] ");
}
}
}
# 代码测试
根据可控变量分析 , 可分析出可能情况 :
- 生产者 找不到交换机
- 找到交换机 , 但找不到队列
验证请求 : http://localhost:8088/producer/confirm/GO
生产者 找不到交换机
模拟故障操作 : 更改 生产者发送方法所指定的交换机名 , 试图寻找个不存在的交换机
测试结果 : 触发 ConfirmCallback()
回调函数 , 失败 , 找不到交换机
confirmSend => GO
Failure => confirm2 [channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'no' in vhost '/', class-id=60, method-id=40)]
找到交换机 , 但找不到队列
模拟故障操作 : (以下两个故障结果是一致的)
- 更改 生产者发送方法指定的RoutingKey
- 新建交换机 , 不进行绑定队列
测试结果 : 触发 ConfirmCallback()
回调函数 , 成功 , 但消息没有得到消费 . 由于找不到队列消息而丢失
confirmSend => GO
Success => confirm3
通过以上情况测试不难发现 , 仅靠 交换机 的确认是不行的 , 还需要在 队列 中进行确认消息 !
可以通过退回消息 , 解决该问题
# 回退消息
回退消息 主要功能是 确认消息发到队列中 . 也解决了上面进入队列确认的问题
实现 基于 发布确认代码 新增 配置/代码
配置 ==spring.rabbitmq.publisher-returns: true==
编写 回调类 , 实现
RabbitTemplate.ReturnsCallback
回调接口 (队列)重写
returnedMessage()
回调方法 , 找不到交换机失败回调内部类接口注入 (由于是内部类不能直接拿去使用 , 不过可以通过以下形式注入其中)
# 代码示例
复用上面的 发布确认代码
配置 ==spring.rabbitmq.publisher-returns: true==
生产者
点击查看代码
@GetMapping("/messageFallback/{msg}")
public void messageFallbackSend(@PathVariable String msg) {
System.out.println("messageFallbackSend => " + msg);
// 发送成功
CorrelationData correlationData1 = new CorrelationData("messageFallback1");
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, CONFIRM_ROUTING_KEY, msg, correlationData1);
// 生产者 找不到交换机
//CorrelationData correlationData2 = new CorrelationData("messageFallback2");
//rabbitTemplate.convertAndSend("no", "", msg, correlationData2);
// 交换机 找不到RoutingKey
//CorrelationData correlationData3 = new CorrelationData("messageFallback3");
//rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, "no", msg, correlationData3);
// 交换机 找不到队列
//CorrelationData correlationData4 = new CorrelationData("messageFallback4");
//rabbitTemplate.convertAndSend("testNoNull", "", msg, correlationData4);
}
回调类
点击查看代码
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@Resource
RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
// 发布确认
rabbitTemplate.setConfirmCallback(this);
// 消息回退
rabbitTemplate.setReturnsCallback(this);
}
/**
* 交换机确认回调方法
* @param correlationData 回调的相关数据
* @param ack ack为true,nack为false
* @param cause 原因,对于nack,如果可用,否则为null
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData == null ? "" : correlationData.getId();
if (ack) {
System.out.println("Success => " + id);
} else {
System.out.println("Failure => " + id + " [" + cause + "] ");
}
}
/**
* 回退消息
* 当消息过程不能达到目的地 , 则将消息返回给生产者
* @param returned 返回的消息和元数据
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("回退消息 ==>");
System.out.println(" 消息: " + new String(returned.getMessage().getBody(), StandardCharsets.UTF_8));
System.out.println(" 交换机名: " + returned.getExchange());
System.out.println(" RoutingKey: " + returned.getRoutingKey());
System.out.println(" 退回原因: " + returned.getReplyText());
}
}
# 代码测试
根据可控变量分析 , 可分析出可能情况 :
- 生产者 找不到交换机
- 生产者 找到交换机 , 但找不到队列
验证请求 : http://localhost:8088/producer/messageFallback/GO
生产者 找不到交换机
模拟故障操作 : 更改 生产者发送方法所指定的交换机名 , 试图寻找个不存在的交换机
测试结果 : 触发 ConfirmCallback()
回调函数 , 失败 , 找不到交换机
messageFallbackSend => GO
Failure => messageFallback2 [channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'no' in vhost '/', class-id=60, method-id=40)]
生产者 找到交换机 , 但找不到队列
模拟故障操作 : (以下两个故障结果是一致的)
- 更改 生产者发送方法指定的RoutingKey
- 新建交换机 , 不进行绑定队列
测试结果 : 触发 ConfirmCallback()
(成功)和returnedMessage()
(失败) 回调函数 , 但消息没有得到消费 . 由于找不到队列消息而丢失
messageFallbackSend => GO
回退消息 ==>
消息: GO
交换机名: confirm.exchange
RoutingKey: no
退回原因: NO_ROUTE
Success => messageFallback3
# 备用交换机
备用交换机 , 字面意思 , 当某一交换机匹配不到RoutingKey
指定的队列 , 那么会交给 备用交换机 处理 .
一般情况 备用交换机 , 用来处理消费者 监控/报警 等操作
应用方式
在配置交换机Bean的时候可以通过 构造者模式中的 alternate()
方法 指定备用交换机
/* 变量说明
BACKUP_EXCHANGE_NORMAL: 普通交换机名
BACKUP_EXCHANGE_BACKUP: 备份交换机名
*/
@Bean
public DirectExchange backupNormalExchange() {
return ExchangeBuilder.directExchange(BACKUP_EXCHANGE_NORMAL)
.durable(true)
// 备用
.alternate(BACKUP_EXCHANGE_BACKUP)
.build();
}
# 代码示例
架构图
基本架构
点击查看代码
@Configuration
public class BackupConfig {
// 交换机
public static final String BACKUP_EXCHANGE_NORMAL = "backup.exchange.normal";
public static final String BACKUP_EXCHANGE_BACKUP = "backup.exchange.backup";
// 队列
public static final String BACKUP_QUEUE_NORMAL = "backup.queue.normal";
public static final String BACKUP_QUEUE_BACKUP = "backup.queue.backup";
public static final String BACKUP_QUEUE_WARNING = "backup.queue.warning";
// routingKey
public static final String BACKUP_ROUTING_KEY_NORMAL = "backup.routingkey.normal";
/**
* 交换机
*/
@Bean
public DirectExchange backupNormalExchange() {
return ExchangeBuilder.directExchange(BACKUP_EXCHANGE_NORMAL)
.durable(true)
// 备用
.alternate(BACKUP_EXCHANGE_BACKUP)
.build();
}
@Bean
public FanoutExchange backupExchange() {
// 广播
return new FanoutExchange(BACKUP_EXCHANGE_BACKUP);
}
/**
* 队列
*/
@Bean
public Queue backupNormalQueue() {
return QueueBuilder.durable(BACKUP_QUEUE_NORMAL).build();
}
@Bean
public Queue backupBackupQueue() {
return QueueBuilder.durable(BACKUP_QUEUE_BACKUP).build();
}
@Bean
public Queue backupWarningQueue() {
return QueueBuilder.durable(BACKUP_QUEUE_WARNING).build();
}
/**
* 绑定
*/
@Bean
public Binding bindingNormalQueueToNormalExchange(Queue backupNormalQueue, DirectExchange backupNormalExchange) {
return BindingBuilder.bind(backupNormalQueue).to(backupNormalExchange).with(BACKUP_ROUTING_KEY_NORMAL);
}
@Bean
public Binding bindingBackupBackupQueueToBackupExchange(Queue backupBackupQueue, FanoutExchange backupExchange) {
return BindingBuilder.bind(backupBackupQueue).to(backupExchange);
}
@Bean
public Binding bindingBackupWarningQueueToBackupExchange(Queue backupWarningQueue, FanoutExchange backupExchange) {
return BindingBuilder.bind(backupWarningQueue).to(backupExchange);
}
}
生产者
点击查看代码
@RestController
@RequestMapping("/producer")
public class ProducerSendMessageController {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/backup/{msg}")
public void backupSend(@PathVariable String msg) {
System.out.println("backupSend => " + msg);
// 发送成功
CorrelationData correlationData1 = new CorrelationData("backupSend>1");
rabbitTemplate.convertAndSend(BACKUP_EXCHANGE_NORMAL, BACKUP_ROUTING_KEY_NORMAL, msg, correlationData1);
// 生产者 找不到交换机
//CorrelationData correlationData2 = new CorrelationData("backupSend>2");
//rabbitTemplate.convertAndSend(BACKUP_EXCHANGE_NORMAL+"123", BACKUP_ROUTING_KEY_NORMAL, msg, correlationData2);
// 交换机 找不到RoutingKey
//CorrelationData correlationData3 = new CorrelationData("backupSend>3");
//rabbitTemplate.convertAndSend(BACKUP_EXCHANGE_NORMAL, BACKUP_ROUTING_KEY_NORMAL+"123", msg, correlationData3);
// 交换机 找不到队列
//CorrelationData correlationData4 = new CorrelationData("backupSend>4");
//rabbitTemplate.convertAndSend("testNoNull", "", msg, correlationData4);
}
}
消费者
点击查看代码
@Component
public class QueueConsumer {
/**
* 备份交换机
*/
// 正常消费者
@RabbitListener(queues = BACKUP_QUEUE_NORMAL)
public void backupNormalConsumer(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("backupNormalConsumer => " + new Date() + " : " + msg);
}
// 备份消费者
@RabbitListener(queues = BACKUP_QUEUE_BACKUP)
public void backupConsumer(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("backupConsumer => " + new Date() + " : " + msg);
}
// 警告消费者
@RabbitListener(queues = BACKUP_QUEUE_WARNING)
public void backupWarningConsumer(Message message, Channel channel) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("backupWarningConsumer => " + new Date() + " : " + msg);
}
}
# 代码测试
根据可控变量分析 , 可分析出可能情况 :
- 生产者 找不到交换机
- 交换机 RoutingKey匹配不到队列
- 交换机 未绑定队列
- 备用交换机 未绑定队列
验证请求 : http://localhost:8088/producer/messageFallback/GO
模拟故障操作 , 在生产者类中写有 , 去掉注释测试即可
生产者 找不到交换机
模拟故障操作 : 更改 生产者发送方法所指定的交换机名 , 试图寻找个不存在的交换机
测试结果 : ConfirmCallback()
回调失败
backupSend => GO
Failure => backupSend>2 [channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'backup.exchange.normal123' in vhost '/', class-id=60, method-id=40)]
交换机 RoutingKey匹配不到队列
模拟故障操作 : 更改 生产者发送方法指定的RoutingKey
测试结果 : ConfirmCallback()
回调成功 , 并且 备用交换机处理消息
backupSend => GO
Success => backupSend>3
backupWarningConsumer => Tue Mar 07 14:40:37 CST 2023 : GO
backupConsumer => Tue Mar 07 14:40:37 CST 2023 : GO
交换机 未绑定队列
模拟故障操作 : 新建交换机 , 不进行绑定队列
测试结果 : ConfirmCallback()
回调成功 , 在交换机中 触发returnedMessage()
退回消息 , 备用交换机未处理消息
backupSend => GO
回退消息 ==>
消息: GO
交换机名: testNoNull
RoutingKey:
退回原因: NO_ROUTE
Success => backupSend>4
备用交换机 未绑定队列
模拟故障操作 : 备用交换机 , 不进行绑定队列 (在Web管理页 解绑 重发消息即可实现)
测试结果 : ConfirmCallback()
回调成功 , 在备用交换机中 触发returnedMessage()
退回消息 , 备用交换机未处理消息
backupSend => GO
回退消息 ==>
消息: GO
交换机名: backup.exchange.normal
RoutingKey: backup.routingkey.normal123
退回原因: NO_ROUTE
Success => backupSend>3
在上面4种情况观察分析 , 可以发现 交换机一旦未绑定队列 , 会使 消息回退 , 也不会执行备用交换机方案
# 队列优先级
不难想象 , 在双十一高峰期 , 订单会非常多 . 有时 公司出于利益方面 , 划分出客户等级 , 等级越高的客户他们的单子往往在拥挤的时候优先得到解决
在MQ当中优先级取值范围 : 0 ~ 255 (数值越大越优先)
应用方式
配置
在 队列 中配置 优先级属性
/* 变量说明
CONFIRM_QUEUE_NAME: 队列名
*/
@Bean
public Queue priorityQueue() {
return QueueBuilder.durable("priority.Queue")
// 优先级 优先级最大值
.maxPriority(40)
.build();
}
在 生产者 消息发送方法 中配置优先级
指定 消息 在队列中的优先级 : ==message.getMessageProperties().setPriority(n);==
@GetMapping("/priority/{msg}")
public void priorityBend(@PathVariable String msg) {
// 优先级倒过来 ,
System.out.println("priorityBend => " + msg);
for (int i = 0; i < 20; i++) {
CorrelationData correlationData = new CorrelationDat22a(msg + i);
int finalI = i;
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, CONFIRM_ROUTING_KEY, msg + i, message -> {
message.getMessageProperties().setPriority(finalI);
return message;
}, correlationData);
}
}
# 测试
源代码基本架构 点击跳转
测试步骤 :
- 注释掉消费者代码
- 运行项目
- 访问 http://localhost:8088/producer/priority/GO (发送消息)
- 观察方式 , 有两种: Web管理页 (能看到消息明细) ; 控制台(测试顺序)
Web观察
进入 Queues -> 选择队列 -> Get Messages
控制台观察 : 恢复消费者代码(去除先前注解) , 重新运行项目
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO19
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO18
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO17
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO16
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO15
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO14
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO13
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO12
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO11
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO10
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO9
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO8
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO7
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO6
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO5
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO4
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO3
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO2
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO1
confirmConsumer => Tue Mar 07 21:55:26 CST 2023 : GO0
# 惰性队列
惰性队列 主要作用的将消息尽可能的存到磁盘中 , 而消费者响应消息的时候才会被加载到内存中 , 设计初衷主要是容纳更多的消息 , 以免高峰期导致内存爆炸现象
不同情况下的队列 :
- 普通队列(default) : 消息保存到内存中 (尽可能提高性能)
- 惰性队列(lazy) : 消息保存到磁盘中 (尽可能存储更多的消息)
惰性优点 :
- 消费者失效 , 消息堆积情况
- 大量消息 , 占用内存小
应用 :
- 在 队列 中配置 模式属性 : ==x-queue-mode: lazy==
- 大量发送消息
- 观察 Web内存占用情况
SpringBoot 配置
在 队列 中配置 惰性属性
@Bean
public Queue lazyQueue() {
return QueueBuilder.durable("lazy.Queue")
// 惰性
.lazy()
.build();
}
高压测试
在 100W 条消息的内存情况下
- 普通 : 5.2MiB ≈ 5.078125MB ≈ 5078.1KB
- 惰性 : 1.1MiB ≈ 1.07421875MB ≈ 1074.2KB
GB 是 生厂商为了方便计算 , 以十进制 10的3次方运算 . 如 : 1000MB = 1GB
GiB 而是 操作系统是采用 , 以二进制 2的10次方运算 . 如 : 1MiB = 1024KiB
为了便于理解 可以将 MiB为MB (其他单位也是如此)
采用 异步发布消息 100W条消息
普通 :
惰性 :
# 集群
以往的操作都是处于一台机器操作 , 那么该机器宕机了 , 就不能服务 , 因此我们只可以使用多台服务器连接形成集群 , 提高可用率 , 哪怕其中一台宕机了也可以完好的将数据保留
下面采用Docker模拟多台服务器应用
# Docker
在Docker中 部署安装RabbitMQ
安装Docker (网上教程烂大街..)
查看版本 https://hub.docker.com/_/rabbitmq , 并拉取下载 建议下载含有Web管理页 , 镜像中带有
mangement
版本的启动docker容器
docker run -d --name [容器名称] \ -p 5672:5672 \ -p 15672:15672 \ -v `pwd`/data:/home/rabbitmq \ --hostname [节点名称] \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:[tag标签] \
我个人的应用方式 (端口小修一下)
docker run -d --name rabbitmq03 \ -p 5674:5672 \ -p 15674:15672 \ -v `pwd`/data:/home/rabbitmq \ --hostname node03 \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=admin \ rabbitmq:3.11.10-management
选项说明 :
选项 说明 -d 后台运行 --name 指定容器名称 -p 指定端口 [外部端口]:[容器端口] (5672:连接访问; 15672: Web管理页) -v 映射 目录/文件 --hostname 主机名 (较为重要 集群作为 节点名称 使用) -e 指定环境变量 (默认账号密码) Web打开 http://ip:15672/ / http://ip:15673/ / http://ip:15674/
账号密码 :
admin
**检查 : **
- 检查docker容器 ==docker ps -a==
- 检查端口是否调试好 ==docker port {容器id}==
- 查看防火墙是否开放端口
MQ容器不同版本的端口很有可能不同 , 可通过 ==docker ps -a== 进行检查