序
本文主要研究一下artemis ClientConsumer的handleRegularMessage
handleRegularMessage
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
public final class ClientConsumerImpl implements ClientConsumerInternal {? ? //......? ? private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<>(ClientConsumerImpl.NUM_PRIORITIES);? ? private final Runner runner = new Runner();? ? private volatile MessageHandler handler;? ? //......? ? private void handleRegularMessage(ClientMessageInternal message) { ? ? if (message.getAddress() == null) { ? ? ? ? message.setAddress(queueInfo.getAddress()); ? ? }? ? ? message.onReceipt(this);? ? ? if (!ackIndividually && message.getPriority() != 4 && !message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) { ? ? ? ? // We have messages of different priorities so we need to ack them individually since the order ? ? ? ? // of them in the ServerConsumerImpl delivery list might not be the same as the order they are ? ? ? ? // consumed in, which means that acking all up to won't work ? ? ? ? ackIndividually = true; ? ? }? ? ? // Add it to the buffer ? ? buffer.addTail(message, message.getPriority());? ? ? if (handler != null) { ? ? ? ? // Execute using executor ? ? ? ? if (!stopped) { ? ? ? ? ? queueExecutor(); ? ? ? ? } ? ? } else { ? ? ? ? notify(); ? ? } ? }? ? private void queueExecutor() { ? ? if (logger.isTraceEnabled()) { ? ? ? ? logger.trace(this + "::Adding Runner on Executor for delivery"); ? ? }? ? ? sessionExecutor.execute(runner); ? }? ? //......}
- ClientConsumerImpl的handleRegularMessage方法先执行buffer.addTail(message, message.getPriority())将消息按优先级放入buffer,之后对于handler不为null且consumer未stopped的会执行queueExecutor(),handler为null的则执行notify();queueExecutor方法是通过sessionExecutor执行runner
Runner
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
public final class ClientConsumerImpl implements ClientConsumerInternal {? ? //......? ? private class Runner implements Runnable {? ? ? @Override ? ? public void run() { ? ? ? ? try { ? ? ? ? ? callOnMessage(); ? ? ? ? } catch (Exception e) { ? ? ? ? ? ActiveMQClientLogger.LOGGER.onMessageError(e);? ? ? ? ? ? lastException = e; ? ? ? ? } ? ? } ? }? ? private void callOnMessage() throws Exception { ? ? if (closing || stopped) { ? ? ? ? return; ? ? }? ? ? session.workDone();? ? ? // We pull the message from the buffer from inside the Runnable so we can ensure priority ? ? // ordering. If we just added a Runnable with the message to the executor immediately as we get it ? ? // we could not do that? ? ? ClientMessageInternal message;? ? ? // Must store handler in local variable since might get set to null ? ? // otherwise while this is executing and give NPE when calling onMessage ? ? MessageHandler theHandler = handler;? ? ? if (theHandler != null) { ? ? ? ? if (rateLimiter != null) { ? ? ? ? ? rateLimiter.limit(); ? ? ? ? }? ? ? ? ? failedOver = false;? ? ? ? ? synchronized (this) { ? ? ? ? ? message = buffer.poll(); ? ? ? ? }? ? ? ? ? if (message != null) { ? ? ? ? ? if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) { ? ? ? ? ? ? ? //Ignore, this could be a relic from a previous receiveImmediate(); ? ? ? ? ? ? ? return; ? ? ? ? ? }? ? ? ? ? ? boolean expired = message.isExpired();? ? ? ? ? ? flowControlBeforeConsumption(message);? ? ? ? ? ? if (!expired) { ? ? ? ? ? ? ? if (logger.isTraceEnabled()) { ? ? ? ? ? ? ? ? logger.trace(this + "::Calling handler.onMessage"); ? ? ? ? ? ? ? } ? ? ? ? ? ? ? final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() { ? ? ? ? ? ? ? ? @Override ? ? ? ? ? ? ? ? public ClassLoader run() { ? ? ? ? ? ? ? ? ? ? ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();? ? ? ? ? ? ? ? ? ? ? 
Thread.currentThread().setContextClassLoader(contextClassLoader);? ? ? ? ? ? ? ? ? ? ? return originalLoader; ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? });? ? ? ? ? ? ? ? onMessageThread = Thread.currentThread(); ? ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? theHandler.onMessage(message); ? ? ? ? ? ? ? } finally { ? ? ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? ? ? AccessController.doPrivileged(new PrivilegedAction<Object>() { ? ? ? ? ? ? ? ? ? ? ? @Override ? ? ? ? ? ? ? ? ? ? ? public Object run() { ? ? ? ? ? ? ? ? ? ? ? ? ? Thread.currentThread().setContextClassLoader(originalLoader); ? ? ? ? ? ? ? ? ? ? ? ? ? return null; ? ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? }); ? ? ? ? ? ? ? ? } catch (Exception e) { ? ? ? ? ? ? ? ? ? ? ActiveMQClientLogger.LOGGER.failedPerformPostActionsOnMessage(e); ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? onMessageThread = null; ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? if (logger.isTraceEnabled()) { ? ? ? ? ? ? ? ? logger.trace(this + "::Handler.onMessage done"); ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? if (message.isLargeMessage()) { ? ? ? ? ? ? ? ? message.discardBody(); ? ? ? ? ? ? ? } ? ? ? ? ? } else { ? ? ? ? ? ? ? session.expire(this, message); ? ? ? ? ? }? ? ? ? ? ? // If slow consumer, we need to send 1 credit to make sure we get another message ? ? ? ? ? if (clientWindowSize == 0) { ? ? ? ? ? ? ? startSlowConsumer(); ? ? ? ? ? } ? ? ? ? } ? ? } ? }? ? private void flowControlBeforeConsumption(final ClientMessageInternal message) throws ActiveMQException { ? ? // Chunk messages will execute the flow control while receiving the chunks ? ? if (message.getFlowControlSize() != 0) { ? ? ? ? // on large messages we should discount 1 on the first packets as we need continuity until the last packet ? ? ? ? flowControl(message.getFlowControlSize(), !message.isLargeMessage()); ? ? } ? }? ? public void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws ActiveMQException { ? ? if (clientWindowSize >= 0) { ? ? ? ? creditsToSend += messageBytes;? ? ? ? ? 
if (creditsToSend >= clientWindowSize) { ? ? ? ? ? if (clientWindowSize == 0 && discountSlowConsumer) { ? ? ? ? ? ? ? if (logger.isTraceEnabled()) { ? ? ? ? ? ? ? ? logger.trace(this + "::FlowControl::Sending " + creditsToSend + " -1, for slow consumer"); ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? // sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be ? ? ? ? ? ? ? // always buffering one after received the first message ? ? ? ? ? ? ? final int credits = creditsToSend - 1;? ? ? ? ? ? ? ? creditsToSend = 0;? ? ? ? ? ? ? ? if (credits > 0) { ? ? ? ? ? ? ? ? sendCredits(credits); ? ? ? ? ? ? ? } ? ? ? ? ? } else { ? ? ? ? ? ? ? if (logger.isDebugEnabled()) { ? ? ? ? ? ? ? ? logger.debug("Sending " + messageBytes + " from flow-control"); ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? final int credits = creditsToSend;? ? ? ? ? ? ? ? creditsToSend = 0;? ? ? ? ? ? ? ? if (credits > 0) { ? ? ? ? ? ? ? ? sendCredits(credits); ? ? ? ? ? ? ? } ? ? ? ? ? } ? ? ? ? } ? ? } ? }?? ? //......} ?
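- Runner实现了Runnable接口,其run方法执行callOnMessage();该方法在closing或stopped时直接返回,在handler不为null的前提下,对于rateLimiter不为null会执行rateLimiter.limit();之后执行buffer.poll()获取ClientMessageInternal,若不为null,则执行flowControlBeforeConsumption(message),对于非expired的会执行theHandler.onMessage(message)方法,expired的则执行session.expire(this, message);对于clientWindowSize为0的则执行startSlowConsumer();flowControlBeforeConsumption方法在message.getFlowControlSize()不为0时会执行flowControl方法,该方法先累加creditsToSend,当creditsToSend大于等于clientWindowSize时计算credits并将creditsToSend清零,credits大于0时才执行sendCredits(credits)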
小结
ClientConsumerImpl的handleRegularMessage方法先执行buffer.addTail(message, message.getPriority()),之后对于handler不为null且consumer未stopped的会执行queueExecutor(),handler为null的则执行notify();queueExecutor方法是通过sessionExecutor执行runner;Runner实现了Runnable接口,其run方法执行callOnMessage();该方法对于rateLimiter不为null会执行rateLimiter.limit();之后执行buffer.poll()获取ClientMessageInternal进行处理
doc
- ClientConsumerImpl