RabbitMq TTL+死信队列 延迟消息问题记录 全球今头条

首页>资讯 > 正文
2023-02-26 22:58:44

来源:腾讯云

延迟队列存储的对象是对应的延迟消息,所谓的延迟消息是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费

利用RabbitMqTTL和死信队列 来实现延时消费。

如果设置的是队列统一过期时间放到死信队列,没有什么问题。


(相关资料图)

如果是延时时间设置到每条消息上的。而不是给队列的。

实现方式为消息存活时间为动态用户页面可配置的。

这就导致了一个问题:

先用一条消息的存活时间是1天。后面又进了一条消息存活时间是1小时。

结果一小时到了,发现这条消息并没有被转发到消费延时过期消息的队列。

原因是尽管ttl是设给每条消息的。但是本质上,所有延时消息都还在一个队列里,对它过期时间的检测也是从头部开始的。

它不会检测每一条消息是否过期。而是顺序检测。

如果first in的消息过期时间很长,会导致它阻塞后进的消息。

不仅无法实现真正的过期时间。还会导致,一个大的过期时间的先进的消息,会堆积一堆后进的过期时间短的消息。

问题解决

这个时候可以使用rabbitMq的一个插件:rabbitmq_delayed_message_exchange

一段时间以来,人们一直在寻找用RabbitMQ实现延迟消息的传递方法,到目前为止,公认的解决方案是混合使用TTL和DLX。而rabbitmq_delayed_message_exchange插件就是基于此来实现的,RabbitMQ延迟消息插件新增了一种新的交换器类型,消息通过这种交换器路由就可以实现延迟发送

插件安装

需要根据自己的rabbitMq选择对应的版本。我rabbitMq的版本是RabbitMQ 3.11.0,对应的插件版本就是:3.11.1

基于Linux

--1、cd到rabbitmq默认安装位置cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/plugins--2、通过ftp工具将插件上传到此目录下--3、开启插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange--4、重启MQ服务systemctl restart rabbitmq-server

基于Docker

--1、通过ftp工具将插件上传到Linux服务器的根目录下--2、拷贝到docker中rabbitmq插件目录下,rabbitmq_delayed_message_exchange-3.9.0.ez(下载包的全名)docker cp /rabbitmq_delayed_message_exchange-3.9.0.ez 容器ID:/plugins--3、进入容器docker exec -it 容器id /bin/bash--4、查看插件是否存在(确保2中的操作已经将插件拷贝过来了)cd pluginsls |grep delay--5、开启插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange--6、退出容器exit--7、重启MQ服务docker restart 容器ID

安装成功

web界面新建交换机选择类型出现红框标注即表示成功

image.png

代码实现

1:springBoot配置

