SpringBoot+RabbitMQ 死信队列

news/2024/7/5 3:15:37

欢迎关注方志朋的博客,回复”666“获面试宝典

前言

死信:无法被消费的消息,称为死信。

如果死信一直留在队列中,会导致一直被消费,却从不消费成功。

所以我们专门开辟了一个来存放死信的队列,叫死信队列(DLX,dead-letter-exchange)。

死信的几种来源:
  • 消息 TTL 过期(time to live,存活时间,可以用在限时支付消息)

  • 队列达到最大长度(队列满了,无法路由到该队列)

  • 消息被拒绝( basic.reject / basic.nack ),并且 requeue = false

376756d03aa25a080c5acb6c3a6e1032.png

环境准备配置

准备 MQ 的队列和环境:

  • 正常交换机

    • 正常队列(最长队列 5) ---- 正常消费者,拒绝消息

    • ttl 队列(过期时间 60 秒) ---- 没有消费者

  • 死信交换机

    • 死信队列

主要配置文件如下:

@Configuration
public class DeadConfig {/* 正常配置 **********************************************************************************************************//*** 正常交换机,开启持久化*/@BeanDirectExchange normalExchange() {return new DirectExchange("normalExchange", true, false);}@Beanpublic Queue normalQueue() {// durable: 是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive: 默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete: 是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。Map<String, Object> args = deadQueueArgs();// 队列设置最大长度args.put("x-max-length", 5);return new Queue("normalQueue", true, false, false, args);}@Beanpublic Queue ttlQueue() {Map<String, Object> args = deadQueueArgs();// 队列设置消息过期时间 60 秒args.put("x-message-ttl", 60 * 1000);return new Queue("ttlQueue", true, false, false, args);}@BeanBinding normalRouteBinding() {return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("normalRouting");}@BeanBinding ttlRouteBinding() {return BindingBuilder.bind(ttlQueue()).to(normalExchange()).with("ttlRouting");}/* 死信配置 **********************************************************************************************************//*** 死信交换机*/@BeanDirectExchange deadExchange() {return new DirectExchange("deadExchange", true, false);}/*** 死信队列*/@Beanpublic Queue deadQueue() {return new Queue("deadQueue", true, false, false);}@BeanBinding deadRouteBinding() {return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("deadRouting");}/*** 转发到 死信队列,配置参数*/private Map<String, Object> deadQueueArgs() {Map<String, Object> map = new HashMap<>();// 绑定该队列到私信交换机map.put("x-dead-letter-exchange", "deadExchange");map.put("x-dead-letter-routing-key", "deadRouting");return map;}}

arguments 具体参数如下:

484592dad63884b6d3fcf2e14086d0a1.png

队列达到最大长度

首先测试最简单的,没有消费者。

调用6次正常队列的生产方法。

/*** 正常消息队列,队列最大长度5*/@GetMapping("/normalQueue")public String normalQueue() {Map<String, Object> map = new HashMap<>();map.put("messageId", String.valueOf(UUID.randomUUID()));map.put("data", System.currentTimeMillis() + ", 正常队列消息,最大长度 5");rabbitTemplate.convertAndSend("normalExchange", "normalRouting", map, new CorrelationData());return JSONObject.toJSONString(map);}

MQ 结果如下:

66a6b05adea5985492cc785532f07fc8.png

消息 TTL 过期

消息的TTL 指的是消息的存活时间,我们可以通过设置消息的TTL或者队列的TTL来实现。

  • 消息的TTL :对于设置了过期时间属性(expiration)的消息,消息如果在过期时间内没被消费,会过期

  • 队列的TTL :对于设置了过期时间属性(x-message-ttl)的队列,所有路由到这个队列的消息,都会设置上这个过期时间

两种配置都行,一般都用在定时任务,限时支付这种地方。

/*** 消息 TTL, time to live*/@GetMapping("/ttlToDead")public String ttlToDead() {Map<String, Object> map = new HashMap<>();map.put("messageId", String.valueOf(UUID.randomUUID()));map.put("data", System.currentTimeMillis() + ", ttl队列消息");rabbitTemplate.convertAndSend("normalExchange", "ttlRouting", map, new CorrelationData());return JSONObject.toJSONString(map);}

发送后:

9306c6d04f695fd9e8571e7c7e6214cb.png

等待过期后:

93340fc2005c11e87e28021b4855a193.png

Demo 中只是为了方便,代码中尽量使用 消息TTL,不要用 队列TTL

拒绝消息

正常队列消费后拒绝消息,并且不进行重新入队:

@Component
@RabbitListener(queues = "normalQueue")
public class NormalConsumer {@RabbitHandlerpublic void process(Map<String, Object> message, Channel channel, Message mqMsg) throws IOException {System.out.println("收到消息,并拒绝重新入队 : " + message.toString());channel.basicReject(mqMsg.getMessageProperties().getDeliveryTag(), false);}
}

MQ 控制台:

7d795d3c89b5a3823c5b1ebbf88122a0.png

死信队列消费:

@Component
@RabbitListener(queues = "deadQueue")
public class DeadConsumer {@RabbitHandlerpublic void process(Map<String, Object> message, Channel channel, Message mqMsg) throws IOException {System.out.println("死信队列收到消息 : " + message.toString());channel.basicAck(mqMsg.getMessageProperties().getDeliveryTag(), false);}
}

消息顺序和实验一致:

死信队列收到消息 : {data=1631534291765, 正常队列消息,最大长度 5, messageId=bce3888b-da38-4299-ac88-d22cbe164739}
死信队列收到消息 : {data=1631535222745, ttl队列消息, messageId=a4617445-5aab-4fac-aec7-5709ea699598}
死信队列收到消息 : {data=1631534503765, 正常队列消息,最大长度 5, messageId=b65ecaab-5ce7-4597-a32c-c90b67ec46da}
死信队列收到消息 : {data=1631534511468, 正常队列消息,最大长度 5, messageId=d63d2a4c-e7d3-4f00-a6ca-78e2d62d1d92}
死信队列收到消息 : {data=1631534585087, 正常队列消息,最大长度 5, messageId=eed0c349-415b-43dc-aa79-c683122a1289}
死信队列收到消息 : {data=1631534588311, 正常队列消息,最大长度 5, messageId=7a7bd152-f2fa-4a74-b9e6-943ac7cbb3d4}
死信队列收到消息 : {data=1631534608504, 正常队列消息,最大长度 5, messageId=9de512a1-4ca4-4060-9096-27aba01c1687}

来源:https://blog.csdn.net/m0_46144826

热门内容:
  • 放弃密码模式吧,最先进的Spring Cloud认证授权方案在这里

