网站首页 > 技术文章 正文
在分布式消息队列中,消息重复消费是一个常见问题(例如网络重试、消费者重启等场景)。RocketMQ 通过多种机制来尽量减少重复消费,但无法完全避免(需业务方配合实现幂等性)。以下是 RocketMQ 的解决方案和最佳实践:
1. RocketMQ 自身的防重复机制
(1) 消息重试机制(Consumer端)
- 默认重试策略:若消费者消费失败(返回 RECONSUME_LATER),消息会被重新投递(最多 16 次)。
- 风险点:重试可能导致同一条消息被多次消费。
(2) 消息幂等性标识(Message ID + Offset)
- 每条消息有唯一的 Message ID 和 Broker Offset,消费者可通过记录已处理的 ID/Offset 去重。
- 局限性:极端情况下(如 Broker 主从切换),Message ID 可能重复(需结合业务键去重)。
2. 业务层必须实现的幂等性方案
RocketMQ 不保证绝对不重复,需业务代码自行处理幂等。常见方案:
(1) 唯一业务键 + 数据库去重
- 适用场景:订单ID、支付流水号等有唯一键的业务。
- 实现方式:
- java
// 伪代码:消费前检查唯一键是否已处理
String orderId = message.getUserProperty("order_id");
if (orderId != null && !isOrderProcessed(orderId)) {
processOrder(orderId);
markOrderAsProcessed(orderId); // 写入DB或缓存
}
- 存储选择:
- 数据库唯一索引:插入前检查 order_id 是否已存在。
- Redis SETNX:利用原子操作记录已处理的消息键。
(2) 乐观锁(适用于更新操作)
- 示例(库存扣减):
- sql
UPDATE inventory SET stock = stock - 1
WHERE product_id = #{productId} AND stock >= 1;
- 即使消息重复,数据库乐观锁会保证只扣减一次。
(3) 分布式锁(强一致性场景)
- 消费前先获取锁(如 Redis 的 SET key value NX EX):
- java
String lockKey = "msg_lock:" + message.getMsgId();
try {
if (redisClient.setIfAbsent(lockKey, "1", 30, TimeUnit.SECONDS)) {
processMessage(message);
}
} finally {
redisClient.delete(lockKey);
}
3. RocketMQ 4.x 后的增强特性
(1) 事务消息
- 二阶段提交机制可减少因生产者重试导致的消息重复(但消费者仍需幂等)。
- 适用场景:跨系统事务(如支付后发消息)。
(2) 消息轨迹(Message Trace)
- 记录消息生命周期(发送、存储、消费),便于排查重复消费问题。
4. 最佳实践总结
环节 | 措施 |
生产者 | 避免重复发送(如网络超时后重试前先检查状态)。 |
Broker | 依赖 Message ID 和 Offset,但不可完全信任。 |
消费者 | 必须实现幂等逻辑 (唯一键、乐观锁、分布式锁等)。 |
监控 | 通过 RocketMQ 控制台或日志监控重复消费告警。 |
代码示例(消费者幂等)
java
// 基于 Redis 的幂等消费示例
public class MyConsumer implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
String msgId = message.getMsgId();
String orderId = message.getUserProperty("order_id");
// 1. 检查 Redis 是否已处理
if (redisTemplate.opsForValue().setIfAbsent("consumed:" + orderId, "1", 24, TimeUnit.HOURS)) {
try {
// 2. 业务处理
processOrder(orderId);
} catch (Exception e) {
redisTemplate.delete("consumed:" + orderId); // 失败时清除标记
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
} else {
System.out.println("消息已处理,跳过: " + orderId);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
关键结论
- RocketMQ 不保证 100% 不重复:需业务方结合消息唯一标识(如订单ID)实现幂等。
- 轻量级方案:优先用 Redis/Database 去重;高并发场景可用乐观锁或分布式锁。
- 监控:通过日志或 RocketMQ Admin Tool 定期检查重复消息。
猜你喜欢
- 2025-07-27 Mybatis Plus框架学习指南-第六节内容(常用的类 1)
- 2025-07-27 如何安全配置数据库(MySQL/PostgreSQL/MongoDB)
- 2025-07-27 SQL-执行过程详解(sql语句执行)
- 2025-07-27 SQL Server如何使用维护计划?(sql2000维护计划怎么执行)
- 2025-07-27 云服务器:SQL数据库超时的原因与解决方法
- 2025-07-27 SQL 从入门到精通:全面掌握数据库操作
- 2025-07-27 数据库 SQL 约束之 DEFAULT(sql的约束是什么)
- 2025-07-27 windows下,mysql自动备份脚本(mysql数据备份脚本)
- 2025-07-27 SQL Server中从SELECT进行UPDATE的方法
- 2025-07-27 谈谈 SQL 注入及防范(如何处理sql注入)
- 1517℃桌面软件开发新体验!用 Blazor Hybrid 打造简洁高效的视频处理工具
- 596℃Dify工具使用全场景:dify-sandbox沙盒的原理(源码篇·第2期)
- 521℃MySQL service启动脚本浅析(r12笔记第59天)
- 489℃服务器异常重启,导致mysql启动失败,问题解决过程记录
- 489℃启用MySQL查询缓存(mysql8.0查询缓存)
- 477℃「赵强老师」MySQL的闪回(赵强iso是哪个大学毕业的)
- 456℃mysql服务怎么启动和关闭?(mysql服务怎么启动和关闭)
- 454℃MySQL server PID file could not be found!失败
- 最近发表
- 标签列表
-
- cmd/c (90)
- c++中::是什么意思 (84)
- 标签用于 (71)
- 主键只能有一个吗 (77)
- c#console.writeline不显示 (95)
- pythoncase语句 (88)
- es6includes (74)
- sqlset (76)
- windowsscripthost (69)
- apt-getinstall-y (86)
- node_modules怎么生成 (76)
- c++int转char (75)
- static函数和普通函数 (76)
- el-date-picker开始日期早于结束日期 (70)
- js判断是否是json字符串 (67)
- checkout-b (67)
- c语言min函数头文件 (68)
- asynccallback (71)
- localstorage.removeitem (74)
- vector线程安全吗 (70)
- java (73)
- js数组插入 (83)
- mac安装java (72)
- 查看mysql是否启动 (70)
- 无效的列索引 (74)