Preface
This article takes a look at rocketmq's ExtProducerResetConfiguration.
ExtProducerResetConfiguration
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
@Configuration
public class ExtProducerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
    private final static Logger log = LoggerFactory.getLogger(ExtProducerResetConfiguration.class);

    private ConfigurableApplicationContext applicationContext;

    private StandardEnvironment environment;

    private RocketMQProperties rocketMQProperties;

    private ObjectMapper objectMapper;

    public ExtProducerResetConfiguration(ObjectMapper rocketMQMessageObjectMapper,
        StandardEnvironment environment,
        RocketMQProperties rocketMQProperties) {
        this.objectMapper = rocketMQMessageObjectMapper;
        this.environment = environment;
        this.rocketMQProperties = rocketMQProperties;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }

    @Override
    public void afterSingletonsInstantiated() {
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class);

        if (Objects.nonNull(beans)) {
            beans.forEach(this::registerTemplate);
        }
    }

    private void registerTemplate(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);

        if (!RocketMQTemplate.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQTemplate.class.getName());
        }

        ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class);
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
        validate(annotation, genericApplicationContext);

        DefaultMQProducer mqProducer = createProducer(annotation);
        // Set instanceName same as the beanName
        mqProducer.setInstanceName(beanName);
        try {
            mqProducer.start();
        } catch (MQClientException e) {
            throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}",
                beanName), e);
        }
        RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
        rocketMQTemplate.setProducer(mqProducer);
        rocketMQTemplate.setObjectMapper(objectMapper);

        log.info("Set real producer to :{} {}", beanName, annotation.value());
    }

    private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annotation) {
        DefaultMQProducer producer = null;

        RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
        if (producerConfig == null) {
            producerConfig = new RocketMQProperties.Producer();
        }
        String nameServer = environment.resolvePlaceholders(annotation.nameServer());
        String groupName = environment.resolvePlaceholders(annotation.group());
        groupName = StringUtils.isEmpty(groupName) ? producerConfig.getGroup() : groupName;

        String ak = environment.resolvePlaceholders(annotation.accessKey());
        ak = StringUtils.isEmpty(ak) ? producerConfig.getAccessKey() : annotation.accessKey();
        String sk = environment.resolvePlaceholders(annotation.secretKey());
        sk = StringUtils.isEmpty(sk) ? producerConfig.getSecretKey() : annotation.secretKey();
        String customizedTraceTopic = environment.resolvePlaceholders(annotation.customizedTraceTopic());
        customizedTraceTopic = StringUtils.isEmpty(customizedTraceTopic) ? producerConfig.getCustomizedTraceTopic() : customizedTraceTopic;

        if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
            producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)),
                annotation.enableMsgTrace(), customizedTraceTopic);
            producer.setVipChannelEnabled(false);
        } else {
            producer = new DefaultMQProducer(groupName, annotation.enableMsgTrace(), customizedTraceTopic);
        }

        producer.setNamesrvAddr(nameServer);
        producer.setSendMsgTimeout(annotation.sendMessageTimeout() == -1 ? producerConfig.getSendMessageTimeout() : annotation.sendMessageTimeout());
        producer.setRetryTimesWhenSendFailed(annotation.retryTimesWhenSendAsyncFailed() == -1 ? producerConfig.getRetryTimesWhenSendFailed() : annotation.retryTimesWhenSendAsyncFailed());
        producer.setRetryTimesWhenSendAsyncFailed(annotation.retryTimesWhenSendAsyncFailed() == -1 ? producerConfig.getRetryTimesWhenSendAsyncFailed() : annotation.retryTimesWhenSendAsyncFailed());
        producer.setMaxMessageSize(annotation.maxMessageSize() == -1 ? producerConfig.getMaxMessageSize() : annotation.maxMessageSize());
        producer.setCompressMsgBodyOverHowmuch(annotation.compressMessageBodyThreshold() == -1 ? producerConfig.getCompressMessageBodyThreshold() : annotation.compressMessageBodyThreshold());
        producer.setRetryAnotherBrokerWhenNotStoreOK(annotation.retryNextServer());

        return producer;
    }

    private void validate(ExtRocketMQTemplateConfiguration annotation, GenericApplicationContext genericApplicationContext) {
        if (genericApplicationContext.isBeanNameInUse(annotation.value())) {
            throw new BeanDefinitionValidationException(String.format("Bean {} has been used in Spring Application Context, " +
                    "please check the @ExtRocketMQTemplateConfiguration",
                annotation.value()));
        }

        if (rocketMQProperties.getNameServer() == null ||
            rocketMQProperties.getNameServer().equals(environment.resolvePlaceholders(annotation.nameServer()))) {
            throw new BeanDefinitionValidationException(
                "Bad annotation definition in @ExtRocketMQTemplateConfiguration, nameServer property is same with " +
                    "global property, please use the default RocketMQTemplate!");
        }
    }
}
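- ExtProducerResetConfiguration implements the ApplicationContextAware and SmartInitializingSingleton interfaces; its afterSingletonsInstantiated method looks up the beans annotated with ExtRocketMQTemplateConfiguration and calls registerTemplate on each of them
- registerTemplate first checks that the bean is a RocketMQTemplate or a subclass of it, and throws IllegalStateException otherwise; it then reads the ExtRocketMQTemplateConfiguration annotation and calls validate, which throws BeanDefinitionValidationException if the bean name given by the annotation's value is already in use, or if the global nameServer is null or equal to the nameServer specified by the annotation
- Finally it creates a DefaultMQProducer through createProducer, sets its instanceName to the bean name, calls its start method, assigns the producer to the bean, and sets the objectMapper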
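The fallback rule that createProducer applies to each attribute can be sketched in plain Java: numeric attributes treat -1 as "not set", and string attributes fall back when the resolved value is empty. The class and method names below are hypothetical and only model the rule; they are not part of rocketmq-spring. Note also that in the listing as quoted, setRetryTimesWhenSendFailed reads annotation.retryTimesWhenSendAsyncFailed(), and a non-empty ak/sk is taken from the raw annotation value rather than the resolved one, so it is worth checking this against the version you actually run.

```java
/** Hypothetical sketch of the "annotation value, else global default" rule. */
final class ProducerSettingFallback {

    /** Numeric attributes use -1 as the "not set" sentinel. */
    static int resolveInt(int annotationValue, int globalValue) {
        return annotationValue == -1 ? globalValue : annotationValue;
    }

    /** String attributes fall back when the resolved value is null or empty. */
    static String resolveString(String resolvedAnnotationValue, String globalValue) {
        boolean unset = resolvedAnnotationValue == null || resolvedAnnotationValue.isEmpty();
        return unset ? globalValue : resolvedAnnotationValue;
    }

    public static void main(String[] args) {
        // Annotation left at its default: the global value is used.
        System.out.println(resolveInt(-1, 3000));
        // Annotation sets an explicit value: it overrides the global one.
        System.out.println(resolveInt(5000, 3000));
        // Placeholder resolved to an empty string: the global group is used.
        System.out.println(resolveString("", "global-group"));
    }
}
```

One consequence of the sentinel approach is that -1 itself can never be configured as a real value, which is acceptable here because none of these settings accepts a negative number.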
ExtRocketMQTemplateConfiguration
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface ExtRocketMQTemplateConfiguration {
    /**
     * The component name of the Producer configuration.
     */
    String value() default "";

    /**
     * The property of "name-server".
     */
    String nameServer();

    /**
     * Name of producer.
     */
    String group() default "${rocketmq.producer.group:}";
    /**
     * Millis of send message timeout.
     */
    int sendMessageTimeout() default -1;
    /**
     * Compress message body threshold, namely, message body larger than 4k will be compressed on default.
     */
    int compressMessageBodyThreshold() default -1;
    /**
     * Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
     * This may potentially cause message duplication which is up to application developers to resolve.
     */
    int retryTimesWhenSendFailed() default -1;
    /**
     * <p> Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
     * This may potentially cause message duplication which is up to application developers to resolve.
     */
    int retryTimesWhenSendAsyncFailed() default -1;
    /**
     * Indicate whether to retry another broker on sending failure internally.
     */
    boolean retryNextServer() default false;
    /**
     * Maximum allowed message size in bytes.
     */
    int maxMessageSize() default -1;
    /**
     * The property of "access-key".
     */
    String accessKey() default "${rocketmq.producer.accessKey:}";
    /**
     * The property of "secret-key".
     */
    String secretKey() default "${rocketmq.producer.secretKey:}";
    /**
     * Switch flag instance for message trace.
     */
    boolean enableMsgTrace() default true;
    /**
     * The name value of message trace topic.If you don't config,you can use the default trace topic name.
     */
    String customizedTraceTopic() default "${rocketmq.producer.customized-trace-topic:}";
}
- The ExtRocketMQTemplateConfiguration annotation defines the attributes value, nameServer, group, sendMessageTimeout, compressMessageBodyThreshold, retryTimesWhenSendFailed, retryTimesWhenSendAsyncFailed, retryNextServer, maxMessageSize, accessKey, secretKey, enableMsgTrace and customizedTraceTopic; only nameServer has no default value
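To illustrate how such an annotation is read at runtime, here is a minimal self-contained sketch. DemoProducerConfig and DemoTemplate are hypothetical stand-ins with only three of the attributes; the point is that a RUNTIME-retained annotation is visible through Class.getAnnotation, and attributes that were not set report their declared defaults (here -1 and true).

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Hypothetical, reduced stand-in for ExtRocketMQTemplateConfiguration. */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface DemoProducerConfig {
    String nameServer();                     // required: no default
    int sendMessageTimeout() default -1;     // -1 means "not set"
    boolean enableMsgTrace() default true;
}

/** Hypothetical annotated class; only the required attribute is given. */
@DemoProducerConfig(nameServer = "127.0.0.1:9876")
class DemoTemplate { }

final class AnnotationDefaultsDemo {

    /** Reads the annotation reflectively, as registerTemplate does with clazz.getAnnotation. */
    static String describe(Class<?> type) {
        DemoProducerConfig cfg = type.getAnnotation(DemoProducerConfig.class);
        if (cfg == null) {
            return type.getSimpleName() + " is not annotated";
        }
        return cfg.nameServer() + " timeout=" + cfg.sendMessageTimeout() + " trace=" + cfg.enableMsgTrace();
    }

    public static void main(String[] args) {
        System.out.println(describe(DemoTemplate.class));
        System.out.println(describe(String.class));
    }
}
```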
ExtRocketMQTemplate
import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;

@ExtRocketMQTemplateConfiguration(nameServer = "${demo.rocketmq.extNameServer}")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
- This defines the ExtRocketMQTemplate class, which extends RocketMQTemplate and is annotated with ExtRocketMQTemplateConfiguration
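Extending RocketMQTemplate matters because registerTemplate rejects any annotated bean that is not assignable to it. The sketch below models that guard with hypothetical classes (BaseTemplate standing in for RocketMQTemplate, ExtTemplate for ExtRocketMQTemplate); it is an illustration of the isAssignableFrom check, not the library code.

```java
/** Hypothetical stand-in for RocketMQTemplate. */
class BaseTemplate { }

/** Hypothetical stand-in for ExtRocketMQTemplate. */
class ExtTemplate extends BaseTemplate { }

final class TemplateTypeCheck {

    /** Returns the bean as a BaseTemplate, or fails the way registerTemplate does. */
    static BaseTemplate requireTemplate(Object bean) {
        if (!BaseTemplate.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(
                bean.getClass().getName() + " is not instance of " + BaseTemplate.class.getName());
        }
        return (BaseTemplate) bean;
    }

    public static void main(String[] args) {
        // A subclass passes the guard.
        System.out.println(requireTemplate(new ExtTemplate()).getClass().getSimpleName());
        // An unrelated type is rejected.
        try {
            requireTemplate("not a template");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```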
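Summary
- ExtProducerResetConfiguration implements ApplicationContextAware and SmartInitializingSingleton; in afterSingletonsInstantiated it collects the beans annotated with ExtRocketMQTemplateConfiguration and runs registerTemplate on each
- registerTemplate requires the bean to be a RocketMQTemplate (IllegalStateException otherwise), then validates the annotation: a bean name already in use, or a nameServer equal to the global one (or a missing global nameServer), results in BeanDefinitionValidationException
- createProducer then builds a DefaultMQProducer from the annotation, falling back to the global producer properties for unset attributes; the producer is started, assigned to the bean, and the objectMapper is set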