潘志宝
5 天以前 bab43330bf6f48bdb7bfb258611f51bb05ef56fe
提交 | 用户 | 时间
e7c126 1 package com.iailab.framework.mq.redis.core;
H 2
3 import com.iailab.framework.common.util.json.JsonUtils;
4 import com.iailab.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
5 import com.iailab.framework.mq.redis.core.message.AbstractRedisMessage;
6 import com.iailab.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
7 import com.iailab.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
8 import lombok.AllArgsConstructor;
9 import lombok.Getter;
10 import org.springframework.data.redis.connection.stream.RecordId;
11 import org.springframework.data.redis.connection.stream.StreamRecords;
12 import org.springframework.data.redis.core.RedisTemplate;
13
14 import java.util.ArrayList;
15 import java.util.List;
16
17 /**
18  * Redis MQ 操作模板类
19  *
20  * @author iailab
21  */
22 @AllArgsConstructor
23 public class RedisMQTemplate {
24
25     @Getter
26     private final RedisTemplate<String, ?> redisTemplate;
27     /**
28      * 拦截器数组
29      */
30     @Getter
31     private final List<RedisMessageInterceptor> interceptors = new ArrayList<>();
32
33     /**
34      * 发送 Redis 消息,基于 Redis pub/sub 实现
35      *
36      * @param message 消息
37      */
38     public <T extends AbstractRedisChannelMessage> void send(T message) {
39         try {
40             sendMessageBefore(message);
41             // 发送消息
42             redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));
43         } finally {
44             sendMessageAfter(message);
45         }
46     }
47
48     /**
49      * 发送 Redis 消息,基于 Redis Stream 实现
50      *
51      * @param message 消息
52      * @return 消息记录的编号对象
53      */
54     public <T extends AbstractRedisStreamMessage> RecordId send(T message) {
55         try {
56             sendMessageBefore(message);
57             // 发送消息
58             return redisTemplate.opsForStream().add(StreamRecords.newRecord()
59                     .ofObject(JsonUtils.toJsonString(message)) // 设置内容
60                     .withStreamKey(message.getStreamKey())); // 设置 stream key
61         } finally {
62             sendMessageAfter(message);
63         }
64     }
65
66     /**
67      * 添加拦截器
68      *
69      * @param interceptor 拦截器
70      */
71     public void addInterceptor(RedisMessageInterceptor interceptor) {
72         interceptors.add(interceptor);
73     }
74
75     private void sendMessageBefore(AbstractRedisMessage message) {
76         // 正序
77         interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message));
78     }
79
80     private void sendMessageAfter(AbstractRedisMessage message) {
81         // 倒序
82         for (int i = interceptors.size() - 1; i >= 0; i--) {
83             interceptors.get(i).sendMessageAfter(message);
84         }
85     }
86
87 }