优秀的编程知识分享平台

网站首页 > 技术文章 正文

聊聊artemis ClientConsumer的handleRegularMessage

nanyue 2024-07-31 12:06:55 技术文章 9 ℃

本文主要研究一下artemis ClientConsumer的handleRegularMessage



handleRegularMessage

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

public final class ClientConsumerImpl implements ClientConsumerInternal {? ? //......? ? private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<>(ClientConsumerImpl.NUM_PRIORITIES);? ? private final Runner runner = new Runner();? ? private volatile MessageHandler handler;? ? //......? ? private void handleRegularMessage(ClientMessageInternal message) { ? ?  if (message.getAddress() == null) { ? ? ? ? message.setAddress(queueInfo.getAddress()); ? ?  }? ? ?  message.onReceipt(this);? ? ?  if (!ackIndividually && message.getPriority() != 4 && !message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) { ? ? ? ? // We have messages of different priorities so we need to ack them individually since the order ? ? ? ? // of them in the ServerConsumerImpl delivery list might not be the same as the order they are ? ? ? ? // consumed in, which means that acking all up to won't work ? ? ? ? ackIndividually = true; ? ?  }? ? ?  // Add it to the buffer ? ?  buffer.addTail(message, message.getPriority());? ? ?  if (handler != null) { ? ? ? ? // Execute using executor ? ? ? ? if (!stopped) { ? ? ? ? ?  queueExecutor(); ? ? ? ? } ? ?  } else { ? ? ? ? notify(); ? ?  } ? }? ? private void queueExecutor() { ? ?  if (logger.isTraceEnabled()) { ? ? ? ? logger.trace(this + "::Adding Runner on Executor for delivery"); ? ?  }? ? ?  sessionExecutor.execute(runner); ? }? ? //......}
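  • ClientConsumerImpl的handleRegularMessage方法在message的address为null时先用queueInfo.getAddress()补全,然后执行message.onReceipt(this),并在需要时将ackIndividually置为true;接着执行buffer.addTail(message, message.getPriority()),之后若handler不为null且consumer未stopped则执行queueExecutor(),handler为null时则执行notify();queueExecutor方法是通过sessionExecutor执行runner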

Runner

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

public final class ClientConsumerImpl implements ClientConsumerInternal {? ? //......? ? private class Runner implements Runnable {? ? ?  @Override ? ?  public void run() { ? ? ? ? try { ? ? ? ? ?  callOnMessage(); ? ? ? ? } catch (Exception e) { ? ? ? ? ?  ActiveMQClientLogger.LOGGER.onMessageError(e);? ? ? ? ? ?  lastException = e; ? ? ? ? } ? ?  } ? }? ? private void callOnMessage() throws Exception { ? ?  if (closing || stopped) { ? ? ? ? return; ? ?  }? ? ?  session.workDone();? ? ?  // We pull the message from the buffer from inside the Runnable so we can ensure priority ? ?  // ordering. If we just added a Runnable with the message to the executor immediately as we get it ? ?  // we could not do that? ? ?  ClientMessageInternal message;? ? ?  // Must store handler in local variable since might get set to null ? ?  // otherwise while this is executing and give NPE when calling onMessage ? ?  MessageHandler theHandler = handler;? ? ?  if (theHandler != null) { ? ? ? ? if (rateLimiter != null) { ? ? ? ? ?  rateLimiter.limit(); ? ? ? ? }? ? ? ? ? failedOver = false;? ? ? ? ? synchronized (this) { ? ? ? ? ?  message = buffer.poll(); ? ? ? ? }? ? ? ? ? if (message != null) { ? ? ? ? ?  if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) { ? ? ? ? ? ? ? //Ignore, this could be a relic from a previous receiveImmediate(); ? ? ? ? ? ? ? return; ? ? ? ? ?  }? ? ? ? ? ?  boolean expired = message.isExpired();? ? ? ? ? ?  flowControlBeforeConsumption(message);? ? ? ? ? ?  if (!expired) { ? ? ? ? ? ? ? if (logger.isTraceEnabled()) { ? ? ? ? ? ? ? ?  logger.trace(this + "::Calling handler.onMessage"); ? ? ? ? ? ? ? } ? ? ? ? ? ? ? final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() { ? ? ? ? ? ? ? ?  @Override ? ? ? ? ? ? ? ?  public ClassLoader run() { ? ? ? ? ? ? ? ? ? ? ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();? ? ? ? ? ? ? ? ? ? ? 
Thread.currentThread().setContextClassLoader(contextClassLoader);? ? ? ? ? ? ? ? ? ? ? return originalLoader; ? ? ? ? ? ? ? ?  } ? ? ? ? ? ? ? });? ? ? ? ? ? ? ? onMessageThread = Thread.currentThread(); ? ? ? ? ? ? ? try { ? ? ? ? ? ? ? ?  theHandler.onMessage(message); ? ? ? ? ? ? ? } finally { ? ? ? ? ? ? ? ?  try { ? ? ? ? ? ? ? ? ? ? AccessController.doPrivileged(new PrivilegedAction<Object>() { ? ? ? ? ? ? ? ? ? ? ?  @Override ? ? ? ? ? ? ? ? ? ? ?  public Object run() { ? ? ? ? ? ? ? ? ? ? ? ? ? Thread.currentThread().setContextClassLoader(originalLoader); ? ? ? ? ? ? ? ? ? ? ? ? ? return null; ? ? ? ? ? ? ? ? ? ? ?  } ? ? ? ? ? ? ? ? ? ? }); ? ? ? ? ? ? ? ?  } catch (Exception e) { ? ? ? ? ? ? ? ? ? ? ActiveMQClientLogger.LOGGER.failedPerformPostActionsOnMessage(e); ? ? ? ? ? ? ? ?  }? ? ? ? ? ? ? ? ?  onMessageThread = null; ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? if (logger.isTraceEnabled()) { ? ? ? ? ? ? ? ?  logger.trace(this + "::Handler.onMessage done"); ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? if (message.isLargeMessage()) { ? ? ? ? ? ? ? ?  message.discardBody(); ? ? ? ? ? ? ? } ? ? ? ? ?  } else { ? ? ? ? ? ? ? session.expire(this, message); ? ? ? ? ?  }? ? ? ? ? ?  // If slow consumer, we need to send 1 credit to make sure we get another message ? ? ? ? ?  if (clientWindowSize == 0) { ? ? ? ? ? ? ? startSlowConsumer(); ? ? ? ? ?  } ? ? ? ? } ? ?  } ? }? ? private void flowControlBeforeConsumption(final ClientMessageInternal message) throws ActiveMQException { ? ?  // Chunk messages will execute the flow control while receiving the chunks ? ?  if (message.getFlowControlSize() != 0) { ? ? ? ? // on large messages we should discount 1 on the first packets as we need continuity until the last packet ? ? ? ? flowControl(message.getFlowControlSize(), !message.isLargeMessage()); ? ?  } ? }? ? public void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws ActiveMQException { ? ?  if (clientWindowSize >= 0) { ? ? ? ? 
creditsToSend += messageBytes;? ? ? ? ? if (creditsToSend >= clientWindowSize) { ? ? ? ? ?  if (clientWindowSize == 0 && discountSlowConsumer) { ? ? ? ? ? ? ? if (logger.isTraceEnabled()) { ? ? ? ? ? ? ? ?  logger.trace(this + "::FlowControl::Sending " + creditsToSend + " -1, for slow consumer"); ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? // sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be ? ? ? ? ? ? ? // always buffering one after received the first message ? ? ? ? ? ? ? final int credits = creditsToSend - 1;? ? ? ? ? ? ? ? creditsToSend = 0;? ? ? ? ? ? ? ? if (credits > 0) { ? ? ? ? ? ? ? ?  sendCredits(credits); ? ? ? ? ? ? ? } ? ? ? ? ?  } else { ? ? ? ? ? ? ? if (logger.isDebugEnabled()) { ? ? ? ? ? ? ? ?  logger.debug("Sending " + messageBytes + " from flow-control"); ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? final int credits = creditsToSend;? ? ? ? ? ? ? ? creditsToSend = 0;? ? ? ? ? ? ? ? if (credits > 0) { ? ? ? ? ? ? ? ?  sendCredits(credits); ? ? ? ? ? ? ? } ? ? ? ? ?  } ? ? ? ? } ? ?  } ? }?? ? //......} ? 
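  • Runner实现了Runnable接口,其run方法执行callOnMessage(),捕获到的异常会记录日志并保存到lastException;callOnMessage方法在closing或stopped时直接返回,否则对于rateLimiter不为null会执行rateLimiter.limit();之后执行buffer.poll()获取ClientMessageInternal,若不为null且不带FORCED_DELIVERY_MESSAGE属性,则执行flowControlBeforeConsumption(message),对于非expired的会执行theHandler.onMessage(message)方法,已expired的则执行session.expire(this, message);对于clientWindowSize为0的则执行startSlowConsumer();flowControlBeforeConsumption方法在message.getFlowControlSize()不为0时会执行flowControl方法,该方法将messageBytes累加到creditsToSend,当其达到clientWindowSize时计算credits,credits大于0时执行sendCredits(credits)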

小结

ClientConsumerImpl的handleRegularMessage方法先执行buffer.addTail(message, message.getPriority()),之后若handler不为null且consumer未stopped则执行queueExecutor(),handler为null时则执行notify();queueExecutor方法是通过sessionExecutor执行runner;Runner实现了Runnable接口,其run方法执行callOnMessage();该方法对于rateLimiter不为null会执行rateLimiter.limit();之后执行buffer.poll()获取ClientMessageInternal进行处理

doc

  • ClientConsumerImpl
最近发表
标签列表