提交 | 用户 | 时间
|
e7c126
|
1 |
package com.iailab.framework.mq.redis.config; |
H |
2 |
|
|
3 |
import cn.hutool.core.map.MapUtil; |
|
4 |
import cn.hutool.core.util.StrUtil; |
|
5 |
import cn.hutool.system.SystemUtil; |
|
6 |
import com.iailab.framework.common.enums.DocumentEnum; |
|
7 |
import com.iailab.framework.mq.redis.core.RedisMQTemplate; |
|
8 |
import com.iailab.framework.mq.redis.core.job.RedisPendingMessageResendJob; |
|
9 |
import com.iailab.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener; |
|
10 |
import com.iailab.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener; |
|
11 |
import com.iailab.framework.redis.config.IailabRedisAutoConfiguration; |
|
12 |
import lombok.extern.slf4j.Slf4j; |
|
13 |
import org.redisson.api.RedissonClient; |
|
14 |
import org.springframework.beans.factory.annotation.Value; |
|
15 |
import org.springframework.boot.autoconfigure.AutoConfiguration; |
|
16 |
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; |
|
17 |
import org.springframework.context.annotation.Bean; |
|
18 |
import org.springframework.data.redis.connection.RedisServerCommands; |
|
19 |
import org.springframework.data.redis.connection.stream.Consumer; |
|
20 |
import org.springframework.data.redis.connection.stream.ObjectRecord; |
|
21 |
import org.springframework.data.redis.connection.stream.ReadOffset; |
|
22 |
import org.springframework.data.redis.connection.stream.StreamOffset; |
|
23 |
import org.springframework.data.redis.core.RedisCallback; |
|
24 |
import org.springframework.data.redis.core.RedisTemplate; |
|
25 |
import org.springframework.data.redis.listener.ChannelTopic; |
|
26 |
import org.springframework.data.redis.listener.RedisMessageListenerContainer; |
|
27 |
import org.springframework.data.redis.stream.StreamMessageListenerContainer; |
|
28 |
import org.springframework.scheduling.annotation.EnableScheduling; |
|
29 |
|
|
30 |
import java.util.List; |
|
31 |
import java.util.Properties; |
|
32 |
|
|
33 |
/** |
|
34 |
* Redis 消息队列 Consumer 配置类 |
|
35 |
* |
|
36 |
* @author iailab |
|
37 |
*/ |
|
38 |
@Slf4j |
|
39 |
@EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息 |
|
40 |
@AutoConfiguration(after = IailabRedisAutoConfiguration.class) |
|
41 |
public class IailabRedisMQConsumerAutoConfiguration { |
|
42 |
|
|
43 |
/** |
|
44 |
* 创建 Redis Pub/Sub 广播消费的容器 |
|
45 |
*/ |
|
46 |
@Bean |
|
47 |
@ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听 |
|
48 |
public RedisMessageListenerContainer redisMessageListenerContainer( |
|
49 |
RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners) { |
|
50 |
// 创建 RedisMessageListenerContainer 对象 |
|
51 |
RedisMessageListenerContainer container = new RedisMessageListenerContainer(); |
|
52 |
// 设置 RedisConnection 工厂。 |
|
53 |
container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory()); |
|
54 |
// 添加监听器 |
|
55 |
listeners.forEach(listener -> { |
|
56 |
listener.setRedisMQTemplate(redisMQTemplate); |
|
57 |
container.addMessageListener(listener, new ChannelTopic(listener.getChannel())); |
|
58 |
log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]", |
|
59 |
listener.getChannel(), listener.getClass().getName()); |
|
60 |
}); |
|
61 |
return container; |
|
62 |
} |
|
63 |
|
|
64 |
/** |
|
65 |
* 创建 Redis Stream 重新消费的任务 |
|
66 |
*/ |
|
67 |
@Bean |
|
68 |
@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听 |
|
69 |
public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> listeners, |
|
70 |
RedisMQTemplate redisTemplate, |
|
71 |
@Value("${spring.application.name}") String groupName, |
|
72 |
RedissonClient redissonClient) { |
|
73 |
return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient); |
|
74 |
} |
|
75 |
|
|
76 |
/** |
|
77 |
* 创建 Redis Stream 集群消费的容器 |
|
78 |
* |
|
79 |
* 基础知识:<a href="https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html">Redis Stream 的 xreadgroup 命令</a> |
|
80 |
*/ |
|
81 |
@Bean(initMethod = "start", destroyMethod = "stop") |
|
82 |
@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听 |
|
83 |
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer( |
|
84 |
RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) { |
|
85 |
RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate(); |
|
86 |
checkRedisVersion(redisTemplate); |
|
87 |
// 第一步,创建 StreamMessageListenerContainer 容器 |
|
88 |
// 创建 options 配置 |
|
89 |
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions = |
|
90 |
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() |
|
91 |
.batchSize(10) // 一次性最多拉取多少条消息 |
|
92 |
.targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化 |
|
93 |
.build(); |
|
94 |
// 创建 container 对象 |
|
95 |
StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = |
|
96 |
StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions); |
|
97 |
|
|
98 |
// 第二步,注册监听器,消费对应的 Stream 主题 |
|
99 |
String consumerName = buildConsumerName(); |
|
100 |
listeners.parallelStream().forEach(listener -> { |
|
101 |
log.info("[redisStreamMessageListenerContainer][开始注册 StreamKey({}) 对应的监听器({})]", |
|
102 |
listener.getStreamKey(), listener.getClass().getName()); |
|
103 |
// 创建 listener 对应的消费者分组 |
|
104 |
try { |
|
105 |
redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup()); |
|
106 |
} catch (Exception ignore) { |
|
107 |
} |
|
108 |
// 设置 listener 对应的 redisTemplate |
|
109 |
listener.setRedisMQTemplate(redisMQTemplate); |
|
110 |
// 创建 Consumer 对象 |
|
111 |
Consumer consumer = Consumer.from(listener.getGroup(), consumerName); |
|
112 |
// 设置 Consumer 消费进度,以最小消费进度为准 |
|
113 |
StreamOffset<String> streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed()); |
|
114 |
// 设置 Consumer 监听 |
|
115 |
StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest |
|
116 |
.builder(streamOffset).consumer(consumer) |
|
117 |
.autoAcknowledge(false) // 不自动 ack |
|
118 |
.cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false |
|
119 |
container.register(builder.build(), listener); |
|
120 |
log.info("[redisStreamMessageListenerContainer][完成注册 StreamKey({}) 对应的监听器({})]", |
|
121 |
listener.getStreamKey(), listener.getClass().getName()); |
|
122 |
}); |
|
123 |
return container; |
|
124 |
} |
|
125 |
|
|
126 |
/** |
|
127 |
* 构建消费者名字,使用本地 IP + 进程编号的方式。 |
|
128 |
* 参考自 RocketMQ clientId 的实现 |
|
129 |
* |
|
130 |
* @return 消费者名字 |
|
131 |
*/ |
|
132 |
private static String buildConsumerName() { |
|
133 |
return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID()); |
|
134 |
} |
|
135 |
|
|
136 |
/** |
|
137 |
* 校验 Redis 版本号,是否满足最低的版本号要求! |
|
138 |
*/ |
|
139 |
private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) { |
|
140 |
// 获得 Redis 版本 |
|
141 |
Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info); |
|
142 |
String version = MapUtil.getStr(info, "redis_version"); |
|
143 |
// 校验最低版本必须大于等于 5.0.0 |
|
144 |
int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false)); |
|
145 |
if (majorVersion < 5) { |
|
146 |
throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!" + |
|
147 |
"请参考 {} 文档进行安装。", version, DocumentEnum.REDIS_INSTALL.getUrl())); |
|
148 |
} |
|
149 |
} |
|
150 |
|
|
151 |
} |