dengzedong
2024-12-31 f20e755c36d40d3474b6866b4f1b006cb128ef75
提交 | 用户 | 时间
e7c126 1 package com.iailab.framework.tenant.core.job;
H 2
3 import cn.hutool.core.collection.CollUtil;
4 import cn.hutool.core.exceptions.ExceptionUtil;
5 import cn.hutool.core.util.StrUtil;
6 import com.iailab.framework.common.util.json.JsonUtils;
7 import com.iailab.framework.tenant.core.service.TenantFrameworkService;
8 import com.iailab.framework.tenant.core.util.TenantUtils;
9 import com.xxl.job.core.context.XxlJobHelper;
10 import lombok.RequiredArgsConstructor;
11 import lombok.extern.slf4j.Slf4j;
12 import org.apache.commons.lang3.exception.ExceptionUtils;
13 import org.aspectj.lang.ProceedingJoinPoint;
14 import org.aspectj.lang.annotation.Around;
15 import org.aspectj.lang.annotation.Aspect;
16
17 import java.util.List;
18 import java.util.Map;
19 import java.util.concurrent.ConcurrentHashMap;
20
21 /**
22  * 多租户 JobHandler AOP
23  * 任务执行时,会按照租户逐个执行 Job 的逻辑
24  *
25  * 注意,需要保证 JobHandler 的幂等性。因为 Job 因为某个租户执行失败重试时,之前执行成功的租户也会再次执行。
26  *
27  * @author iailab
28  */
29 @Aspect
30 @RequiredArgsConstructor
31 @Slf4j
32 public class TenantJobAspect {
33
34     private final TenantFrameworkService tenantFrameworkService;
35
36     @Around("@annotation(tenantJob)")
37     public void around(ProceedingJoinPoint joinPoint, TenantJob tenantJob) {
38         // 获得租户列表
39         List<Long> tenantIds = tenantFrameworkService.getTenantIds();
40         if (CollUtil.isEmpty(tenantIds)) {
41             return;
42         }
43
44         // 逐个租户,执行 Job
45         Map<Long, String> results = new ConcurrentHashMap<>();
46         tenantIds.parallelStream().forEach(tenantId -> {
47             // TODO iailab:先通过 parallel 实现并行;1)多个租户,是一条执行日志;2)异常的情况
48             TenantUtils.execute(tenantId, () -> {
49                 try {
50                     System.out.println("租户id:" + tenantId);
51                     joinPoint.proceed();
52                 } catch (Throwable e) {
53                     results.put(tenantId, ExceptionUtil.getRootCauseMessage(e));
54                     // 打印异常
55                     XxlJobHelper.log(StrUtil.format("[多租户({}) 执行任务({}),发生异常:{}]",
56                             tenantId, joinPoint.getSignature(), ExceptionUtils.getStackTrace(e)));
57                 }
58             });
59         });
60         // 如果 results 非空,说明发生了异常,标记 XXL-Job 执行失败
61         if (CollUtil.isNotEmpty(results)) {
62             XxlJobHelper.handleFail(JsonUtils.toJsonString(results));
63         }
64     }
65
66 }