  • 实体映射最强工具类:MapStruct 真香!

  • 谷歌弃用20多年的OKR,再创内卷神器?

  • 我用Java几分钟处理完30亿个数据...

166ae42acc357c3939a3a51d7ff18fe4.png

最近面试BAT,整理一份面试资料《Java面试BAT通关手册》,覆盖了Java核心技术、JVM、Java并发、SSM、微服务、数据库、数据结构等等。
获取方式:点“在看”,关注公众号并回复 666 领取,更多内容陆续奉上。

明天见(。・ω・。)ノ♡


http://lihuaxi.xjx100.cn/news/255073.html

相关文章

来厦门了!线上交流限额免费报名中

Datawhale活动 参与&#xff1a;线上或厦门线下&#xff0c;时间&#xff1a;6月23日6月23日&#xff0c;飞桨中国行将走进厦门&#xff0c;直击企业智能化转型升级诉求&#xff0c;围绕工业质检、安全巡检、生产设备健康管理等问题&#xff0c;由政府专家、行业大咖等深入剖析…

户外广告新创意

近来&#xff0c;各大城市纷纷加大了对户外广告的监管力度&#xff0c;部分城市甚至停止审批户外广告牌。这让户外广告运营者和广告发布商甚为头疼。 长期以来&#xff0c;户外广告牌扮演着截然相反的“双重角色”&#xff0c;在户外广告运营者和广告发布商眼中&#xff0c;“寸…

ES选主策略

ES版本5.6.3 1、整个流程的开始&#xff0c;实在node启动后触发的&#xff0c;Node.java中start()方法&#xff0c;通过调用ZenDiscovery.java中的doStart()方法&#xff0c;之后会调用startInitialJoin方法开始进行加入现有的cluster或者选主。 public void startInitialJoin(…

驱动数字经济加速,摩尔线程发布全新元计算架构MUSA和GPU产品

2022年3月30日&#xff0c;北京——摩尔线程今天举行主题为“元动力 创无限”的春季发布会。摩尔线程创始人兼CEO张建中解读了“元计算”这一产业趋势&#xff0c;并发布全新架构及系列重磅新品&#xff0c;包括&#xff1a;MUSA&#xff08;Moore Threads Unified System Arch…

数字家庭开发者中心

数字家庭开发者中心 http://www.adobe.com/devnet/devices/digital_home.html转载于:https://www.cnblogs.com/kobo/archive/2010/07/06/1772136.html

MySQL5.7配置日志

之前使用MySQL 5.1版本的时候&#xff0c;修改my.cnf&#xff0c;在[mysqld]下添加"log/data/mysql/query.log"&#xff0c;重启服务就ok了 但是在5.7会出现 Starting MySQL... ERROR! The server quit without updating PID file (/data/mysql/mysql.pid).原因是5.7…

LeetCode实战:除自身以外数组的乘积

背景 为什么你要加入一个技术团队&#xff1f;如何加入 LSGO 软件技术团队&#xff1f;我是如何组织“算法刻意练习活动”的&#xff1f;为什么要求团队的学生们写技术Blog 题目英文 Given an array nums of n integers where n > 1, return an array output such that ou…

IDEA公司再发新神器!超越 VS Code 骚操作!

欢迎关注方志朋的博客&#xff0c;回复”666“获面试宝典JetBrains 长期以来一直是 IDE 业务的重要参与者。到目前为止&#xff0c;他在 VS Code 上最大的竞争对手是 IntelliJ IDEA 和 WebStormJetBrains Fleet 新一代&#xff0c;VS Code杀手fc369c3a8b50159824ed8d9eeae47a2a…