潘志宝
5 天以前 4d7e3bb9a93ac0bdba9075e5efa536a165f8aae9
提交 | 用户 | 时间
e7c126 1 package com.iailab.framework.mq.redis.core.job;
H 2
3 import cn.hutool.core.collection.CollUtil;
4 import com.iailab.framework.mq.redis.core.RedisMQTemplate;
5 import com.iailab.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
6 import lombok.AllArgsConstructor;
7 import lombok.extern.slf4j.Slf4j;
8 import org.redisson.api.RLock;
9 import org.redisson.api.RedissonClient;
10 import org.springframework.data.domain.Range;
11 import org.springframework.data.redis.connection.stream.*;
12 import org.springframework.data.redis.core.StreamOperations;
13 import org.springframework.scheduling.annotation.Scheduled;
14
15 import java.util.List;
16 import java.util.Map;
17 import java.util.Objects;
18
19 /**
20  * 这个任务用于处理,crash 之后的消费者未消费完的消息
21  */
22 @Slf4j
23 @AllArgsConstructor
24 public class RedisPendingMessageResendJob {
25
26     private static final String LOCK_KEY = "redis:pending:msg:lock";
27
28     /**
29      * 消息超时时间,默认 5 分钟
30      *
31      * 1. 超时的消息才会被重新投递
32      * 2. 由于定时任务 1 分钟一次,消息超时后不会被立即重投,极端情况下消息5分钟过期后,再等 1 分钟才会被扫瞄到
33      */
34     private static final int EXPIRE_TIME = 5 * 60;
35
36     private final List<AbstractRedisStreamMessageListener<?>> listeners;
37     private final RedisMQTemplate redisTemplate;
38     private final String groupName;
39     private final RedissonClient redissonClient;
40
41     /**
42      * 一分钟执行一次,这里选择每分钟的35秒执行,是为了避免整点任务过多的问题
43      */
44     @Scheduled(cron = "35 * * * * ?")
45     public void messageResend() {
46         RLock lock = redissonClient.getLock(LOCK_KEY);
47         // 尝试加锁
48         if (lock.tryLock()) {
49             try {
50                 execute();
51             } catch (Exception ex) {
52                 log.error("[messageResend][执行异常]", ex);
53             } finally {
54                 lock.unlock();
55             }
56         }
57     }
58
59     /**
60      * 执行清理逻辑
61      *
62      * @see <a href="https://gitee.com/zhijiantianya/ruoyi-vue-pro/pulls/480/files">讨论</a>
63      */
64     private void execute() {
65         StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();
66         listeners.forEach(listener -> {
67             PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), groupName));
68             // 每个消费者的 pending 队列消息数量
69             Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
70             pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> {
71                 log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount);
72                 // 每个消费者的 pending消息的详情信息
73                 PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(groupName, consumerName), Range.unbounded(), pendingMessageCount);
74                 if (pendingMessages.isEmpty()) {
75                     return;
76                 }
77                 pendingMessages.forEach(pendingMessage -> {
78                     // 获取消息上一次传递到 consumer 的时间,
79                     long lastDelivery = pendingMessage.getElapsedTimeSinceLastDelivery().getSeconds();
80                     if (lastDelivery < EXPIRE_TIME){
81                         return;
82                     }
83                     // 获取指定 id 的消息体
84                     List<MapRecord<String, Object, Object>> records = ops.range(listener.getStreamKey(),
85                             Range.of(Range.Bound.inclusive(pendingMessage.getIdAsString()), Range.Bound.inclusive(pendingMessage.getIdAsString())));
86                     if (CollUtil.isEmpty(records)) {
87                         return;
88                     }
89                     // 重新投递消息
90                     redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord()
91                             .ofObject(records.get(0).getValue()) // 设置内容
92                             .withStreamKey(listener.getStreamKey()));
93                     // ack 消息消费完成
94                     redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, records.get(0));
95                     log.info("[processPendingMessage][消息({})重新投递成功]", records.get(0).getId());
96                 });
97             });
98         });
99     }
100 }