package com.iailab.framework.mq.redis.core;
|
|
import com.iailab.framework.common.util.json.JsonUtils;
|
import com.iailab.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
|
import com.iailab.framework.mq.redis.core.message.AbstractRedisMessage;
|
import com.iailab.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
|
import com.iailab.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
|
import lombok.AllArgsConstructor;
|
import lombok.Getter;
|
import org.springframework.data.redis.connection.stream.RecordId;
|
import org.springframework.data.redis.connection.stream.StreamRecords;
|
import org.springframework.data.redis.core.RedisTemplate;
|
|
import java.util.ArrayList;
|
import java.util.List;
|
|
/**
|
* Redis MQ 操作模板类
|
*
|
* @author iailab
|
*/
|
@AllArgsConstructor
|
public class RedisMQTemplate {
|
|
@Getter
|
private final RedisTemplate<String, ?> redisTemplate;
|
/**
|
* 拦截器数组
|
*/
|
@Getter
|
private final List<RedisMessageInterceptor> interceptors = new ArrayList<>();
|
|
/**
|
* 发送 Redis 消息,基于 Redis pub/sub 实现
|
*
|
* @param message 消息
|
*/
|
public <T extends AbstractRedisChannelMessage> void send(T message) {
|
try {
|
sendMessageBefore(message);
|
// 发送消息
|
redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));
|
} finally {
|
sendMessageAfter(message);
|
}
|
}
|
|
/**
|
* 发送 Redis 消息,基于 Redis Stream 实现
|
*
|
* @param message 消息
|
* @return 消息记录的编号对象
|
*/
|
public <T extends AbstractRedisStreamMessage> RecordId send(T message) {
|
try {
|
sendMessageBefore(message);
|
// 发送消息
|
return redisTemplate.opsForStream().add(StreamRecords.newRecord()
|
.ofObject(JsonUtils.toJsonString(message)) // 设置内容
|
.withStreamKey(message.getStreamKey())); // 设置 stream key
|
} finally {
|
sendMessageAfter(message);
|
}
|
}
|
|
/**
|
* 添加拦截器
|
*
|
* @param interceptor 拦截器
|
*/
|
public void addInterceptor(RedisMessageInterceptor interceptor) {
|
interceptors.add(interceptor);
|
}
|
|
private void sendMessageBefore(AbstractRedisMessage message) {
|
// 正序
|
interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message));
|
}
|
|
private void sendMessageAfter(AbstractRedisMessage message) {
|
// 倒序
|
for (int i = interceptors.size() - 1; i >= 0; i--) {
|
interceptors.get(i).sendMessageAfter(message);
|
}
|
}
|
|
}
|