RocketMQ(Rocket Message Queue)是一款分布式消息中间件,由阿里巴巴开源并最终捐赠给了Apache基金会,成为Apache的顶级项目,它广泛应用于大数据、分布式事务解耦、异步通信、削峰填谷等场景,在RocketMQ中,定时消息是一种特殊类型的消息,允许用户设置一个特定的时间来投递消息到消费者。
关于定时消息的存储,我们需要了解RocketMQ的消息存储机制,在RocketMQ中,消息存储主要涉及以下几个组件:
1、Producer(生产者) 负责发送消息。
2、Broker(消息中转站) 接收生产者发送的消息,并存储在消息存储层。
3、Consumer(消费者) 从Broker获取并消费消息。
4、Message Store 实际存储消息的地方。
5、CommitLog(提交日志) 所有消息在存储前都会顺序写入提交日志。
6、ConsumeQueue(消费队列) 根据Topic和队列ID构建的索引文件,用于存储指向提交日志中消息的指针。
7、IndexFile(索引文件) 对CommitLog进行索引,提高查找效率。
当Producer发送一条定时消息时,该消息首先被存储在CommitLog中,根据Topic和队列ID,生成相应的索引并存储在ConsumeQueue中,对于定时消息来说,它们在存储时确实可能会多次存储,但这并不意味着会有多份物理拷贝,相反,这是通过更新原有消息的属性来实现的,确保定时消息能够在指定的时间点被投递。
具体到定时消息的处理流程如下:
1、发送定时消息 当Producer发送一条带有特定延时级别的定时消息时,该消息会被标记为延迟级别,并存入CommitLog。
2、消息存储与更新 Broker会维护一个定时任务,定期检查CommitLog中是否有到期的定时消息,一旦发现定时消息到了预设的时间点,Broker会将这条消息的延时级别更新为普通级别,并将其放入ConsumeQueue中等待消费者来消费。
3、去重处理 如果在定时未到达之前,相同的消息再次被发送,并且具有相同的Message Key,那么后发送的消息将会覆盖先前的消息,这是因为RocketMQ使用Message Key作为唯一标识来判断消息是否重复,实际上只有最后存储的消息会作为最终的实际消息被消费。
4、消费定时消息 消费者在订阅了相应的Topic和队列之后,会在消息可用时接收到这些定时消息。
虽然定时消息在逻辑上可能被多次"存储"或更新其属性,但物理上只会保留一份最新的实际消息,RocketMQ通过高效的索引和定时任务机制确保了定时消息能够准确地在预定时间送达消费者,利用Message Key进行去重处理,保证了消息的唯一性和准确性。
原创文章,作者:未希,如若转载,请注明出处:https://www.kdun.com/ask/537265.html
本网站发布或转载的文章及图片均来自网络,其原创性以及文中表达的观点和判断不代表本网站。如有问题,请联系客服处理。
发表回复