package com.iailab.framework.mq.redis.config; import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.system.SystemUtil; import com.iailab.framework.common.enums.DocumentEnum; import com.iailab.framework.mq.redis.core.RedisMQTemplate; import com.iailab.framework.mq.redis.core.job.RedisPendingMessageResendJob; import com.iailab.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener; import com.iailab.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener; import com.iailab.framework.redis.config.IailabRedisAutoConfiguration; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.context.annotation.Bean; import org.springframework.data.redis.connection.RedisServerCommands; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.scheduling.annotation.EnableScheduling; import java.util.List; import java.util.Properties; /** * Redis 消息队列 Consumer 配置类 * * @author iailab */ @Slf4j @EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息 @AutoConfiguration(after = IailabRedisAutoConfiguration.class) public class IailabRedisMQConsumerAutoConfiguration { /** * 创建 Redis Pub/Sub 广播消费的容器 */ @Bean @ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听 public RedisMessageListenerContainer redisMessageListenerContainer( RedisMQTemplate redisMQTemplate, List> listeners) { // 创建 RedisMessageListenerContainer 对象 RedisMessageListenerContainer container = new RedisMessageListenerContainer(); // 设置 RedisConnection 工厂。 container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory()); // 添加监听器 listeners.forEach(listener -> { listener.setRedisMQTemplate(redisMQTemplate); container.addMessageListener(listener, new ChannelTopic(listener.getChannel())); log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]", listener.getChannel(), listener.getClass().getName()); }); return container; } /** * 创建 Redis Stream 重新消费的任务 */ @Bean @ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听 public RedisPendingMessageResendJob redisPendingMessageResendJob(List> listeners, RedisMQTemplate redisTemplate, @Value("${spring.application.name}") String groupName, RedissonClient redissonClient) { return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient); } /** * 创建 Redis Stream 集群消费的容器 * * 基础知识:Redis Stream 的 xreadgroup 命令 */ @Bean(initMethod = "start", destroyMethod = "stop") @ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听 public StreamMessageListenerContainer> redisStreamMessageListenerContainer( RedisMQTemplate redisMQTemplate, List> listeners) { RedisTemplate redisTemplate = redisMQTemplate.getRedisTemplate(); checkRedisVersion(redisTemplate); // 第一步,创建 StreamMessageListenerContainer 容器 // 创建 options 配置 StreamMessageListenerContainer.StreamMessageListenerContainerOptions> containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() .batchSize(10) // 一次性最多拉取多少条消息 .targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化 .build(); // 创建 container 对象 StreamMessageListenerContainer> container = StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions); // 第二步,注册监听器,消费对应的 Stream 主题 String consumerName = buildConsumerName(); listeners.parallelStream().forEach(listener -> { log.info("[redisStreamMessageListenerContainer][开始注册 StreamKey({}) 对应的监听器({})]", listener.getStreamKey(), listener.getClass().getName()); // 创建 listener 对应的消费者分组 try { redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup()); } catch (Exception ignore) { } // 设置 listener 对应的 redisTemplate listener.setRedisMQTemplate(redisMQTemplate); // 创建 Consumer 对象 Consumer consumer = Consumer.from(listener.getGroup(), consumerName); // 设置 Consumer 消费进度,以最小消费进度为准 StreamOffset streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed()); // 设置 Consumer 监听 StreamMessageListenerContainer.StreamReadRequestBuilder builder = StreamMessageListenerContainer.StreamReadRequest .builder(streamOffset).consumer(consumer) .autoAcknowledge(false) // 不自动 ack .cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false container.register(builder.build(), listener); log.info("[redisStreamMessageListenerContainer][完成注册 StreamKey({}) 对应的监听器({})]", listener.getStreamKey(), listener.getClass().getName()); }); return container; } /** * 构建消费者名字,使用本地 IP + 进程编号的方式。 * 参考自 RocketMQ clientId 的实现 * * @return 消费者名字 */ private static String buildConsumerName() { return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID()); } /** * 校验 Redis 版本号,是否满足最低的版本号要求! */ private static void checkRedisVersion(RedisTemplate redisTemplate) { // 获得 Redis 版本 Properties info = redisTemplate.execute((RedisCallback) RedisServerCommands::info); String version = MapUtil.getStr(info, "redis_version"); // 校验最低版本必须大于等于 5.0.0 int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false)); if (majorVersion < 5) { throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!" + "请参考 {} 文档进行安装。", version, DocumentEnum.REDIS_INSTALL.getUrl())); } } }