Overview
This article takes a deep dive into the ordered-consumption mechanisms of RocketMQ and of DDMQ, which is built on top of it. Reading the source code of both, it examines how each implements ordered consumption and where they differ: ordered sending on the producer side, ordered pulling on the Broker and consumer sides, and ordered consumption on the consumer side.
It is written for backend engineers and architects who have some message-queue background, are using or planning to use RocketMQ or its derivatives, have strict message-ordering requirements, and want to understand the underlying principles in order to improve system performance and stability.
RocketMQ
The RocketMQ model
First, a quick look at the RocketMQ model to set the stage.
Deployment model
NameServer: the routing information provider, through which producers and consumers look up the list of Broker IPs serving each Topic and its queues.
Broker: the message relay, responsible for receiving and storing messages from producers and for serving pull requests from consumers.
Queue model
Topic: a collection of messages of one kind. A topic contains many messages, but each message belongs to exactly one topic.
Tag: a sub-classification under a topic; messages under the same topic can carry different tags.
Note: in the figure above, Queue0 of TopicA and Queue0 of TopicB are not the same queue.
Under clustering consumption, the queues of a Topic are distributed evenly across the consumers of a consumer group.
The Rebalance mechanism, i.e. load balancing, assigns queues evenly to consumers; whenever the number of queues or consumers changes, the same mechanism redistributes them.
There are several Rebalance strategies. They are not the focus of this article, so if you are interested, see the implementations under org.apache.rocketmq.client.consumer.rebalance; a simplified sketch of the default averaging idea follows.
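For a flavor of how averaging allocation behaves, here is a simplified, self-contained sketch of the idea behind the default strategy (AllocateMessageQueueAveragely). It is an illustration only, not the library code; the real implementation also sorts client IDs and handles edge cases.

import java.util.ArrayList;
import java.util.List;

// Simplified sketch: queues are cut into contiguous slices, one slice per consumer.
public class AverageAllocationSketch {
    static List<Integer> allocate(List<Integer> queueIds, List<String> consumers, String self) {
        int index = consumers.indexOf(self);           // this consumer's position in the (sorted) consumer list
        int mod = queueIds.size() % consumers.size();  // queues left over after even division
        int count = queueIds.size() / consumers.size() + (index < mod ? 1 : 0);
        int start = index * (queueIds.size() / consumers.size()) + Math.min(index, mod);
        List<Integer> mine = new ArrayList<>();
        for (int i = start; i < start + count; i++) {
            mine.add(queueIds.get(i));
        }
        return mine;
    }

    public static void main(String[] args) {
        // 8 queues across 3 consumers -> slices of sizes 3, 3 and 2
        List<Integer> queues = List.of(0, 1, 2, 3, 4, 5, 6, 7);
        List<String> cids = List.of("c1", "c2", "c3");
        for (String c : cids) {
            System.out.println(c + " -> " + allocate(queues, cids, c)); // c1 -> [0, 1, 2], c2 -> [3, 4, 5], c3 -> [6, 7]
        }
    }
}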
How RocketMQ consumes in order, in one diagram
Source code walkthrough
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.3.1</version>
</dependency>
Sending messages in order
Approach: deliver related messages, in order, to the same queue.
Method to call:
org.apache.rocketmq.client.producer.DefaultMQProducer#send(
org.apache.rocketmq.common.message.Message, 
org.apache.rocketmq.client.producer.MessageQueueSelector, java.lang.Object)
// Ordered-send usage example
public static void main(String[] args) throws Exception {
    // Create the producer and set the producer group
    DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
    // Set the Name Server address
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    // Send a message
    Message message = new Message("test_topic", "test_tag", "Hello, Rocketmq!".getBytes());
    SendResult sendResult = producer.send(message, new SelectMessageQueueByHash(), "test_arg");
    System.out.println(sendResult);
    //...
    producer.shutdown();
}
This send method takes three parameters:
Message msg: the message to send.
MessageQueueSelector selector: the message-queue selector (an interface with a single select method that returns a message queue); you can implement your own or use one of the implementations shipped with the SDK.
Object arg: the argument passed into the selector's select method.
// org.apache.rocketmq.client.producer.DefaultMQProducer#send(Message, MessageQueueSelector, Object)
// Entry point for sending with a selector
@Override
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    msg.setTopic(withNamespace(msg.getTopic()));
    // Pick the target queue
    MessageQueue mq = this.defaultMQProducerImpl.invokeMessageQueueSelector(msg, selector, arg, this.getSendMsgTimeout());
    mq = queueWithNamespace(mq);
    // Send the message
    if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
        return sendByAccumulator(msg, mq, null);
    } else {
        return sendDirect(msg, mq, null);
    }
}

// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#invokeMessageQueueSelector
// Selector: choose the queue to send to
public MessageQueue invokeMessageQueueSelector(Message msg, MessageQueueSelector selector, Object arg,
    final long timeout) throws MQClientException, RemotingTooMuchRequestException {
    long beginStartTime = System.currentTimeMillis();
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);
    // Look up the topic route
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        try {
            // All queues of the topic
            List<MessageQueue> messageQueueList =
                mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());
            Message userMessage = MessageAccessor.cloneMessage(msg);
            String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());
            userMessage.setTopic(userTopic);
            // Pick the queue to send to
            mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));
        } catch (Throwable e) {
            throw new MQClientException("select message queue threw exception.", e);
        }
        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTime) {
            throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
        }
        if (mq != null) {
            // Return the selected queue
            return mq;
        } else {
            throw new MQClientException("select message queue return null.", null);
        }
    }
    validateNameServerSetting();
    throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}
MessageQueueSelector implementations shipped with the SDK
// org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash
// Selects a queue by hash modulo
public class SelectMessageQueueByHash implements MessageQueueSelector {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode() % mqs.size();
        if (value < 0) {
            value = Math.abs(value);
        }
        return mqs.get(value);
    }
}

// org.apache.rocketmq.client.producer.selector.SelectMessageQueueByRandom
// Selects a queue at random
public class SelectMessageQueueByRandom implements MessageQueueSelector {
    private Random random = new Random(System.currentTimeMillis());

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = random.nextInt(mqs.size());
        return mqs.get(value);
    }
}
So the key to ordered sending is the selector: it must route all related messages to the same queue.
As long as the number of message queues stays constant, the hash-modulo approach works, provided the arg passed at send time also stays the same for related messages. This is the approach typically used by default.
Also, if there is concurrency inside a producer, or several producer instances send at the same time, you must serialize the sends yourself, for example with a lock. A sketch of a custom selector keyed by a business ID follows.
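To make this concrete, here is a minimal sketch of a producer with a custom selector keyed by a business order ID. The topic, group name, and order ID are made up for illustration; Math.floorMod sidesteps the negative-hashCode case that SelectMessageQueueByHash handles with Math.abs.

import java.util.List;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

// Sketch: route all messages of one business order to one queue so they stay in send order.
public class OrderIdProducerSketch {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        MessageQueueSelector byOrderId = new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                // Same orderId -> same index -> same queue, as long as mqs.size() is stable
                int index = Math.floorMod(arg.hashCode(), mqs.size());
                return mqs.get(index);
            }
        };

        String orderId = "order-42"; // hypothetical business key
        for (String step : new String[] {"created", "paid", "shipped"}) {
            Message msg = new Message("test_topic", "test_tag", (orderId + ":" + step).getBytes());
            SendResult result = producer.send(msg, byOrderId, orderId);
            System.out.println(step + " -> " + result.getMessageQueue());
        }
        producer.shutdown();
    }
}

Any producer instance running this selector with the same arg against an unchanged queue count picks the same queue, which is what keeps the three steps in order.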
Consuming messages in order
Ordered consumption rests on two guarantees:
- Under clustering consumption, only one consumer in the group is pulling and consuming a given queue at any time.
- Within that consumer, only one thread is consuming the queue at a time.
// Ordered-consumption usage example (client side)
public static void main(String[] args) throws Exception {
    // Create the consumer and set the consumer group and message model
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_name");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.setMessageModel(MessageModel.CLUSTERING);
    consumer.subscribe("test_topic", "test_tag");
    // Register an orderly message listener
    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            for (MessageExt msg : msgs) {
                try {
                    System.out.println(new String(msg.getBody()));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start();
}
Before pulling, the client asks the Broker to lock the queues, and renews the lock on a 20-second schedule:
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#start
// Pulling: the client asks the Broker to lock its queues
@Override
public void start() {
    // In clustering mode, a timer asks the Broker for the lock every 20s
    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    ConsumeMessageOrderlyService.this.lockMQPeriodically();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
                }
            }
            // REBALANCE_LOCK_INTERVAL defaults to 20000 ms and is configurable
        }, 1000, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
    }
}

// org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#lockMQPeriodically
public synchronized void lockMQPeriodically() {
    if (!this.stopped) {
        this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
    }
}

// org.apache.rocketmq.client.impl.consumer.RebalanceImpl#lockAll
public void lockAll() {
    HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
    Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, Set<MessageQueue>> entry = it.next();
        final String brokerName = entry.getKey();
        final Set<MessageQueue> mqs = entry.getValue();
        if (mqs.isEmpty()) {
            continue;
        }
        // Find the Broker
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
        if (findBrokerResult != null) {
            // Lock the messageQueues on the Broker side, keyed by consumer group + clientId
            LockBatchRequestBody requestBody = new LockBatchRequestBody();
            requestBody.setConsumerGroup(this.consumerGroup);
            requestBody.setClientId(this.mQClientFactory.getClientId());
            requestBody.setMqSet(mqs);
            try {
                // Try to lock. The 1000 here is the request timeout, not the lock TTL;
                // the lock expiry is maintained on the Broker side
                Set<MessageQueue> lockOKMQSet =
                    this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
                for (MessageQueue mq : mqs) {
                    ProcessQueue processQueue = this.processQueueTable.get(mq);
                    if (processQueue != null) {
                        if (lockOKMQSet.contains(mq)) {
                            if (!processQueue.isLocked()) {
                                log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
                            }
                            // Lock succeeded
                            processQueue.setLocked(true);
                            processQueue.setLastLockTimestamp(System.currentTimeMillis());
                        } else {
                            // Lock failed
                            processQueue.setLocked(false);
                            log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);
                        }
                    }
                }
            } catch (Exception e) {
                log.error("lockBatchMQ exception, " + mqs, e);
            }
        }
    }
}
When pulling, the client checks whether the Broker-side lock was acquired.
A successful pull triggers the consume callback.
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
// Pulling: runs after the client has asked the Broker for the lock
public void pullMessage(final PullRequest pullRequest) {
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    // ... omitted
    if (!this.consumeOrderly) {
        // ... omitted
    } else {
        // processQueue.locked, set during locking above, tells us whether the Broker-side lock succeeded
        if (processQueue.isLocked()) {
            if (!pullRequest.isPreviouslyLocked()) {
                long offset = -1L;
                try {
                    offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
                    if (offset < 0) {
                        throw new MQClientException(ResponseCode.SYSTEM_ERROR, "Unexpected offset " + offset);
                    }
                } catch (Exception e) {
                    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                    log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
                    return;
                }
                boolean brokerBusy = offset < pullRequest.getNextOffset();
                log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                    pullRequest, offset, brokerBusy);
                if (brokerBusy) {
                    log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                        pullRequest, offset);
                }
                // Mark the lock as acquired and record the consume offset
                pullRequest.setPreviouslyLocked(true);
                pullRequest.setNextOffset(offset);
            }
        } else {
            // Lock not acquired: retry later
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            log.info("pull message later because not locked in broker, {}", pullRequest);
            return;
        }
    }
    // ... omitted
    // Callback invoked when a pull succeeds
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                    subscriptionData);
                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        // ... omitted
                        // Hand the messages over to the consume service
                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                            pullResult.getMsgFoundList(),
                            processQueue,
                            pullRequest.getMessageQueue(),
                            dispatchToConsume);
                        // ... omitted
                }
                // ... omitted
            }
        }
    };
    // Kick off the pull; the callback above drives consumption
    try {
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            // pull offset
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            this.defaultMQPushConsumer.getPullBatchSizeInBytes(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            // consume callback
            pullCallback);
    } catch (Exception e) {
        log.error("pullKernelImpl exception", e);
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    }
}
The Broker-side lock expires after 60s by default, and when replica nodes exist it is upgraded to a distributed lock: a queue counts as locked only when a majority of replicas grant it (with three replicas, the quorum is 3 / 2 + 1 = 2).
This code is not in the client SDK; it lives in the Broker source:
https://github.com/apache/rocketmq/tree/develop/broker/src/main/java/org/apache/rocketmq/broker
// org.apache.rocketmq.broker.processor.AdminBrokerProcessor#lockBatchMQ
// Pulling: Broker-side locking
private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class);
    Set<MessageQueue> lockOKMQSet = new HashSet<>();
    // Lock the local queues first
    Set<MessageQueue> selfLockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(
        requestBody.getConsumerGroup(),
        requestBody.getMqSet(),
        requestBody.getClientId());
    if (requestBody.isOnlyThisBroker() || !brokerController.getBrokerConfig().isLockInStrictMode()) {
        lockOKMQSet = selfLockOKMQSet;
    } else {
        // If not only this Broker, continue with the distributed-lock logic
        requestBody.setOnlyThisBroker(true);
        int replicaSize = this.brokerController.getMessageStoreConfig().getTotalReplicas();
        // Quorum size
        int quorum = replicaSize / 2 + 1;
        if (quorum <= 1) {
            // No other replica nodes: no distributed lock needed
            lockOKMQSet = selfLockOKMQSet;
        } else {
            // Count the locks held for each locally locked queue
            final ConcurrentMap<MessageQueue, Integer> mqLockMap = new ConcurrentHashMap<>();
            for (MessageQueue mq : selfLockOKMQSet) {
                if (!mqLockMap.containsKey(mq)) {
                    mqLockMap.put(mq, 0);
                }
                mqLockMap.put(mq, mqLockMap.get(mq) + 1);
            }
            BrokerMemberGroup memberGroup = this.brokerController.getBrokerMemberGroup();
            if (memberGroup != null) {
                // Walk through the other nodes and ask them to take the same local locks
                Map<Long, String> addrMap = new HashMap<>(memberGroup.getBrokerAddrs());
                addrMap.remove(this.brokerController.getBrokerConfig().getBrokerId());
                final CountDownLatch countDownLatch = new CountDownLatch(addrMap.size());
                requestBody.setMqSet(selfLockOKMQSet);
                requestBody.setOnlyThisBroker(true);
                for (Long brokerId : addrMap.keySet()) {
                    try {
                        this.brokerController.getBrokerOuterAPI().lockBatchMQAsync(addrMap.get(brokerId),
                            requestBody, 1000, new LockCallback() {
                                @Override
                                public void onSuccess(Set<MessageQueue> lockOKMQSet) {
                                    for (MessageQueue mq : lockOKMQSet) {
                                        if (!mqLockMap.containsKey(mq)) {
                                            mqLockMap.put(mq, 0);
                                        }
                                        // Count a successful lock
                                        mqLockMap.put(mq, mqLockMap.get(mq) + 1);
                                    }
                                    countDownLatch.countDown();
                                }

                                @Override
                                public void onException(Throwable e) {
                                    LOGGER.warn("lockBatchMQAsync on {} failed, {}", addrMap.get(brokerId), e);
                                    countDownLatch.countDown();
                                }
                            });
                    } catch (Exception e) {
                        LOGGER.warn("lockBatchMQAsync on {} failed, {}", addrMap.get(brokerId), e);
                        countDownLatch.countDown();
                    }
                }
                try {
                    countDownLatch.await(2000, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    LOGGER.warn("lockBatchMQ exception on {}, {}", this.brokerController.getBrokerConfig().getBrokerName(), e);
                }
            }
            // A queue counts as locked only when at least a quorum (majority) of replicas locked it
            for (MessageQueue mq : mqLockMap.keySet()) {
                if (mqLockMap.get(mq) >= quorum) {
                    lockOKMQSet.add(mq);
                }
            }
        }
    }
    LockBatchResponseBody responseBody = new LockBatchResponseBody();
    responseBody.setLockOKMQSet(lockOKMQSet);
    response.setBody(responseBody.encode());
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

// org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager
public class RebalanceLockManager {
    // Broker-side lock TTL, 60s by default
    private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
        "rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
    private final Lock lock = new ReentrantLock();
    // One group of queue locks per consumer group
    private final ConcurrentMap<String, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
        new ConcurrentHashMap<>(1024);

    // ... omitted

    static class LockEntry {
        private String clientId;
        private volatile long lastUpdateTimestamp = System.currentTimeMillis();

        // Does this client hold the lock?
        public boolean isLocked(final String clientId) {
            boolean eq = this.clientId.equals(clientId);
            return eq && !this.isExpired();
        }

        // Has the lock expired?
        public boolean isExpired() {
            boolean expired =
                (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
            return expired;
        }
    }
}
When consuming, local locks must be taken on both the messageQueue and the processQueue, and only after the Broker-side lock has been obtained.
The messageQueue holds the metadata of the pulled messages.
The processQueue can be seen as the consumption snapshot of the MessageQueue: local operations mostly mutate its data, and messageQueue and processQueue correspond one to one.
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest#run
// Consuming: take the local locks before invoking the business listener
@Override
public void run() {
    if (this.processQueue.isDropped()) {
        log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
        return;
    }
    // Thread lock on the messageQueue
    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
    synchronized (objLock) {
        // Proceed only in broadcasting mode, or when the Broker-side lock is held and not expired
        if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
            || this.processQueue.isLocked() && !this.processQueue.isLockExpired()) {
            final long beginTime = System.currentTimeMillis();
            for (boolean continueConsume = true; continueConsume; ) {
                if (this.processQueue.isDropped()) {
                    log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                    break;
                }
                // Clustering mode, Broker-side lock not held
                if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    && !this.processQueue.isLocked()) {
                    log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                    break;
                }
                // Clustering mode, Broker-side lock expired
                if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    && this.processQueue.isLockExpired()) {
                    log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                    break;
                }
                // Yield and retry after consuming continuously for too long
                long interval = System.currentTimeMillis() - beginTime;
                if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                    ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                    break;
                }
                final int consumeBatchSize =
                    ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
                List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
                defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
                if (!msgs.isEmpty()) {
                    final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
                    ConsumeOrderlyStatus status = null;
                    // pre-consume hooks ... omitted
                    // Actual consumption
                    long beginTimestamp = System.currentTimeMillis();
                    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                    boolean hasException = false;
                    try {
                        // Thread lock on the processQueue
                        this.processQueue.getConsumeLock().readLock().lock();
                        if (this.processQueue.isDropped()) {
                            log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                this.messageQueue);
                            break;
                        }
                        // Invoke the business listener
                        status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                    } catch (Throwable e) {
                        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                            UtilAll.exceptionSimpleDesc(e),
                            ConsumeMessageOrderlyService.this.consumerGroup,
                            msgs,
                            messageQueue, e);
                        hasException = true;
                    } finally {
                        // Release the processQueue thread lock
                        this.processQueue.getConsumeLock().readLock().unlock();
                    }
                    // handle the consume result ... omitted
                    // post-consume hooks ... omitted
                    ConsumeMessageOrderlyService.this.getConsumerStatsManager().incConsumeRT(
                        ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
                    continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                } else {
                    continueConsume = false;
                }
            }
        } else {
            // Failed to take the messageQueue thread lock; retry later
            if (this.processQueue.isDropped()) {
                log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }
            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
        }
    }
}
Why lock both the messageQueue and the processQueue here?
- The local lock on the messageQueue guarantees that only one thread consumes the queue at a time.
- The local lock on the processQueue prevents a rebalance from removing the queue, while it is still being consumed, in order to reassign it to another consumer.
Using two separate locks also lets the post-consume work (handling the consume result, firing the post-consume hooks, and so on) proceed without blocking the Rebalance.
// org.apache.rocketmq.client.impl.consumer.RebalancePushImpl#tryRemoveOrderMessageQueue
// Rebalance: remove a queue from this consumer
private boolean tryRemoveOrderMessageQueue(final MessageQueue mq, final ProcessQueue pq) {
    try {
        boolean forceUnlock = pq.isDropped() && System.currentTimeMillis() > pq.getLastLockTimestamp() + UNLOCK_DELAY_TIME_MILLS;
        // The processQueue local lock must be taken before removing the queue
        if (forceUnlock || pq.getConsumeLock().writeLock().tryLock(500, TimeUnit.MILLISECONDS)) {
            try {
                RebalancePushImpl.this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
                RebalancePushImpl.this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
                pq.setLocked(false);
                RebalancePushImpl.this.unlock(mq, true);
                return true;
            } finally {
                if (!forceUnlock) {
                    pq.getConsumeLock().writeLock().unlock();
                }
            }
        } else {
            pq.incTryUnlockTimes();
        }
    } catch (Exception e) {
        pq.incTryUnlockTimes();
    }
    return false;
}
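Putting the two sides next to each other, the pattern is: consumption holds the read lock only around the listener call, and rebalance needs the write lock before it may drop the queue. Here is a standalone sketch of that pattern (not RocketMQ code, just the lock discipline in isolation):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of the consumeLock pattern: rebalance is blocked while a batch is
// being consumed, but not by the post-processing that follows.
public class ConsumeLockSketch {
    private final ReentrantReadWriteLock consumeLock = new ReentrantReadWriteLock();

    void consumeBatch(Runnable businessListener) {
        consumeLock.readLock().lock();
        try {
            businessListener.run();      // the only part rebalance must not interrupt
        } finally {
            consumeLock.readLock().unlock();
        }
        // result handling / hooks run outside the lock and never block rebalance
    }

    boolean tryRemoveQueue() throws InterruptedException {
        // Rebalance needs exclusive access before dropping the queue
        if (consumeLock.writeLock().tryLock(500, TimeUnit.MILLISECONDS)) {
            try {
                // persist the offset, mark the queue dropped ...
                return true;
            } finally {
                consumeLock.writeLock().unlock();
            }
        }
        return false; // a batch is still consuming; retry later
    }
}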
A question to think about
Before turning to ordered consumption in DDMQ, consider a problem:
Take a ride-hailing flow (request a ride -> pick up -> pay) as an example. Events of the same passenger must stay in order, while different passengers' flows should not affect each other.
For instance, if passengers A and B hail rides at the same time, A's pickup must come after A's request, but B's pickup should not be held up just because A's pickup is blocked.
How would you solve this with RocketMQ's native ordered consumption? One workable approach, and its limitation, is sketched below.
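With native RocketMQ, the natural answer is to shard by passenger: use a selector so that every event of one passenger lands in one queue. A hedged sketch follows (topic, group, and passenger IDs are invented for illustration); the comment marks the residual problem that motivates DDMQ.

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

// Sketch: partition by passenger so each passenger's events are ordered within one queue.
public class RideHailingOrderingSketch {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ride_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        String[][] events = {
            {"passenger-A", "request"}, {"passenger-B", "request"},
            {"passenger-A", "pickup"},  {"passenger-B", "pickup"},
            {"passenger-A", "pay"},     {"passenger-B", "pay"},
        };
        for (String[] e : events) {
            Message msg = new Message("ride_topic", e[0], (e[0] + ":" + e[1]).getBytes());
            // Same passenger -> same queue; different passengers usually land on
            // different queues. But two passengers whose keys hash to the SAME
            // queue still block each other -- the limitation DDMQ's per-key
            // ordering removes.
            producer.send(msg, (mqs, m, arg) -> mqs.get(Math.floorMod(arg.hashCode(), mqs.size())), e[0]);
        }
        producer.shutdown();
    }
}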
Ordered consumption in DDMQ
https://github.com/didi/DDMQ
How DDMQ consumes in order, in one diagram
Sequence diagram
Source code walkthrough
Client side: pulling and consuming
After each sleep interval (5s by default), the client issues a PULL request to the proxy. If messages come back, it invokes the business consumption; once that finishes, the consume results are recorded locally.
Each PULL request, besides fetching from the proxy, carries the locally recorded results of previously consumed messages as a parameter to sync them to the proxy, after which the local records are cleared.
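Before diving into the client internals, here is a minimal sketch of what the business side plugs into this loop. It matches the MessageProcessor interface used by the pull loop quoted below; the thrift package locations of Message and Context are assumed from the method signatures in the quoted code, and the exact body accessors on Message are not asserted here.

import com.xiaojukeji.carrera.consumer.thrift.Context;
import com.xiaojukeji.carrera.consumer.thrift.Message;
import com.xiaojukeji.carrera.consumer.thrift.client.MessageProcessor;

// Sketch of a business processor: SUCCESS is recorded locally and piggybacked
// onto the next PULL; FAIL puts the offset into failOffsets for retry.
public class LoggingProcessor implements MessageProcessor {
    @Override
    public MessageProcessor.Result process(Message msg, Context context) {
        try {
            // Business logic goes here; the quoted doProcessMessage exposes
            // context.getGroupId()/getTopic()/getQid() and msg.getOffset()
            System.out.printf("group=%s topic=%s qid=%s offset=%d msg=%s%n",
                context.getGroupId(), context.getTopic(), context.getQid(), msg.getOffset(), msg);
            return MessageProcessor.Result.SUCCESS;
        } catch (Exception e) {
            return MessageProcessor.Result.FAIL;
        }
    }
}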
// com.xiaojukeji.carrera.consumer.thrift.client.BaseCarreraConsumerPool#startConsume
// DDMQ consumption entry point (client side)
private void startConsume(BaseMessageProcessor processor, int concurrency, Map<String, Integer> extraConcurrency) {
    // Total thread count: concurrency is the thread count set by the business;
    // servers.size (serverCnt below) is the number of proxy nodes
    int totalThreads = concurrency > 0 ? Math.max(concurrency, servers.size()) : 0;
    for (Integer topicConcurrency : extraConcurrency.values()) {
        totalThreads += topicConcurrency > 0 ? Math.max(topicConcurrency, servers.size()) : 0;
    }
    if (totalThreads == 0) {
        throw new RuntimeException("concurrency is too small, at least one for each server.");
    }
    // Thread pool
    executorService = Executors.newFixedThreadPool(totalThreads, new ThreadFactoryBuilder()
        .setNameFormat("MessageProcess-%d").build());
    Collections.shuffle(servers);
    int serverCnt = servers.size();
    if (concurrency > 0) {
        if (concurrency < serverCnt) {
            LOGGER.warn("concurrency({}) < serverCnt({})", concurrency, serverCnt);
            concurrency = serverCnt;
        }
        // Start consumption for every proxy node; if the configured thread count exceeds
        // the node count, several threads consume the same proxy node concurrently
        for (int i = 0; i < serverCnt; i++) {
            int threadNumber = concurrency / serverCnt;
            threadNumber += i < concurrency % serverCnt ? 1 : 0;
            if (threadNumber == 0) {
                LOGGER.warn("no thread for server:{}", servers.get(i));
            } else {
                createConsumer(processor, threadNumber, servers.get(i), null);
            }
        }
    }
    // ... omitted
}

// com.xiaojukeji.carrera.consumer.thrift.client.BaseCarreraConsumerPool#createConsumer(BaseMessageProcessor, int, Node, String)
// Create consumers for one proxy node
private void createConsumer(final BaseMessageProcessor processor, int consumerNumber,
                            Node server, String topic) {
    for (int i = 0; i < consumerNumber; i++) {
        CarreraConfig newConfig = config.clone();
        newConfig.setServers(server.toStrStyle());
        final BaseCarreraConsumer consumer = createConsumer(newConfig, topic);
        consumer.setType(getConsumeType());
        if (!consumerMap.containsKey(server)) {
            consumerMap.put(server, new ConcurrentLinkedQueue<>());
        }
        consumerMap.get(server).add(consumer);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // Start consuming
                    consumer.startConsume(processor);
                } catch (InterruptedException e) {
                    if (consumer.isStopped()) {
                        LOGGER.info("consumer finished!");
                    } else {
                        LOGGER.error("consumer is interrupted!", e);
                    }
                }
            }
        });
    }
}

// com.xiaojukeji.carrera.consumer.thrift.client.BaseCarreraConsumer#startConsume
// Pull-and-consume loop
public void startConsume(BaseMessageProcessor processor) throws InterruptedException {
    // init();
    isRunning = true;
    LOGGER.info("start consume group:{},server:{},topic:{}", config.getGroupId(), config.getServers(), topic);
    try {
        while (isRunning) {
            // Pull once per interval; consume when there are new messages, otherwise sleep a while.
            // Note: neither the pull nor the consume takes a local lock here.
            RES response = pullMessage();
            if (response == null) { // no new message
                doRetrySleep();
            } else {
                processResponse(response, processor);
            }
        }
    } finally {
        close();
    }
    LOGGER.info("consume group[{}] finished!", config.getGroupId());
}

// ----------------------- Pulling messages
// com.xiaojukeji.carrera.consumer.thrift.client.SimpleCarreraConsumer#doPullMessage
protected PullResponse doPullMessage() throws TException {
    // Chain the locally recorded, consumed-but-not-yet-submitted results
    // into a linked list and attach them to the pull request
    ConsumeResult curResult = buildConsumeResult();
    try {
        request.setResult(curResult);
        // Issue the PULL request to the proxy
        PullResponse response = client.pull(request);
        // Then clear the local consumed-but-not-yet-submitted records
        clearResult(curResult);
        LOGGER.debug("Client Request for {}:{}, Response:{}", host, request, response);
        if (response == null || response.getMessages() == null || response.getMessages().size() == 0) {
            LOGGER.debug("retry in {}ms, response={}", config.getRetryInterval(), response);
            return null;
        } else {
            return response;
        }
    } catch (PullException e) {
        LOGGER.error("pull exception, code={}, message={}", e.code, e.message);
    }
    return null;
}

// com.xiaojukeji.carrera.consumer.thrift.client.SimpleCarreraConsumer#buildConsumeResult
// Chain previously consumed but not-yet-submitted results into a linked list
private ConsumeResult buildConsumeResult() {
    ConsumeResult ret = null;
    if (resultMap == null) {
        return ret;
    }
    for (ConsumeResult r : resultMap.values()) {
        if (r.getFailOffsetsSize() > 0 || r.getSuccessOffsetsSize() > 0) {
            r.nextResult = ret;
            ret = r;
        }
    }
    return ret;
}

// com.xiaojukeji.carrera.consumer.thrift.client.SimpleCarreraConsumer#clearResult
// Clear the submitted records
private void clearResult(ConsumeResult result) {
    for (ConsumeResult r = result; r != null; r = r.nextResult) {
        r.getFailOffsets().clear();
        r.getSuccessOffsets().clear();
    }
}
// -----------------------

// ----------------------- Consuming messages
// com.xiaojukeji.carrera.consumer.thrift.client.SimpleCarreraConsumer#doProcessMessage(PullResponse, MessageProcessor)
protected void doProcessMessage(PullResponse response, MessageProcessor processor) {
    Context context = response.getContext();
    // Iterate over the messages pulled from the proxy
    for (Message msg : response.getMessages()) {
        MessageProcessor.Result pResult = MessageProcessor.Result.FAIL;
        try {
            // Invoke the business consumption
            pResult = processor.process(msg, context);
            LOGGER.debug("ProcessResult:{},msg.key={},group={},topic={},qid={},offset={}", pResult,
                msg.getKey(), context.getGroupId(), context.getTopic(), context.getQid(), msg.getOffset());
        } catch (Throwable e) {
            LOGGER.error("exception when processing message, msg=" + msg + ",context=" + context, e);
        }
        // Record the consume result
        switch (pResult) {
            case SUCCESS:
                ack(context, msg.getOffset());
                break;
            case FAIL:
                fail(context, msg.getOffset());
                break;
        }
    }
}

// com.xiaojukeji.carrera.consumer.thrift.client.SimpleCarreraConsumer#ack
// Record a success locally only; the proxy is not acknowledged immediately
public synchronized void ack(Context context, long offset) {
    getResult(context).getSuccessOffsets().add(offset);
}

// com.xiaojukeji.carrera.consumer.thrift.client.SimpleCarreraConsumer#fail
// Record a failure locally only; the proxy is not acknowledged immediately
public synchronized void fail(Context context, long offset) {
    getResult(context).getFailOffsets().add(offset);
}
// -----------------------
Proxy side: consuming
Between the proxy and the Broker, DDMQ uses RocketMQ's native ordered consumption; toward its clients, the proxy guarantees ordered consumption by means of queues and thread locks.
// com.xiaojukeji.carrera.cproxy.consumer.AbstractCarreraRocketMqConsumer#setup
// DDMQ ordered consumption: the proxy consumes from the Broker (not the business consume)
private void setup(DefaultMQPushConsumer rmqConsumer, RocketMQBaseConfig config, GroupConfig groupConfig) {
    // ... omitted
    // The proxy also uses RocketMQ's orderly consumption: MessageListenerOrderly
    rmqConsumer.setMessageListener((MessageListenerOrderly) (msgs, context) -> {
        if (autoCommit) {
            context.setAutoCommit(false);
        }
        // "Consume" here means the proxy consuming from the Broker, not the client's business consume
        return consumeRocketMQMessages(msgs, context.getMessageQueue());
    });
    // ... omitted
}

// com.xiaojukeji.carrera.cproxy.consumer.CarreraConsumer#process
public void process(CommonMessage message, ConsumeContext context, ResultCallBack resultCallBack) {
    if (upstreamTopicMap.containsKey(message.getTopic())) {
        // Create an UpstreamJob: it is both the message and the task
        UpstreamJob job = new UpstreamJob(this, upstreamTopicMap.get(message.getTopic()), message, context, resultCallBack);
        workingJobs.add(job);
        job.registerJobFinishedCallback(this::onJobFinish);
        // Execute the consumption
        job.execute();
    } else {
        resultCallBack.setResult(true);
    }
}

// com.xiaojukeji.carrera.cproxy.consumer.UpstreamJob#execute
public void execute() {
    // ... omitted
    // Walk through the configured Actions
    if (actionIndex == CollectionUtils.size(getActions())) {
        onFinished(true);
        return;
    }
    String actionName = getActions().get(actionIndex++);
    state = actionName;
    if (LOGGER.isTraceEnabled()) {
        LOGGER.trace("job executing... {} actionIndex={}, act={}, thread={}", info(), actionIndex - 1, actionName, Thread.currentThread());
    }
    Action action = actionMap.get(actionName);
    if (action == null) {
        LOGGER.error("wrong act: {}", actionName);
        onFinished(false);
        return;
    }
    if (isTerminated()) {
        LOGGER.info("job is terminated! job={}", info());
        terminate();
        return;
    }
    Action.Status status;
    try {
        // Run the action
        status = action.act(this);
    } catch (Throwable e) {
        LOGGER.error("unexpected err, job=" + info(), e);
        onFinished(false);
        return;
    }
    switch (status) {
        case FAIL:
            LOGGER.error("execute error,job={}", info());
            onFinished(false);
            break;
        case FINISH:
            // The finish/fail callback fires here
            onFinished(true);
            break;
        case ASYNCHRONIZED:
            // Asynchronous: return immediately
            break;
        case CONTINUE:
            // Not finished yet: recurse into the remaining Actions
            execute();
    }
}

// ---------------------------- OrderAction
// com.xiaojukeji.carrera.cproxy.actions.OrderAction#act
public Action.Status act(UpstreamJob job) {
    String orderKey = job.getUpstreamTopic().getOrderKey();
    if (StringUtils.isNotBlank(orderKey)) {
        Object orderValue = null;
        // DDMQ's three ordering keys: QID, MsgKey, JsonPath
        if (ORDER_BY_QID.equals(orderKey)) {
            orderValue = job.getTopic() + job.getQid();
        } else if (ORDER_BY_KEY.equals(orderKey)) {
            orderValue = job.getMsgKey();
        } else if (job.getData() instanceof JSONObject) {
            try {
                orderValue = JsonUtils.getValueByPath((JSONObject) job.getData(), orderKey);
            } catch (Exception e) {
                LogUtils.logErrorInfo("Order_error",
                    String.format("Get orderKey Exception! orderKey=%s, job=%s", orderKey, job.info()), e);
            }
        }
        return async(job, orderValue);
    } else {
        return async(job, null);
    }
}

// com.xiaojukeji.carrera.cproxy.actions.OrderAction#async
private Action.Status async(UpstreamJob job, Object orderValue) {
    // Record the ordering key
    if (orderValue != null) {
        job.setOrderId(orderValue.hashCode());
    }
    try {
        // Submit the job
        executor.submit(job);
        return Status.ASYNCHRONIZED;
    } catch (InterruptedException ignored) {
        Thread.currentThread().interrupt();
        return Status.FAIL;
    }
}

// com.xiaojukeji.carrera.cproxy.actions.util.UpstreamJobExecutorPool#submit
public void submit(UpstreamJob job) throws InterruptedException {
    // Register the finish callback
    job.registerJobFinishedCallback(this::onJobFinished);
    // Submit the job
    queue.submit(job);
    if (!useBackgroundThread) {
        queue.processNextMessage();
    }
}

// com.xiaojukeji.carrera.cproxy.actions.util.UpstreamJobBlockingQueue#submit
public void submit(UpstreamJob job) throws InterruptedException {
    job.setState("Async.InMainQueue");
    jobSize.incrementAndGet();
    // The job goes into the mainQueue of UpstreamJobBlockingQueue
    mainQueue.add(job);
    // readyJobs is a Semaphore that wakes up processing of the mainQueue
    readyJobs.release();
}
// ----------------------------
Proxy side: processing jobs
// com.xiaojukeji.carrera.cproxy.actions.util.UpstreamJobExecutorPool.WorkerThread#run
// DDMQ ordered consumption: proxy-side job processing
public void run() {
    LOGGER.info("Thread {} started.", getName());
    while (running) {
        // Worker threads keep polling jobs out of the UpstreamJobBlockingQueue
        UpstreamJob job;
        try {
            // queue is the UpstreamJobBlockingQueue
            job = queue.poll();
        } catch (InterruptedException e) {
            LOGGER.info("worker thread {} is interrupted", getName());
            break;
        }
        assert job != null;
        activeThreadNumber.incrementAndGet();
        job.setWorkerId(workerId);
        try {
            // Process the job, i.e. com.xiaojukeji.carrera.cproxy.consumer.UpstreamJob#execute
            // above: continue walking through the Actions
            job.execute();
        } catch (Exception e) {
            LogUtils.logErrorInfo("worker_running_error", "worker running error", e);
        }
        activeThreadNumber.decrementAndGet();
    }
    LOGGER.info("Thread {} finished. job after shutdown, group={}, queue.info={}",
        getName(), group, queue.info());
}

// com.xiaojukeji.carrera.cproxy.actions.util.UpstreamJobBlockingQueue#poll
// Take the next job pulled from the Broker
public UpstreamJob poll() throws InterruptedException {
    while (true) {
        // Wait until woken up
        readyJobs.acquire();
        // Fetch a job
        UpstreamJob job = fetchJob();
        if (job != null) {
            return job;
        }
    }
}

// com.xiaojukeji.carrera.cproxy.actions.util.UpstreamJobBlockingQueue#fetchJob
// Processing of the mainQueue
private UpstreamJob fetchJob() {
    // The reActivationQueue has priority: the finish callback puts the next
    // message of an ordered chain into it
    UpstreamJob job = reActivationQueue.poll();
    if (job != null) {
        putInWorkingQueue(job);
        return job;
    }
    ReentrantLock orderLock;
    Integer orderId;
    // Lock the mainQueue
    mainQueueLock.lock();
    try {
        // Take a job from the mainQueue
        job = mainQueue.poll();
        // Read its ordering key
        orderId = job.getOrderId();
        if (orderId == null) {
            // No ordering key: ordinary consumption, return directly; the caller
            // then runs com.xiaojukeji.carrera.cproxy.consumer.UpstreamJob#execute
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("job is out of mainQueue: job={}, no orderId", job.info());
            }
            // The workingQueue is only used for logging and can be ignored
            putInWorkingQueue(job);
            return job;
        }
        // With an ordering key, take the order lock first
        orderLock = getLocks(orderId);
        orderLock.lock();
    } finally {
        // Release the mainQueue lock.
        // Since the order lock is already held, the mainQueue can be released here,
        // so that other threads are not blocked from processing the mainQueue.
        // This is exactly what makes the earlier two-passenger example work.
        mainQueueLock.unlock();
    }
    try {
        // Look up the waiting list for this key (dependentJob has a `next` field
        // forming a linked list that encodes the order); what is returned is the tail
        UpstreamJob dependentJob = jobOrderMap.putIfAbsent(orderId, job);
        if (dependentJob == null) {
            // No entry: no earlier message with this key exists (or all were consumed);
            // return directly
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("job is out of mainQueue: job={}, no dependent job, orderId={}", job.info(), orderId);
            }
            putInWorkingQueue(job);
            return job;
        }
        // The crucial step: chain the job in order and make it the new tail
        assert dependentJob.getNext() == null;
        dependentJob.setNextIfNull(job);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("job is out of mainQueue: job={}, enter jobOrderMap, orderId={}, dependent job={}", job.info(), orderId, dependentJob.info());
        }
        jobOrderMap.put(orderId, job);
    } finally {
        // Release the order lock
        orderLock.unlock();
    }
    return null;
}
Proxy side: handling client pull requests
// com.xiaojukeji.carrera.cproxy.actions.PullServerBufferAction#act
// DDMQ client pulls: proxy side.
// This is one of the Actions that execute() above walks through
public Status act(UpstreamJob job) {
    job.setState("PullSvr.InBuffer");
    // Put the job into the PullBuffer
    buffer.offer(job);
    return Status.ASYNCHRONIZED;
}

// com.xiaojukeji.carrera.cproxy.actions.PullServerBufferAction#PullServerBufferAction
public PullServerBufferAction(ConsumerGroupConfig config) {
    this.config = config;
    // PullBuffer initialization
    buffer = ConsumerServiceImpl.getInstance().register(config);
}

// com.xiaojukeji.carrera.cproxy.server.ConsumerServiceImpl#register
public PullBuffer register(ConsumerGroupConfig config) {
    // PullBuffers live in a bufferMap keyed by consumer group
    PullBuffer buffer = bufferMap.computeIfAbsent(config.getGroup(), groupId -> {
        PullBuffer newBuffer = new PullBuffer(groupId, pullScheduler);
        pullScheduler.scheduleAtFixedRate(newBuffer::recoverTimeoutMessage, 2000, 100, TimeUnit.MILLISECONDS);
        pullScheduler.scheduleAtFixedRate(newBuffer::cleanWaitQueue, 2000, 5000, TimeUnit.MILLISECONDS);
        return newBuffer;
    });
    buffer.addClusterConfig(config);
    return buffer;
}

// com.xiaojukeji.carrera.cproxy.server.ConsumerServiceImpl#pull
// Proxy-side logic for the client PULL request shown in the client code at the top
public void pull(PullRequest request, AsyncMethodCallback resultHandler) {
    MetricUtils.incPullStatCount(request.getGroupId(), request.getTopic(), null, "request");
    String group = request.getGroupId();
    consumerManager.tryCreateConsumer(group);
    // A non-null result in the request means the client is syncing its consume progress
    if (request.getResult() != null) {
        doSubmit(request.getResult());
    }
    Context context = new Context();
    context.setGroupId(request.getGroupId());
    context.setTopic(request.getTopic());
    // Fetch messages from the bufferMap by consumer group; under broadcasting
    // consumption, the groupId is additionally suffixed with the concrete client instance
    PullBuffer buffer = bufferMap.get(request.getGroupId());
    if (buffer == null) {
        responsePull(request, resultHandler, context, Collections.emptyList());
        return;
    }
    List<Message> messages = buffer.pull(request); // assumption: the garbled original fetches the pending messages from the PullBuffer here
    if (CollectionUtils.isEmpty(messages)) {
        // ... omitted
    } else {
        // Return the messages to the client
        responsePull(request, resultHandler, context, messages);
    }
}

// com.xiaojukeji.carrera.cproxy.server.ConsumerServiceImpl#doSubmit
// Walk through the consume progress reported by the client
private boolean doSubmit(ConsumeResult consumeResult) {
    LOGGER.debug("submit={},client={}", consumeResult, getClientAddress());
    // Walk the linked list assembled by the client
    for (ConsumeResult r = consumeResult; r != null; r = r.nextResult) {
        PullBuffer buffer = bufferMap.get(consumeResult.getContext().getGroupId());
        if (buffer == null) continue;
        // Process one consume result
        buffer.processResult(r);
    }
    return true;
}

// com.xiaojukeji.carrera.cproxy.actions.util.UpstreamJobBuffer#processResult
// Process the client's consume results
public synchronized List<Long> processResult(ConsumeResult result) {
    List<Long> nonExistsOffset = null;
    // Successfully consumed offsets
    if (CollectionUtils.isNotEmpty(result.getSuccessOffsets())) {
        for (Long offset : result.getSuccessOffsets()) {
            UpstreamJob job = workingJobs.remove(offset);
            if (job != null) {
                MetricUtils.qpsAndFilterMetric(job, MetricUtils.ConsumeResult.SUCCESS);
                MetricUtils.pullAckLatencyMetric(job, TimeUtils.getElapseTime(job.getPullTimestamp()));
                // Fire the job-finished callback
                job.onFinished(true); // success
            } else {
                if (nonExistsOffset == null) {
                    nonExistsOffset = new ArrayList<>();
                }
                nonExistsOffset.add(offset);
            }
        }
    }
    // Failed offsets
    if (CollectionUtils.isNotEmpty(result.getFailOffsets())) {
        for (Long offset : result.getFailOffsets()) {
            UpstreamJob job = workingJobs.remove(offset);
            if (job != null) {
                MetricUtils.pullAckLatencyMetric(job, TimeUtils.getElapseTime(job.getPullTimestamp()));
                int delay = job.nextRetryDelay();
                if (delay >= 0) {
                    scheduler.schedule(() -> this.offer(job), delay, TimeUnit.MILLISECONDS);
                } else {
                    // dropJob also fires the job-finished callback
                    dropJob(job);
                }
            } else {
                if (nonExistsOffset == null) {
                    nonExistsOffset = new ArrayList<>();
                }
                nonExistsOffset.add(offset);
            }
        }
    }
    return nonExistsOffset == null ? Collections.emptyList() : nonExistsOffset;
}

// com.xiaojukeji.carrera.cproxy.actions.util.UpstreamJobBuffer#dropJob
private void dropJob(UpstreamJob job) {
    MetricUtils.qpsAndFilterMetric(job, MetricUtils.ConsumeResult.FAILURE);
    LOGGER.warn("drop Job:{},errRetry={},retryIdx={}", job, job.getErrorRetryCnt(), job.getRetryIdx());
    DROP_LOGGER.info("beyondErrorRetry,job={},errRetry={},retryIdx={}",
        job, job.getErrorRetryCnt(), job.getRetryIdx());
    // Fire the job-finished callback
    job.onFinished(true); // do not sendBack to rmq.
}
Proxy side: job-finished callback
When the client finishes consuming a message, this callback fires on the proxy and puts the next message of the ordered linked list into the queue.
// com.xiaojukeji.carrera.cproxy.actions.util.UpstreamJobBlockingQueue#onJobFinished
// DDMQ ordered consumption: job-finished callback
public void onJobFinished(UpstreamJob job) {
    if (job == null) {
        LogUtils.logErrorInfo("JobFinished_error", "onJobFinished error: job is null");
        return;
    }
    try {
        if (!removeWorkingQueue(job)) {
            LogUtils.logErrorInfo("remove_job_from_workingJobs", "remove job from workingJobs failed, job={}", job.info());
        }
        Integer orderId = job.getOrderId();
        if (orderId != null) {
            // Take the order lock for this ordering key
            ReentrantLock orderLock = getLocks(orderId);
            orderLock.lock();
            try {
                UpstreamJob nextJob = job.getNext();
                if (nextJob == null) {
                    // ... omitted
                } else {
                    if (LOGGER.isTraceEnabled()) {
                        LOGGER.trace("onJobFinished: job={}, orderId={}, size={}, has next:{}",
                            job.info(), orderId, getSize(), nextJob.info());
                    }
                    // Put the next message of the linked list into the reActivationQueue
                    nextJob.setState("Async.InReActivationQueue");
                    reActivationQueue.offer(nextJob);
                    // Wake up job processing (com.xiaojukeji.carrera.cproxy.actions.util.UpstreamJobBlockingQueue#fetchJob)
                    readyJobs.release();
                }
            } finally {
                orderLock.unlock();
            }
        } else {
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("onJobFinished: job={}, size={}, no orderId.", job.info(), getSize());
            }
        }
    } finally {
        // Update the consume progress counter
        jobSize.decrementAndGet();
    }
}
Flow chart recap
Another question to think about
Why does RocketMQ need a Broker-side distributed lock plus client-side local thread locks for ordered consumption, while DDMQ gets by with thread locks local to the proxy?
- No Broker-side distributed lock is needed: the proxy is a single node, unlike RocketMQ Brokers with master/slave replicas and read/write separation, so a thread lock on the proxy is enough.
- No client-side thread lock is needed: any messages a client can pull at the same time are guaranteed to have no ordering relation among them, so no lock is required there either.
RocketMQ vs. DDMQ ordered consumption
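Summing up the differences covered above:
- Ordering granularity: RocketMQ orders per message queue; DDMQ orders per order key (QID, MsgKey, or a JsonPath value).
- Broker-side lock: RocketMQ uses a distributed queue lock (60s TTL, quorum-based when replicas exist); DDMQ needs none, since each queue is consumed by a single proxy node.
- Consumer-side locks: RocketMQ takes a messageQueue thread lock plus a processQueue read/write lock; DDMQ takes only order locks local to the proxy, and the client needs no lock at all.
- Blocking behavior: in RocketMQ, a slow message blocks its whole queue; in DDMQ, only messages sharing the same order key wait on each other.
- Consumption model: RocketMQ clients use the push consumer (the SDK pulls internally); DDMQ clients poll the proxy, piggybacking consume results onto the next pull.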