潘志宝
5 天以前 7fce3006ecd0b670e33c2d3ba123778e79e2e943
提交 | 用户 | 时间
e7c126 1 package com.iailab.framework.mq.redis.config;
H 2
3 import cn.hutool.core.map.MapUtil;
4 import cn.hutool.core.util.StrUtil;
5 import cn.hutool.system.SystemUtil;
6 import com.iailab.framework.common.enums.DocumentEnum;
7 import com.iailab.framework.mq.redis.core.RedisMQTemplate;
8 import com.iailab.framework.mq.redis.core.job.RedisPendingMessageResendJob;
9 import com.iailab.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
10 import com.iailab.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
11 import com.iailab.framework.redis.config.IailabRedisAutoConfiguration;
12 import lombok.extern.slf4j.Slf4j;
13 import org.redisson.api.RedissonClient;
14 import org.springframework.beans.factory.annotation.Value;
15 import org.springframework.boot.autoconfigure.AutoConfiguration;
16 import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
17 import org.springframework.context.annotation.Bean;
18 import org.springframework.data.redis.connection.RedisServerCommands;
19 import org.springframework.data.redis.connection.stream.Consumer;
20 import org.springframework.data.redis.connection.stream.ObjectRecord;
21 import org.springframework.data.redis.connection.stream.ReadOffset;
22 import org.springframework.data.redis.connection.stream.StreamOffset;
23 import org.springframework.data.redis.core.RedisCallback;
24 import org.springframework.data.redis.core.RedisTemplate;
25 import org.springframework.data.redis.listener.ChannelTopic;
26 import org.springframework.data.redis.listener.RedisMessageListenerContainer;
27 import org.springframework.data.redis.stream.StreamMessageListenerContainer;
28 import org.springframework.scheduling.annotation.EnableScheduling;
29
30 import java.util.List;
31 import java.util.Properties;
32
33 /**
34  * Redis 消息队列 Consumer 配置类
35  *
36  * @author iailab
37  */
38 @Slf4j
39 @EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息
40 @AutoConfiguration(after = IailabRedisAutoConfiguration.class)
41 public class IailabRedisMQConsumerAutoConfiguration {
42
43     /**
44      * 创建 Redis Pub/Sub 广播消费的容器
45      */
46     @Bean
47     @ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听
48     public RedisMessageListenerContainer redisMessageListenerContainer(
49             RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners) {
50         // 创建 RedisMessageListenerContainer 对象
51         RedisMessageListenerContainer container = new RedisMessageListenerContainer();
52         // 设置 RedisConnection 工厂。
53         container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory());
54         // 添加监听器
55         listeners.forEach(listener -> {
56             listener.setRedisMQTemplate(redisMQTemplate);
57             container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));
58             log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]",
59                     listener.getChannel(), listener.getClass().getName());
60         });
61         return container;
62     }
63
64     /**
65      * 创建 Redis Stream 重新消费的任务
66      */
67     @Bean
68     @ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
69     public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> listeners,
70                                                                      RedisMQTemplate redisTemplate,
71                                                                      @Value("${spring.application.name}") String groupName,
72                                                                      RedissonClient redissonClient) {
73         return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient);
74     }
75
76     /**
77      * 创建 Redis Stream 集群消费的容器
78      *
79      * 基础知识:<a href="https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html">Redis Stream 的 xreadgroup 命令</a>
80      */
81     @Bean(initMethod = "start", destroyMethod = "stop")
82     @ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
83     public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(
84             RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) {
85         RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();
86         checkRedisVersion(redisTemplate);
87         // 第一步,创建 StreamMessageListenerContainer 容器
88         // 创建 options 配置
89         StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions =
90                 StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
91                         .batchSize(10) // 一次性最多拉取多少条消息
92                         .targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化
93                         .build();
94         // 创建 container 对象
95         StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
96                 StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);
97
98         // 第二步,注册监听器,消费对应的 Stream 主题
99         String consumerName = buildConsumerName();
100         listeners.parallelStream().forEach(listener -> {
101             log.info("[redisStreamMessageListenerContainer][开始注册 StreamKey({}) 对应的监听器({})]",
102                     listener.getStreamKey(), listener.getClass().getName());
103             // 创建 listener 对应的消费者分组
104             try {
105                 redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
106             } catch (Exception ignore) {
107             }
108             // 设置 listener 对应的 redisTemplate
109             listener.setRedisMQTemplate(redisMQTemplate);
110             // 创建 Consumer 对象
111             Consumer consumer = Consumer.from(listener.getGroup(), consumerName);
112             // 设置 Consumer 消费进度,以最小消费进度为准
113             StreamOffset<String> streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());
114             // 设置 Consumer 监听
115             StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest
116                     .builder(streamOffset).consumer(consumer)
117                     .autoAcknowledge(false) // 不自动 ack
118                     .cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false
119             container.register(builder.build(), listener);
120             log.info("[redisStreamMessageListenerContainer][完成注册 StreamKey({}) 对应的监听器({})]",
121                     listener.getStreamKey(), listener.getClass().getName());
122         });
123         return container;
124     }
125
126     /**
127      * 构建消费者名字,使用本地 IP + 进程编号的方式。
128      * 参考自 RocketMQ clientId 的实现
129      *
130      * @return 消费者名字
131      */
132     private static String buildConsumerName() {
133         return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());
134     }
135
136     /**
137      * 校验 Redis 版本号,是否满足最低的版本号要求!
138      */
139     private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {
140         // 获得 Redis 版本
141         Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);
142         String version = MapUtil.getStr(info, "redis_version");
143         // 校验最低版本必须大于等于 5.0.0
144         int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false));
145         if (majorVersion < 5) {
146             throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!" +
147                     "请参考 {} 文档进行安装。", version, DocumentEnum.REDIS_INSTALL.getUrl()));
148         }
149     }
150
151 }