@Configurationpublic class DelayRabbitmqConfig {     /**     * 声明延迟队列     * @return     */    @Bean    public Queue delayQueue(){        return new Queue(QueueConstant.DelayQueue,                true,false,false);    }     /**     * 声明延迟自定义交换机类型     * @return     */    @Bean    public CustomExchange delayCustomExchange(){        HashMap args = new HashMap<>();//        设置 x-delayed-type 为 direct,当然也可以是 topic 等 发送消息时设置消息头 headers 的 x-delay 属性,即延迟时间,如果不设置消息将会立即投递        args.put("x-delayed-type","direct");        return new CustomExchange(ExchangeConstant.DelayCustomerExchange,                "x-delayed-message",true,false,args);    }     /**     * 绑定延迟交换机和队列     * @return     */    @Bean    public Binding delayQueueAndCustomExchange(){        return BindingBuilder.bind(delayQueue())                .to(delayCustomExchange()).with(RoutingKeyConstant.DelayCustomerRoutingKey).noargs();    }}

springMvc配置

引入依赖:    xmlns:util="http://www.springframework.org/schema/util"    http://www.springframework.org/schema/util    http://www.springframework.org/schema/util/spring-util-4.0.xsd                                                                                                                

代码实现

//消息发送final MessagePostProcessor messagePostProcessor = new MyMessagePostProcessor(Integer.valueOf(ttl.toString()));DisTimingPushDto disTimingPushDto = new DisTimingPushDto();disTimingPushDto.setOrderId(dispense.getOrderId());disTimingPushDto.setPushTime(disDispense.getPushTime());rabbitTemplate.convertAndSend(MsgQueueEnum.TIMING_PUSH.getExchangeName(), MsgQueueEnum.TIMING_PUSH.getQueueName(), disTimingPushDto, messagePostProcessor);//每条消息时间配置import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;/** * 延迟消息处理器 Processor * @author king * @date 2022年12月28日 11:14 */public class MyMessagePostProcessor implements MessagePostProcessor {    /**     * 消息延迟时间,单位:毫秒     */    private final Integer TTL;    public MyMessagePostProcessor(final Integer ttl) {        this.TTL = ttl;    }    @Override    public Message postProcessMessage(Message message) throws AmqpException {        message.getMessageProperties().setDelay(TTL);        return message;    }}

标签: RabbitMQ

THE END
免责声明:本文系转载,版权归原作者所有;旨在传递信息,不代表热讯制鞋网的观点和立场。

相关热点

交通高质量发展三年攻坚行动攻坚目标谋划推动实施一大批重大交通基础设施项目,交通投资累计完成360亿元以上,初步形成一航两港四环四铁九
2023-01-16 16:34:21
新华社电 上海市文化和旅游局近日发布《上海市密室剧本杀内容备案管理规定(征求意见稿)》,并截至12月8日面向社会公众广泛征求意见。这
2021-11-19 13:46:03
《中国证券报》17日刊发文章《备战2022 基金经理调仓换股布新局》。文章称,距离2021年结束仅剩一个多月,基金业绩分化明显。部分排名靠前
2021-11-19 13:46:03
交通运输部办公厅 中国人民银行办公厅 中国银行保险监督管理委员会办公厅关于进一步做好货车ETC发行服务有关工作的通知各省、自治区、直
2021-11-19 13:45:58
新华社北京11月17日电 题:从10月份市场供需积极变化看中国经济韧性新华社记者魏玉坤、丁乐读懂中国经济,一个直观的视角就是市场供需两端
2021-11-19 13:45:58
全国教育财务工作会议披露的消息称,2020年,中国国家财政性教育经费投入达4 29万亿元,占GDP总量的4 206%,我国国家财政性教育经费支出占G
2021-11-19 13:45:48
如果你也热爱“种草”,前方高能预警!让你心心念念、“浏览”忘返的网络平台,可能早已成为一块块“韭菜地”。近日,据《半月谈》报道,有...
2021-11-19 13:45:48
日前,工业和信息化部印发《“十四五”信息通信行业发展规划》(以下简称《规划》),描绘了未来5年信息通信行业的发展趋势。《规划》指出...
2021-11-19 13:45:40
本报讯(中青报·中青网记者 周围围)2021年快递业务旺季正式拉开帷幕。国家邮政局监测数据显示,仅11月1日当日,全国共揽收快递包裹5 69
2021-11-19 13:45:40
人民网曼谷11月17日电 (记者赵益普)17日上午,中国援柬埔寨第七批200万剂科兴新冠疫苗抵达金边国际机场。当天,柬埔寨政府在机场举行了
2021-11-19 13:45:35
金坛压缩空气储能国家试验示范项目主体工程一角受访者供图依托清华大学非补燃压缩空气储能技术,金坛压缩空气储能项目申请专利百余项,建立
2021-11-19 13:45:35
视觉中国供图42亿立方米据有关部门预计,今年山西煤炭产量有望突破12亿吨,12月份山西外送电能力将超过900万千瓦,今冬明春煤层气产量将达4
2021-11-19 13:44:34
14省份相继发布2021年企业工资指导线——引导企业合理提高职工工资今年以来,天津、新疆、内蒙古、陕西、西藏、山东、江西、山西、福建、四
2021-11-19 13:44:34
中新网客户端北京11月18日电 (记者 谢艺观)“一条路海角天涯,两颗心相依相伴,风吹不走誓言,雨打不湿浪漫,意济苍生苦与痛,情牵天下喜
2021-11-19 13:44:31
近日,交通运输部等三部门发布《关于进一步做好货车ETC发行服务有关工作的通知》。通知提到,对不具备授信条件的用户,商业银行可在依法合
2021-11-19 13:44:31
欧莱雅面膜陷优惠“年度最大”风波 涉及该事件集体投诉超6000人次美妆大牌双十一促销翻车?近日,因预售价格比双十一现货贵出66%,欧莱雅
2021-11-19 13:44:13
43 6%受访者会在工作两三年后考虑跳槽54 3%受访者认为跳槽对个人职业发展有利有弊如今对不少年轻人来说,想对一份工作“从一而终”不太容易
2021-11-19 13:44:13
超八成受访青年表示如有机会愿意开展副业 规划能力最重要64 4%受访青年指出做副业跟风心态最要不得如今,“身兼数职”已成为年轻人当中的
2021-11-19 13:44:01
发展氢能正当其时【科学随笔】氢能是一种二次能源,它通过一定的方法利用其他能源制取,具有清洁无污染、可储存、与多种能源便捷转换等优点
2021-11-19 13:44:01
“千杯不醉”的解酒“神药”能信吗?专家:网红“解酒药” 其实不算药俗话说,“酒逢知己千杯少”,酒一直是国人饭桌上至关重要的存在。尽...
2021-11-19 13:43:57
最新文章

相关推荐