网站首页 > 技术文章 正文
RocketMQ 支持定时消息和延时消息。本文将介绍如何使用rocketmq-spring-boot-starter 包实现延时消息的发送和消费。
一、延时消息和定时消息
延时消息:Producer将消息发送到消息队列RocketMQ版服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
定时消息:Producer将消息发送到消息队列RocketMQ版服务端,但并不期望立马投递这条消息,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。
定时消息与延时消息在代码配置上存在一些差异,但是最终达到的效果相同:消息在发送到消息队列RocketMQ版服务端后并不会立马投递,而是根据消息中的属性延迟固定时间后才投递给消费者。
核心实现思路:将延迟消息通过一个临时存储进行暂存,到期后才投递到目标Topic中。如下图所示:
不过,目前开源版本的RocketMQ 仅支持延时消息,并不支持定时消息。下图是Message 类,仅提供了setDelayTimeLevel() 方法进行延时设置,并没有提供定时消息设置的方法setStartDeliverTime(),如图所示:
二、发送延时消息
默认 Broker 服务器端有 18 个延时级别:
messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
这 18 个延时级别在服务器端启动的时候,会被解析并放置到表 delayLevelTable 中。解析的过程就是上述字符串按照空格拆分开,然后根据时间单位的不同再进一步进行计算,得到最终的毫秒时间。级别就是根据这些毫秒时间的顺序而确定的,例如上述 1s 延迟就是级别 1, 5s 延迟就是级别 2,以此类推。注意:延时级别是从1开始计数的。
在示例项目中创建DelayProducer 服务类,代码如下所示:
/**
* 发送延时/定时消息
*/
@Slf4j
@Component
public class DelayProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送延时消息
* @param topic
* @param book
* @param timeout
* @param delayLevel
* 延时等级:现在RocketMQ并不支持任意时间的延时,需要设置几个固定的延时等级,
* 从1s到2h分别对应着等级 1 到 18,消息消费失败会进入延时消息队列
* "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
*/
public void sendDelayMsg(String topic, Book book, long timeout, int delayLevel) {
Message<Book> message = MessageBuilder.withPayload(book).build();
SendResult sendResult = rocketMQTemplate.syncSend(topic, message, timeout, delayLevel);
log.info("消息id:{} : {} ", sendResult.getMsgId(), sendResult.getSendStatus().name());
}
}
对应的单元测试代码如下所示:
@SpringBootTest
class DelayProducerTest {
@Autowired
private DelayProducer delayProducer;
@Test
void sendDelayMsg() {
Book book = new Book("001",
"COLA 4.x架构入门和项目实践",
"技术专栏详细讲解COLA框架的使用,领域驱动设计DDD中领域模型的开发,以及DDD经典示例项目-货物运输系统代码实现细节。",
"软件架构",
new Date(),
336, 60
);
delayProducer.sendDelayMsg(AppConstant.DELAY_TOPIC, book, 1000, 5);
}
}
代码中设置延时等级5是一分钟。
启动单元测试,发送延时消息,此时通过RocketMQ Dashboard 查看,延时消息尚未入队列。
需要等待延时时间之后,消息才会入队列,如图所示,消息id:190590BD486E18B4AAC20D7A1EE30000。
三、消费延时消息
延时消息的消费和普通消息是一样的,代码如下所示:
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "DelayArchNoteConsumerGroup",
topic = AppConstant.DELAY_TOPIC)
public class DelayConsumer implements RocketMQListener<Book> {
@Override
public void onMessage(Book book) {
log.info("接收消息:{}#{}", book.getId(), book);
}
}
启动单元测试,发送延时消息,输出结果如下所示:
2022-04-03 15:24:39.388 INFO 84504 --- [ main] com.archnote.service.DelayProducer : 消息id:1907305A4A1818B4AAC20D9B44580000 : SEND_OK
消息监听器接收到消息,输出结果如下所示:
2022-04-03 15:25:39.444 INFO 84502 --- [ConsumerGroup_5] com.archnote.service.DelayConsumer : 接收消息:001#Book(id=001, title=COLA 4.x架构入门和项目实践, content=技术专栏详细讲解COLA框架的使用,领域驱动设计DDD中领域模型的开发,以及DDD经典示例项目-货物运输系统代码实现细节。, author=软件架构, createTime=Sun Apr 03 15:24:39 CST 2022, like=336, price=60)
可以看到从延时消息的发出,到消费输出,间隔时间在1分钟左右。
四、延迟消息的流转过程
延迟消息在RocketMQ Broker端的流转如下图所示:
可以看到,总共有6个步骤,如下所示:
(1)修改消息Topic名称和队列信息
RocketMQ Broker端在存储生产者写入的消息时,首先都会将其写入到CommitLog中。之后根据消息中的Topic信息和队列信息,将其转发到目标Topic的指定队列(ConsumeQueue)中。
这里将Topic的名称修改为SCHEDULE_TOPIC_XXXX,并根据延迟级别确定要投递到哪个队列下。
(2)转发消息到延迟Topic的ConsumeQueue中
CommitLog中的消息转发到ConsumeQueue中是异步进行的。在转发过程中,会对延迟消息进行特殊处理,主要是计算这条延迟消息需要在什么时候进行投递。
(3)延迟服务消费SCHEDULE_TOPIC_XXXX消息
Broker内部有一个ScheduleMessageService类,其充当延迟服务,消费SCHEDULE_TOPIC_XXXX中的消息,并投递到目标Topic中。
(4)将信息重新存储到CommitLog中
在将消息到期后,需要投递到目标Topic。由于在第一步已经记录了原来的Topic和队列信息,因此这里重新设置,再存储到CommitLog即可。此外,由于之前Message Tag HashCode字段存储的是消息的投递时间,这里需要重新计算tag的哈希值后再存储。
(5)将消息投递到目标Topic中
由于消息的Topic名称已经改为了目标Topic。因此消息会直接投递到目标Topic的ConsumeQueue中,之后消费者即消费到这条消息。
(6)消费者消费目标Topic中的数据
延时消息除了支持同步发送之外,也支持异步发送、单向发送。可以尝试自己完成相关代码。
猜你喜欢
- 2024-09-21 5分钟了解Handler错误使用场景(handlerexecution)
- 2024-09-21 魔兽世界:不会用宏的快来看 教你入门宏命令 老油条请评论区补充
- 2024-09-21 关于SendMessage函数(send函数的参数)
- 2024-09-21 老外给你发“sorry,WC”,你知道他说的是什么意思吗?
- 2024-09-21 面试官:你来简单回答一下RocketMQ的默认发送流程
- 2024-09-21 记一次 .NET某医疗器械清洗系统 卡死分析
- 2024-09-21 Spring Boot+RocketMQ 实现多实例分布式环境下的事件驱动
- 2024-09-21 聊聊rocketmq的ExtProducerResetConfiguration
- 1514℃桌面软件开发新体验!用 Blazor Hybrid 打造简洁高效的视频处理工具
- 569℃Dify工具使用全场景:dify-sandbox沙盒的原理(源码篇·第2期)
- 510℃MySQL service启动脚本浅析(r12笔记第59天)
- 486℃服务器异常重启,导致mysql启动失败,问题解决过程记录
- 485℃启用MySQL查询缓存(mysql8.0查询缓存)
- 467℃「赵强老师」MySQL的闪回(赵强iso是哪个大学毕业的)
- 446℃mysql服务怎么启动和关闭?(mysql服务怎么启动和关闭)
- 444℃MySQL server PID file could not be found!失败
- 最近发表
- 标签列表
-
- c++中::是什么意思 (83)
- 标签用于 (65)
- 主键只能有一个吗 (66)
- c#console.writeline不显示 (75)
- pythoncase语句 (81)
- es6includes (73)
- windowsscripthost (67)
- apt-getinstall-y (86)
- node_modules怎么生成 (76)
- c++int转char (75)
- static函数和普通函数 (76)
- el-date-picker开始日期早于结束日期 (70)
- js判断是否是json字符串 (67)
- checkout-b (67)
- c语言min函数头文件 (68)
- asynccallback (71)
- localstorage.removeitem (74)
- vector线程安全吗 (70)
- & (66)
- java (73)
- js数组插入 (83)
- mac安装java (72)
- eacces (67)
- 查看mysql是否启动 (70)
- 无效的列索引 (74)