package com.iailab.framework.mq.redis.core; import com.iailab.framework.common.util.json.JsonUtils; import com.iailab.framework.mq.redis.core.interceptor.RedisMessageInterceptor; import com.iailab.framework.mq.redis.core.message.AbstractRedisMessage; import com.iailab.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage; import com.iailab.framework.mq.redis.core.stream.AbstractRedisStreamMessage; import lombok.AllArgsConstructor; import lombok.Getter; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.connection.stream.StreamRecords; import org.springframework.data.redis.core.RedisTemplate; import java.util.ArrayList; import java.util.List; /** * Redis MQ 操作模板类 * * @author iailab */ @AllArgsConstructor public class RedisMQTemplate { @Getter private final RedisTemplate redisTemplate; /** * 拦截器数组 */ @Getter private final List interceptors = new ArrayList<>(); /** * 发送 Redis 消息,基于 Redis pub/sub 实现 * * @param message 消息 */ public void send(T message) { try { sendMessageBefore(message); // 发送消息 redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message)); } finally { sendMessageAfter(message); } } /** * 发送 Redis 消息,基于 Redis Stream 实现 * * @param message 消息 * @return 消息记录的编号对象 */ public RecordId send(T message) { try { sendMessageBefore(message); // 发送消息 return redisTemplate.opsForStream().add(StreamRecords.newRecord() .ofObject(JsonUtils.toJsonString(message)) // 设置内容 .withStreamKey(message.getStreamKey())); // 设置 stream key } finally { sendMessageAfter(message); } } /** * 添加拦截器 * * @param interceptor 拦截器 */ public void addInterceptor(RedisMessageInterceptor interceptor) { interceptors.add(interceptor); } private void sendMessageBefore(AbstractRedisMessage message) { // 正序 interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message)); } private void sendMessageAfter(AbstractRedisMessage message) { // 倒序 for (int i = interceptors.size() - 1; i >= 0; i--) { interceptors.get(i).sendMessageAfter(message); } } }