package com.iailab.framework.tenant.core.mq.rocketmq;
|
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
|
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
|
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
|
import org.springframework.beans.BeansException;
|
import org.springframework.beans.factory.config.BeanPostProcessor;
|
|
/**
|
* 多租户的 RocketMQ 初始化器
|
*
|
* @author iailab
|
*/
|
public class TenantRocketMQInitializer implements BeanPostProcessor {
|
|
@Override
|
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
|
if (bean instanceof DefaultRocketMQListenerContainer) {
|
DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
|
initTenantConsumer(container.getConsumer());
|
} else if (bean instanceof RocketMQTemplate) {
|
RocketMQTemplate template = (RocketMQTemplate) bean;
|
initTenantProducer(template.getProducer());
|
}
|
return bean;
|
}
|
|
private void initTenantProducer(DefaultMQProducer producer) {
|
if (producer == null) {
|
return;
|
}
|
DefaultMQProducerImpl producerImpl = producer.getDefaultMQProducerImpl();
|
if (producerImpl == null) {
|
return;
|
}
|
producerImpl.registerSendMessageHook(new TenantRocketMQSendMessageHook());
|
}
|
|
private void initTenantConsumer(DefaultMQPushConsumer consumer) {
|
if (consumer == null) {
|
return;
|
}
|
DefaultMQPushConsumerImpl consumerImpl = consumer.getDefaultMQPushConsumerImpl();
|
if (consumerImpl == null) {
|
return;
|
}
|
consumerImpl.registerConsumeMessageHook(new TenantRocketMQConsumeMessageHook());
|
}
|
|
}
|