package com.iailab.framework.tenant.core.job;
|
|
import cn.hutool.core.collection.CollUtil;
|
import cn.hutool.core.exceptions.ExceptionUtil;
|
import cn.hutool.core.util.StrUtil;
|
import com.iailab.framework.common.util.json.JsonUtils;
|
import com.iailab.framework.tenant.core.service.TenantFrameworkService;
|
import com.iailab.framework.tenant.core.util.TenantUtils;
|
import com.xxl.job.core.context.XxlJobHelper;
|
import lombok.RequiredArgsConstructor;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.lang3.exception.ExceptionUtils;
|
import org.aspectj.lang.ProceedingJoinPoint;
|
import org.aspectj.lang.annotation.Around;
|
import org.aspectj.lang.annotation.Aspect;
|
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.ConcurrentHashMap;
|
|
/**
|
* 多租户 JobHandler AOP
|
* 任务执行时,会按照租户逐个执行 Job 的逻辑
|
*
|
* 注意,需要保证 JobHandler 的幂等性。因为 Job 因为某个租户执行失败重试时,之前执行成功的租户也会再次执行。
|
*
|
* @author iailab
|
*/
|
@Aspect
|
@RequiredArgsConstructor
|
@Slf4j
|
public class TenantJobAspect {
|
|
private final TenantFrameworkService tenantFrameworkService;
|
|
@Around("@annotation(tenantJob)")
|
public void around(ProceedingJoinPoint joinPoint, TenantJob tenantJob) {
|
// 获得租户列表
|
List<Long> tenantIds = tenantFrameworkService.getTenantIds();
|
if (CollUtil.isEmpty(tenantIds)) {
|
return;
|
}
|
|
// 逐个租户,执行 Job
|
Map<Long, String> results = new ConcurrentHashMap<>();
|
tenantIds.parallelStream().forEach(tenantId -> {
|
// TODO iailab:先通过 parallel 实现并行;1)多个租户,是一条执行日志;2)异常的情况
|
TenantUtils.execute(tenantId, () -> {
|
try {
|
System.out.println("租户id:" + tenantId);
|
joinPoint.proceed();
|
} catch (Throwable e) {
|
results.put(tenantId, ExceptionUtil.getRootCauseMessage(e));
|
// 打印异常
|
XxlJobHelper.log(StrUtil.format("[多租户({}) 执行任务({}),发生异常:{}]",
|
tenantId, joinPoint.getSignature(), ExceptionUtils.getStackTrace(e)));
|
}
|
});
|
});
|
// 如果 results 非空,说明发生了异常,标记 XXL-Job 执行失败
|
if (CollUtil.isNotEmpty(results)) {
|
XxlJobHelper.handleFail(JsonUtils.toJsonString(results));
|
}
|
}
|
|
}
|