package com.xxl.job.admin.core.trigger;
|
|
import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
|
import com.xxl.job.admin.core.model.XxlJobGroup;
|
import com.xxl.job.admin.core.model.XxlJobInfo;
|
import com.xxl.job.admin.core.model.XxlJobLog;
|
import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum;
|
import com.xxl.job.admin.core.scheduler.XxlJobScheduler;
|
import com.xxl.job.admin.core.util.I18nUtil;
|
import com.xxl.job.core.biz.ExecutorBiz;
|
import com.xxl.job.core.biz.model.ReturnT;
|
import com.xxl.job.core.biz.model.TriggerParam;
|
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
|
import com.xxl.job.core.util.IpUtil;
|
import com.xxl.job.core.util.ThrowableUtil;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
|
import java.util.Date;
|
|
/**
|
* xxl-job trigger
|
* Created by xuxueli on 17/7/13.
|
*/
|
public class XxlJobTrigger {
|
private static Logger logger = LoggerFactory.getLogger(XxlJobTrigger.class);
|
|
/**
|
* trigger job
|
*
|
* @param jobId
|
* @param triggerType
|
* @param failRetryCount
|
* >=0: use this param
|
* <0: use param from job info config
|
* @param executorShardingParam
|
* @param executorParam
|
* null: use job param
|
* not null: cover job param
|
* @param addressList
|
* null: use executor addressList
|
* not null: cover
|
*/
|
public static void trigger(int jobId,
|
TriggerTypeEnum triggerType,
|
int failRetryCount,
|
String executorShardingParam,
|
String executorParam,
|
String addressList) {
|
|
// load data
|
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
|
if (jobInfo == null) {
|
logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
|
return;
|
}
|
if (executorParam != null) {
|
jobInfo.setExecutorParam(executorParam);
|
}
|
int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
|
XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
|
|
// cover addressList
|
if (addressList!=null && addressList.trim().length()>0) {
|
group.setAddressType(1);
|
group.setAddressList(addressList.trim());
|
}
|
|
// sharding param
|
int[] shardingParam = null;
|
if (executorShardingParam!=null){
|
String[] shardingArr = executorShardingParam.split("/");
|
if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
|
shardingParam = new int[2];
|
shardingParam[0] = Integer.valueOf(shardingArr[0]);
|
shardingParam[1] = Integer.valueOf(shardingArr[1]);
|
}
|
}
|
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
|
&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
|
&& shardingParam==null) {
|
for (int i = 0; i < group.getRegistryList().size(); i++) {
|
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
|
}
|
} else {
|
if (shardingParam == null) {
|
shardingParam = new int[]{0, 1};
|
}
|
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
|
}
|
|
}
|
|
private static boolean isNumeric(String str){
|
try {
|
int result = Integer.valueOf(str);
|
return true;
|
} catch (NumberFormatException e) {
|
return false;
|
}
|
}
|
|
/**
|
* @param group job group, registry list may be empty
|
* @param jobInfo
|
* @param finalFailRetryCount
|
* @param triggerType
|
* @param index sharding index
|
* @param total sharding index
|
*/
|
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
|
|
// param
|
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
|
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
|
String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
|
|
// 1、save log-id
|
XxlJobLog jobLog = new XxlJobLog();
|
jobLog.setJobGroup(jobInfo.getJobGroup());
|
jobLog.setJobId(jobInfo.getId());
|
jobLog.setTriggerTime(new Date());
|
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
|
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
|
|
// 2、init trigger-param
|
TriggerParam triggerParam = new TriggerParam();
|
triggerParam.setJobId(jobInfo.getId());
|
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
|
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
|
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
|
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
|
triggerParam.setLogId(jobLog.getId());
|
triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
|
triggerParam.setGlueType(jobInfo.getGlueType());
|
triggerParam.setGlueSource(jobInfo.getGlueSource());
|
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
|
triggerParam.setBroadcastIndex(index);
|
triggerParam.setBroadcastTotal(total);
|
|
// 3、init address
|
String address = null;
|
ReturnT<String> routeAddressResult = null;
|
if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
|
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
|
if (index < group.getRegistryList().size()) {
|
address = group.getRegistryList().get(index);
|
} else {
|
address = group.getRegistryList().get(0);
|
}
|
} else {
|
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
|
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
|
address = routeAddressResult.getContent();
|
}
|
}
|
} else {
|
routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
|
}
|
|
// 4、trigger remote executor
|
ReturnT<String> triggerResult = null;
|
if (address != null) {
|
triggerResult = runExecutor(triggerParam, address);
|
} else {
|
triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
|
}
|
|
// 5、collection trigger info
|
StringBuffer triggerMsgSb = new StringBuffer();
|
triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
|
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
|
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
|
.append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
|
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
|
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
|
if (shardingParam != null) {
|
triggerMsgSb.append("("+shardingParam+")");
|
}
|
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
|
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
|
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);
|
|
triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
|
.append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");
|
|
// 6、save log trigger-info
|
jobLog.setExecutorAddress(address);
|
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
|
jobLog.setExecutorParam(jobInfo.getExecutorParam());
|
jobLog.setExecutorShardingParam(shardingParam);
|
jobLog.setExecutorFailRetryCount(finalFailRetryCount);
|
//jobLog.setTriggerTime();
|
jobLog.setTriggerCode(triggerResult.getCode());
|
jobLog.setTriggerMsg(triggerMsgSb.toString());
|
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
|
|
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
|
}
|
|
/**
|
* run executor
|
* @param triggerParam
|
* @param address
|
* @return
|
*/
|
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
|
ReturnT<String> runResult = null;
|
try {
|
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
|
runResult = executorBiz.run(triggerParam);
|
} catch (Exception e) {
|
logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
|
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
|
}
|
|
StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
|
runResultSB.append("<br>address:").append(address);
|
runResultSB.append("<br>code:").append(runResult.getCode());
|
runResultSB.append("<br>msg:").append(runResult.getMsg());
|
|
runResult.setMsg(runResultSB.toString());
|
return runResult;
|
}
|
|
}
|