提交 | 用户 | 时间
|
e7c126
|
1 |
package com.iailab.framework.mq.redis.core.job; |
H |
2 |
|
|
3 |
import cn.hutool.core.collection.CollUtil; |
|
4 |
import com.iailab.framework.mq.redis.core.RedisMQTemplate; |
|
5 |
import com.iailab.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener; |
|
6 |
import lombok.AllArgsConstructor; |
|
7 |
import lombok.extern.slf4j.Slf4j; |
|
8 |
import org.redisson.api.RLock; |
|
9 |
import org.redisson.api.RedissonClient; |
|
10 |
import org.springframework.data.domain.Range; |
|
11 |
import org.springframework.data.redis.connection.stream.*; |
|
12 |
import org.springframework.data.redis.core.StreamOperations; |
|
13 |
import org.springframework.scheduling.annotation.Scheduled; |
|
14 |
|
|
15 |
import java.util.List; |
|
16 |
import java.util.Map; |
|
17 |
import java.util.Objects; |
|
18 |
|
|
19 |
/** |
|
20 |
* 这个任务用于处理,crash 之后的消费者未消费完的消息 |
|
21 |
*/ |
|
22 |
@Slf4j |
|
23 |
@AllArgsConstructor |
|
24 |
public class RedisPendingMessageResendJob { |
|
25 |
|
|
26 |
private static final String LOCK_KEY = "redis:pending:msg:lock"; |
|
27 |
|
|
28 |
/** |
|
29 |
* 消息超时时间,默认 5 分钟 |
|
30 |
* |
|
31 |
* 1. 超时的消息才会被重新投递 |
|
32 |
* 2. 由于定时任务 1 分钟一次,消息超时后不会被立即重投,极端情况下消息5分钟过期后,再等 1 分钟才会被扫瞄到 |
|
33 |
*/ |
|
34 |
private static final int EXPIRE_TIME = 5 * 60; |
|
35 |
|
|
36 |
private final List<AbstractRedisStreamMessageListener<?>> listeners; |
|
37 |
private final RedisMQTemplate redisTemplate; |
|
38 |
private final String groupName; |
|
39 |
private final RedissonClient redissonClient; |
|
40 |
|
|
41 |
/** |
|
42 |
* 一分钟执行一次,这里选择每分钟的35秒执行,是为了避免整点任务过多的问题 |
|
43 |
*/ |
|
44 |
@Scheduled(cron = "35 * * * * ?") |
|
45 |
public void messageResend() { |
|
46 |
RLock lock = redissonClient.getLock(LOCK_KEY); |
|
47 |
// 尝试加锁 |
|
48 |
if (lock.tryLock()) { |
|
49 |
try { |
|
50 |
execute(); |
|
51 |
} catch (Exception ex) { |
|
52 |
log.error("[messageResend][执行异常]", ex); |
|
53 |
} finally { |
|
54 |
lock.unlock(); |
|
55 |
} |
|
56 |
} |
|
57 |
} |
|
58 |
|
|
59 |
/** |
|
60 |
* 执行清理逻辑 |
|
61 |
* |
|
62 |
* @see <a href="https://gitee.com/zhijiantianya/ruoyi-vue-pro/pulls/480/files">讨论</a> |
|
63 |
*/ |
|
64 |
private void execute() { |
|
65 |
StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream(); |
|
66 |
listeners.forEach(listener -> { |
|
67 |
PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), groupName)); |
|
68 |
// 每个消费者的 pending 队列消息数量 |
|
69 |
Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer(); |
|
70 |
pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> { |
|
71 |
log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount); |
|
72 |
// 每个消费者的 pending消息的详情信息 |
|
73 |
PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(groupName, consumerName), Range.unbounded(), pendingMessageCount); |
|
74 |
if (pendingMessages.isEmpty()) { |
|
75 |
return; |
|
76 |
} |
|
77 |
pendingMessages.forEach(pendingMessage -> { |
|
78 |
// 获取消息上一次传递到 consumer 的时间, |
|
79 |
long lastDelivery = pendingMessage.getElapsedTimeSinceLastDelivery().getSeconds(); |
|
80 |
if (lastDelivery < EXPIRE_TIME){ |
|
81 |
return; |
|
82 |
} |
|
83 |
// 获取指定 id 的消息体 |
|
84 |
List<MapRecord<String, Object, Object>> records = ops.range(listener.getStreamKey(), |
|
85 |
Range.of(Range.Bound.inclusive(pendingMessage.getIdAsString()), Range.Bound.inclusive(pendingMessage.getIdAsString()))); |
|
86 |
if (CollUtil.isEmpty(records)) { |
|
87 |
return; |
|
88 |
} |
|
89 |
// 重新投递消息 |
|
90 |
redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord() |
|
91 |
.ofObject(records.get(0).getValue()) // 设置内容 |
|
92 |
.withStreamKey(listener.getStreamKey())); |
|
93 |
// ack 消息消费完成 |
|
94 |
redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, records.get(0)); |
|
95 |
log.info("[processPendingMessage][消息({})重新投递成功]", records.get(0).getId()); |
|
96 |
}); |
|
97 |
}); |
|
98 |
}); |
|
99 |
} |
|
100 |
} |