潘志宝
2024-09-06 c06f48bded461209f117167fbf89ed57a3f37ef4
提交 | 用户 | 时间
e7c126 1 package com.xxl.job.admin.core.trigger;
H 2
3 import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
4 import com.xxl.job.admin.core.model.XxlJobGroup;
5 import com.xxl.job.admin.core.model.XxlJobInfo;
6 import com.xxl.job.admin.core.model.XxlJobLog;
7 import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum;
8 import com.xxl.job.admin.core.scheduler.XxlJobScheduler;
9 import com.xxl.job.admin.core.util.I18nUtil;
10 import com.xxl.job.core.biz.ExecutorBiz;
11 import com.xxl.job.core.biz.model.ReturnT;
12 import com.xxl.job.core.biz.model.TriggerParam;
13 import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
14 import com.xxl.job.core.util.IpUtil;
15 import com.xxl.job.core.util.ThrowableUtil;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19 import java.util.Date;
20
21 /**
22  * xxl-job trigger
23  * Created by xuxueli on 17/7/13.
24  */
25 public class XxlJobTrigger {
26     private static Logger logger = LoggerFactory.getLogger(XxlJobTrigger.class);
27
28     /**
29      * trigger job
30      *
31      * @param jobId
32      * @param triggerType
33      * @param failRetryCount
34      *             >=0: use this param
35      *             <0: use param from job info config
36      * @param executorShardingParam
37      * @param executorParam
38      *          null: use job param
39      *          not null: cover job param
40      * @param addressList
41      *          null: use executor addressList
42      *          not null: cover
43      */
44     public static void trigger(int jobId,
45                                TriggerTypeEnum triggerType,
46                                int failRetryCount,
47                                String executorShardingParam,
48                                String executorParam,
49                                String addressList) {
50
51         // load data
52         XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
53         if (jobInfo == null) {
54             logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
55             return;
56         }
57         if (executorParam != null) {
58             jobInfo.setExecutorParam(executorParam);
59         }
60         int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
61         XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
62
63         // cover addressList
64         if (addressList!=null && addressList.trim().length()>0) {
65             group.setAddressType(1);
66             group.setAddressList(addressList.trim());
67         }
68
69         // sharding param
70         int[] shardingParam = null;
71         if (executorShardingParam!=null){
72             String[] shardingArr = executorShardingParam.split("/");
73             if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
74                 shardingParam = new int[2];
75                 shardingParam[0] = Integer.valueOf(shardingArr[0]);
76                 shardingParam[1] = Integer.valueOf(shardingArr[1]);
77             }
78         }
79         if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
80                 && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
81                 && shardingParam==null) {
82             for (int i = 0; i < group.getRegistryList().size(); i++) {
83                 processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
84             }
85         } else {
86             if (shardingParam == null) {
87                 shardingParam = new int[]{0, 1};
88             }
89             processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
90         }
91
92     }
93
94     private static boolean isNumeric(String str){
95         try {
96             int result = Integer.valueOf(str);
97             return true;
98         } catch (NumberFormatException e) {
99             return false;
100         }
101     }
102
103     /**
104      * @param group                     job group, registry list may be empty
105      * @param jobInfo
106      * @param finalFailRetryCount
107      * @param triggerType
108      * @param index                     sharding index
109      * @param total                     sharding index
110      */
111     private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
112
113         // param
114         ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
115         ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
116         String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
117
118         // 1、save log-id
119         XxlJobLog jobLog = new XxlJobLog();
120         jobLog.setJobGroup(jobInfo.getJobGroup());
121         jobLog.setJobId(jobInfo.getId());
122         jobLog.setTriggerTime(new Date());
123         XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
124         logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
125
126         // 2、init trigger-param
127         TriggerParam triggerParam = new TriggerParam();
128         triggerParam.setJobId(jobInfo.getId());
129         triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
130         triggerParam.setExecutorParams(jobInfo.getExecutorParam());
131         triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
132         triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
133         triggerParam.setLogId(jobLog.getId());
134         triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
135         triggerParam.setGlueType(jobInfo.getGlueType());
136         triggerParam.setGlueSource(jobInfo.getGlueSource());
137         triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
138         triggerParam.setBroadcastIndex(index);
139         triggerParam.setBroadcastTotal(total);
140
141         // 3、init address
142         String address = null;
143         ReturnT<String> routeAddressResult = null;
144         if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
145             if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
146                 if (index < group.getRegistryList().size()) {
147                     address = group.getRegistryList().get(index);
148                 } else {
149                     address = group.getRegistryList().get(0);
150                 }
151             } else {
152                 routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
153                 if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
154                     address = routeAddressResult.getContent();
155                 }
156             }
157         } else {
158             routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
159         }
160
161         // 4、trigger remote executor
162         ReturnT<String> triggerResult = null;
163         if (address != null) {
164             triggerResult = runExecutor(triggerParam, address);
165         } else {
166             triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
167         }
168
169         // 5、collection trigger info
170         StringBuffer triggerMsgSb = new StringBuffer();
171         triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
172         triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
173         triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
174                 .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
175         triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
176         triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
177         if (shardingParam != null) {
178             triggerMsgSb.append("("+shardingParam+")");
179         }
180         triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
181         triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
182         triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);
183
184         triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
185                 .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");
186
187         // 6、save log trigger-info
188         jobLog.setExecutorAddress(address);
189         jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
190         jobLog.setExecutorParam(jobInfo.getExecutorParam());
191         jobLog.setExecutorShardingParam(shardingParam);
192         jobLog.setExecutorFailRetryCount(finalFailRetryCount);
193         //jobLog.setTriggerTime();
194         jobLog.setTriggerCode(triggerResult.getCode());
195         jobLog.setTriggerMsg(triggerMsgSb.toString());
196         XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
197
198         logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
199     }
200
201     /**
202      * run executor
203      * @param triggerParam
204      * @param address
205      * @return
206      */
207     public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
208         ReturnT<String> runResult = null;
209         try {
210             ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
211             runResult = executorBiz.run(triggerParam);
212         } catch (Exception e) {
213             logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
214             runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
215         }
216
217         StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
218         runResultSB.append("<br>address:").append(address);
219         runResultSB.append("<br>code:").append(runResult.getCode());
220         runResultSB.append("<br>msg:").append(runResult.getMsg());
221
222         runResult.setMsg(runResultSB.toString());
223         return runResult;
224     }
225
226 }