A Look at RocketMQ's ExtProducerResetConfiguration

nanyue 2024-09-21 20:00:49

This article examines ExtProducerResetConfiguration in rocketmq-spring-boot.

ExtProducerResetConfiguration

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java

@Configuration
public class ExtProducerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
    private final static Logger log = LoggerFactory.getLogger(ExtProducerResetConfiguration.class);

    private ConfigurableApplicationContext applicationContext;

    private StandardEnvironment environment;

    private RocketMQProperties rocketMQProperties;

    private ObjectMapper objectMapper;

    public ExtProducerResetConfiguration(ObjectMapper rocketMQMessageObjectMapper,
        StandardEnvironment environment,
        RocketMQProperties rocketMQProperties) {
        this.objectMapper = rocketMQMessageObjectMapper;
        this.environment = environment;
        this.rocketMQProperties = rocketMQProperties;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }

    @Override
    public void afterSingletonsInstantiated() {
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class);

        if (Objects.nonNull(beans)) {
            beans.forEach(this::registerTemplate);
        }
    }

    private void registerTemplate(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);

        if (!RocketMQTemplate.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQTemplate.class.getName());
        }

        ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class);
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
        validate(annotation, genericApplicationContext);

        DefaultMQProducer mqProducer = createProducer(annotation);
        // Set instanceName same as the beanName
        mqProducer.setInstanceName(beanName);
        try {
            mqProducer.start();
        } catch (MQClientException e) {
            throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}",
                beanName), e);
        }
        RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
        rocketMQTemplate.setProducer(mqProducer);
        rocketMQTemplate.setObjectMapper(objectMapper);


        log.info("Set real producer to :{} {}", beanName, annotation.value());
    }

    private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annotation) {
        DefaultMQProducer producer = null;

        RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
        if (producerConfig == null) {
            producerConfig = new RocketMQProperties.Producer();
        }
        String nameServer = environment.resolvePlaceholders(annotation.nameServer());
        String groupName = environment.resolvePlaceholders(annotation.group());
        groupName = StringUtils.isEmpty(groupName) ? producerConfig.getGroup() : groupName;

        String ak = environment.resolvePlaceholders(annotation.accessKey());
        ak = StringUtils.isEmpty(ak) ? producerConfig.getAccessKey() : annotation.accessKey();
        String sk = environment.resolvePlaceholders(annotation.secretKey());
        sk = StringUtils.isEmpty(sk) ? producerConfig.getSecretKey() : annotation.secretKey();
        String customizedTraceTopic = environment.resolvePlaceholders(annotation.customizedTraceTopic());
        customizedTraceTopic = StringUtils.isEmpty(customizedTraceTopic) ? producerConfig.getCustomizedTraceTopic() : customizedTraceTopic;

        if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
            producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)),
                annotation.enableMsgTrace(), customizedTraceTopic);
            producer.setVipChannelEnabled(false);
        } else {
            producer = new DefaultMQProducer(groupName, annotation.enableMsgTrace(), customizedTraceTopic);
        }

        producer.setNamesrvAddr(nameServer);
        producer.setSendMsgTimeout(annotation.sendMessageTimeout() == -1 ? producerConfig.getSendMessageTimeout() : annotation.sendMessageTimeout());
        producer.setRetryTimesWhenSendFailed(annotation.retryTimesWhenSendAsyncFailed() == -1 ? producerConfig.getRetryTimesWhenSendFailed() : annotation.retryTimesWhenSendAsyncFailed());
        producer.setRetryTimesWhenSendAsyncFailed(annotation.retryTimesWhenSendAsyncFailed() == -1 ? producerConfig.getRetryTimesWhenSendAsyncFailed() : annotation.retryTimesWhenSendAsyncFailed());
        producer.setMaxMessageSize(annotation.maxMessageSize() == -1 ? producerConfig.getMaxMessageSize() : annotation.maxMessageSize());
        producer.setCompressMsgBodyOverHowmuch(annotation.compressMessageBodyThreshold() == -1 ? producerConfig.getCompressMessageBodyThreshold() : annotation.compressMessageBodyThreshold());
        producer.setRetryAnotherBrokerWhenNotStoreOK(annotation.retryNextServer());

        return producer;
    }

    private void validate(ExtRocketMQTemplateConfiguration annotation, GenericApplicationContext genericApplicationContext) {
        if (genericApplicationContext.isBeanNameInUse(annotation.value())) {
            throw new BeanDefinitionValidationException(String.format("Bean {} has been used in Spring Application Context, " +
                    "please check the @ExtRocketMQTemplateConfiguration",
                annotation.value()));
        }

        if (rocketMQProperties.getNameServer() == null ||
            rocketMQProperties.getNameServer().equals(environment.resolvePlaceholders(annotation.nameServer()))) {
            throw new BeanDefinitionValidationException(
                "Bad annotation definition in @ExtRocketMQTemplateConfiguration, nameServer property is same with " +
                    "global property, please use the default RocketMQTemplate!");
        }
    }
}
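  • ExtProducerResetConfiguration implements ApplicationContextAware and SmartInitializingSingleton; its afterSingletonsInstantiated method looks up every bean annotated with @ExtRocketMQTemplateConfiguration and calls registerTemplate on each one
  • registerTemplate first checks that the bean is a RocketMQTemplate and throws IllegalStateException if it is not; it then reads the @ExtRocketMQTemplateConfiguration annotation and calls validate, which throws BeanDefinitionValidationException when the bean name in annotation.value() is already in use, or when the global name server is unset or equal to the name server given in the annotation
  • Finally it builds a DefaultMQProducer via createProducer, sets the producer's instance name to the bean name, starts it, assigns it to the bean with setProducer, and sets the objectMapper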
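
The fallback rules inside createProducer can be looked at in isolation. The sketch below is only an illustration, not the library's code: the class FallbackSketch and its orGlobal helpers are hypothetical names, and they model just the two rules visible in the listing above, namely that an empty string attribute falls back to the global producer setting and that an int attribute equal to -1 does the same.

```java
// Hypothetical stand-in for illustration; not part of rocketmq-spring.
final class FallbackSketch {
    private FallbackSketch() {
    }

    /** Returns the annotation value, or the global value when the annotation value is null or empty. */
    static String orGlobal(String annotationValue, String globalValue) {
        return (annotationValue == null || annotationValue.isEmpty()) ? globalValue : annotationValue;
    }

    /** Returns the annotation value, or the global value when the annotation value is the -1 sentinel. */
    static int orGlobal(int annotationValue, int globalValue) {
        return annotationValue == -1 ? globalValue : annotationValue;
    }

    public static void main(String[] args) {
        // Empty group on the annotation: the global group is used.
        System.out.println(orGlobal("", "global-group"));
        // Explicit group on the annotation: it overrides the global one.
        System.out.println(orGlobal("ext-group", "global-group"));
        // -1 timeout on the annotation: the global timeout is used.
        System.out.println(orGlobal(-1, 3000));
        // Explicit timeout on the annotation: it overrides the global one.
        System.out.println(orGlobal(5000, 3000));
    }
}
```

Two details in the quoted 2.0.3 listing deviate from this simple pattern: setRetryTimesWhenSendFailed is fed from retryTimesWhenSendAsyncFailed() rather than retryTimesWhenSendFailed(), and a non-empty resolved accessKey or secretKey is replaced by the raw annotation string instead of the resolved value. The source does not say whether either is intentional.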

ExtRocketMQTemplateConfiguration

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface ExtRocketMQTemplateConfiguration {
    /**
     * The component name of the Producer configuration.
     */
    String value() default "";

    /**
     * The property of "name-server".
     */
    String nameServer();

    /**
     * Name of producer.
     */
    String group() default "${rocketmq.producer.group:}";
    /**
     * Millis of send message timeout.
     */
    int sendMessageTimeout() default -1;
    /**
     * Compress message body threshold, namely, message body larger than 4k will be compressed on default.
     */
    int compressMessageBodyThreshold() default -1;
    /**
     * Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
     * This may potentially cause message duplication which is up to application developers to resolve.
     */
    int retryTimesWhenSendFailed() default -1;
    /**
     * <p> Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
     * This may potentially cause message duplication which is up to application developers to resolve.
     */
    int retryTimesWhenSendAsyncFailed() default -1;
    /**
     * Indicate whether to retry another broker on sending failure internally.
     */
    boolean retryNextServer() default false;
    /**
     * Maximum allowed message size in bytes.
     */
    int maxMessageSize() default -1;
    /**
     * The property of "access-key".
     */
    String accessKey() default "${rocketmq.producer.accessKey:}";
    /**
     * The property of "secret-key".
     */
    String secretKey() default "${rocketmq.producer.secretKey:}";
    /**
     * Switch flag instance for message trace.
     */
    boolean enableMsgTrace() default true;
    /**
     * The name value of message trace topic.If you don't config,you can use the default trace topic name.
     */
    String customizedTraceTopic() default "${rocketmq.producer.customized-trace-topic:}";
}
  • The ExtRocketMQTemplateConfiguration annotation defines the attributes value, nameServer, group, sendMessageTimeout, compressMessageBodyThreshold, retryTimesWhenSendFailed, retryTimesWhenSendAsyncFailed, retryNextServer, maxMessageSize, accessKey, secretKey, enableMsgTrace and customizedTraceTopic; nameServer is the only one without a default, so it must always be supplied
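
To see what registerTemplate gets back from clazz.getAnnotation(...), the following sketch reads a runtime-retained annotation through reflection. It is a self-contained illustration under stated assumptions: DemoTemplateConfiguration, DemoTemplate and AnnotationDefaultsSketch are hypothetical names, and the stand-in annotation copies only three attributes of the real one.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in mirroring three attributes of ExtRocketMQTemplateConfiguration.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME) // RUNTIME retention is what makes getAnnotation() work
@interface DemoTemplateConfiguration {
    String nameServer();

    String group() default "${rocketmq.producer.group:}";

    int sendMessageTimeout() default -1;
}

// Only nameServer is given; the other attributes keep their declared defaults.
@DemoTemplateConfiguration(nameServer = "${demo.rocketmq.extNameServer}")
class DemoTemplate {
}

final class AnnotationDefaultsSketch {
    private AnnotationDefaultsSketch() {
    }

    /** Reads the stand-in annotation from a class, or returns null if it is absent. */
    static DemoTemplateConfiguration read(Class<?> clazz) {
        return clazz.getAnnotation(DemoTemplateConfiguration.class);
    }

    public static void main(String[] args) {
        DemoTemplateConfiguration cfg = read(DemoTemplate.class);
        // The values come back as raw strings; placeholders are not resolved by reflection.
        System.out.println(cfg.nameServer());
        System.out.println(cfg.group());
        System.out.println(cfg.sendMessageTimeout());
    }
}
```

The attribute values arrive as the literal strings written in the source, which is why the configuration class passes them through environment.resolvePlaceholders before using them.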

ExtRocketMQTemplate

import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;

@ExtRocketMQTemplateConfiguration(nameServer = "${demo.rocketmq.extNameServer}")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
  • ExtRocketMQTemplate extends RocketMQTemplate and carries the @ExtRocketMQTemplateConfiguration annotation, with nameServer taken from the demo.rocketmq.extNameServer property
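
The ${key} and ${key:default} strings used by the annotation are resolved by Spring's environment.resolvePlaceholders. The sketch below imitates that behaviour in plain Java to make the semantics concrete. It is a simplified stand-in, not Spring's implementation: PlaceholderSketch and resolve are hypothetical names, nested placeholders are not handled, and the property map stands in for the environment.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified imitation of ${key} / ${key:default} resolution.
final class PlaceholderSketch {
    // Group 1 is the key; group 2 is the optional default after the colon.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^:}]+)(?::([^}]*))?\\}");

    private PlaceholderSketch() {
    }

    static String resolve(String text, Map<String, String> properties) {
        Matcher matcher = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = properties.get(matcher.group(1));
            if (value == null) {
                // Use the default if one was declared; otherwise leave the placeholder untouched.
                value = matcher.group(2) != null ? matcher.group(2) : matcher.group();
            }
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }
}
```

Under these assumptions, "${rocketmq.producer.group:}" resolves to an empty string when the property is missing, which is what triggers the fallback to the global producer group in createProducer. A placeholder without a default, such as "${demo.rocketmq.extNameServer}", is passed through unchanged when the property is absent, so the property needs to be defined for the extra template to reach a real name server.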

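Summary

  • ExtProducerResetConfiguration implements ApplicationContextAware and SmartInitializingSingleton; once all singletons are instantiated, afterSingletonsInstantiated collects the beans annotated with @ExtRocketMQTemplateConfiguration and passes each to registerTemplate
  • registerTemplate rejects any bean that is not a RocketMQTemplate, reads the annotation, and validates it: a bean name that is already in use, or a name server that is the same as the global one (or a missing global name server), results in a BeanDefinitionValidationException
  • createProducer then builds a DefaultMQProducer from the annotation with fallbacks to the global producer properties; the producer is started, assigned to the bean, and the objectMapper is set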
