对比新文件 |
| | |
| | | package com.iailab.framework.tenant.core.mq.rocketmq; |
| | | |
| | | import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; |
| | | import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; |
| | | import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; |
| | | import org.apache.rocketmq.client.producer.DefaultMQProducer; |
| | | import org.apache.rocketmq.spring.core.RocketMQTemplate; |
| | | import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer; |
| | | import org.springframework.beans.BeansException; |
| | | import org.springframework.beans.factory.config.BeanPostProcessor; |
| | | |
| | | /** |
| | | * 多租户的 RocketMQ 初始化器 |
| | | * |
| | | * @author iailab |
| | | */ |
| | | public class TenantRocketMQInitializer implements BeanPostProcessor { |
| | | |
| | | @Override |
| | | public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { |
| | | if (bean instanceof DefaultRocketMQListenerContainer) { |
| | | DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean; |
| | | initTenantConsumer(container.getConsumer()); |
| | | } else if (bean instanceof RocketMQTemplate) { |
| | | RocketMQTemplate template = (RocketMQTemplate) bean; |
| | | initTenantProducer(template.getProducer()); |
| | | } |
| | | return bean; |
| | | } |
| | | |
| | | private void initTenantProducer(DefaultMQProducer producer) { |
| | | if (producer == null) { |
| | | return; |
| | | } |
| | | DefaultMQProducerImpl producerImpl = producer.getDefaultMQProducerImpl(); |
| | | if (producerImpl == null) { |
| | | return; |
| | | } |
| | | producerImpl.registerSendMessageHook(new TenantRocketMQSendMessageHook()); |
| | | } |
| | | |
| | | private void initTenantConsumer(DefaultMQPushConsumer consumer) { |
| | | if (consumer == null) { |
| | | return; |
| | | } |
| | | DefaultMQPushConsumerImpl consumerImpl = consumer.getDefaultMQPushConsumerImpl(); |
| | | if (consumerImpl == null) { |
| | | return; |
| | | } |
| | | consumerImpl.registerConsumeMessageHook(new TenantRocketMQConsumeMessageHook()); |
| | | } |
| | | |
| | | } |