潘志宝
2024-12-24 140065f50679c04ea873db3f6c958358b8dd8ddc
提交 | 用户 | 时间
e7c126 1 package com.iailab.framework.tenant.core.mq.rocketmq;
H 2
3 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
4 import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
5 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
6 import org.apache.rocketmq.client.producer.DefaultMQProducer;
7 import org.apache.rocketmq.spring.core.RocketMQTemplate;
8 import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
9 import org.springframework.beans.BeansException;
10 import org.springframework.beans.factory.config.BeanPostProcessor;
11
12 /**
13  * 多租户的 RocketMQ 初始化器
14  *
15  * @author iailab
16  */
17 public class TenantRocketMQInitializer implements BeanPostProcessor {
18
19     @Override
20     public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
21         if (bean instanceof DefaultRocketMQListenerContainer) {
22             DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
23             initTenantConsumer(container.getConsumer());
24         } else if (bean instanceof RocketMQTemplate) {
25             RocketMQTemplate template = (RocketMQTemplate) bean;
26             initTenantProducer(template.getProducer());
27         }
28         return bean;
29     }
30
31     private void initTenantProducer(DefaultMQProducer producer) {
32         if (producer == null) {
33             return;
34         }
35         DefaultMQProducerImpl producerImpl = producer.getDefaultMQProducerImpl();
36         if (producerImpl == null) {
37             return;
38         }
39         producerImpl.registerSendMessageHook(new TenantRocketMQSendMessageHook());
40     }
41
42     private void initTenantConsumer(DefaultMQPushConsumer consumer) {
43         if (consumer == null) {
44             return;
45         }
46         DefaultMQPushConsumerImpl consumerImpl = consumer.getDefaultMQPushConsumerImpl();
47         if (consumerImpl == null) {
48             return;
49         }
50         consumerImpl.registerConsumeMessageHook(new TenantRocketMQConsumeMessageHook());
51     }
52
53 }