潘志宝
9 天以前 b2bb7d1ff5639dd844e84b881a515eca30625411
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.iailab.framework.tenant.core.job;
 
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.util.StrUtil;
import com.iailab.framework.common.util.json.JsonUtils;
import com.iailab.framework.tenant.core.service.TenantFrameworkService;
import com.iailab.framework.tenant.core.util.TenantUtils;
import com.xxl.job.core.context.XxlJobHelper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
 
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
/**
 * 多租户 JobHandler AOP
 * 任务执行时,会按照租户逐个执行 Job 的逻辑
 *
 * 注意,需要保证 JobHandler 的幂等性。因为 Job 因为某个租户执行失败重试时,之前执行成功的租户也会再次执行。
 *
 * @author iailab
 */
@Aspect
@RequiredArgsConstructor
@Slf4j
public class TenantJobAspect {
 
    private final TenantFrameworkService tenantFrameworkService;
 
    @Around("@annotation(tenantJob)")
    public void around(ProceedingJoinPoint joinPoint, TenantJob tenantJob) {
        // 获得租户列表
        List<Long> tenantIds = tenantFrameworkService.getTenantIds();
        if (CollUtil.isEmpty(tenantIds)) {
            return;
        }
 
        // 逐个租户,执行 Job
        Map<Long, String> results = new ConcurrentHashMap<>();
        tenantIds.parallelStream().forEach(tenantId -> {
            // TODO iailab:先通过 parallel 实现并行;1)多个租户,是一条执行日志;2)异常的情况
            TenantUtils.execute(tenantId, () -> {
                try {
                    System.out.println("租户id:" + tenantId);
                    joinPoint.proceed();
                } catch (Throwable e) {
                    results.put(tenantId, ExceptionUtil.getRootCauseMessage(e));
                    // 打印异常
                    XxlJobHelper.log(StrUtil.format("[多租户({}) 执行任务({}),发生异常:{}]",
                            tenantId, joinPoint.getSignature(), ExceptionUtils.getStackTrace(e)));
                }
            });
        });
        // 如果 results 非空,说明发生了异常,标记 XXL-Job 执行失败
        if (CollUtil.isNotEmpty(results)) {
            XxlJobHelper.handleFail(JsonUtils.toJsonString(results));
        }
    }
 
}