package com.iailab.framework.tenant.core.job;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.util.StrUtil;
import com.iailab.framework.common.util.json.JsonUtils;
import com.iailab.framework.tenant.core.service.TenantFrameworkService;
import com.iailab.framework.tenant.core.util.TenantUtils;
import com.xxl.job.core.context.XxlJobHelper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Multi-tenant JobHandler AOP aspect.
 * When a job annotated with {@link TenantJob} runs, its logic is executed once per tenant.
 *
 * Note: the JobHandler must be idempotent. When a job is retried because it failed for
 * one tenant, the tenants that previously succeeded are executed again as well.
 *
 * @author iailab
 */
@Aspect
@RequiredArgsConstructor
@Slf4j
public class TenantJobAspect {

    private final TenantFrameworkService tenantFrameworkService;

    /**
     * Runs the intercepted job once for every tenant id returned by
     * {@link TenantFrameworkService#getTenantIds()}.
     *
     * A failure for one tenant does not stop the remaining tenants: the throwable is caught,
     * its root-cause message is recorded per tenant, and after all tenants have been processed
     * the XXL-Job execution is marked as failed with the collected messages serialized as JSON.
     * Nothing happens when the tenant list is empty.
     *
     * @param joinPoint the intercepted job method invocation
     * @param tenantJob the annotation that triggered this advice
     */
    @Around("@annotation(tenantJob)")
    public void around(ProceedingJoinPoint joinPoint, TenantJob tenantJob) {
        // Fetch the tenant list.
        // NOTE(review): the element type is restored as Long on the assumption that
        // getTenantIds() returns List<Long> and TenantUtils.execute accepts a Long id — confirm.
        List<Long> tenantIds = tenantFrameworkService.getTenantIds();
        if (CollUtil.isEmpty(tenantIds)) {
            return;
        }

        // Execute the job tenant by tenant. The map is written from the parallel stream's
        // worker threads, hence the concurrent implementation.
        Map<Long, String> results = new ConcurrentHashMap<>();
        tenantIds.parallelStream().forEach(tenantId -> {
            // TODO iailab: parallelism is implemented via a parallel stream for now; open points:
            //  1) all tenants share a single execution log; 2) handling of the failure case
            TenantUtils.execute(tenantId, () -> {
                try {
                    log.debug("[around][tenant({}) executing job({})]", tenantId, joinPoint.getSignature());
                    joinPoint.proceed();
                } catch (Throwable e) {
                    results.put(tenantId, ExceptionUtil.getRootCauseMessage(e));
                    // Record the failure in the application log as well.
                    // NOTE(review): XxlJobHelper.log is invoked from a stream worker thread here;
                    // presumably it depends on a thread-bound job context, so the line below may
                    // not always reach the XXL-Job log — verify.
                    log.error("[around][tenant({}) job({}) failed]", tenantId, joinPoint.getSignature(), e);
                    // Write the exception to the XXL-Job execution log
                    XxlJobHelper.log(StrUtil.format("[多租户({}) 执行任务({}),发生异常:{}]",
                            tenantId, joinPoint.getSignature(), ExceptionUtils.getStackTrace(e)));
                }
            });
        });

        // A non-empty result map means at least one tenant failed: mark the XXL-Job run as failed
        if (CollUtil.isNotEmpty(results)) {
            XxlJobHelper.handleFail(JsonUtils.toJsonString(results));
        }
    }

}