houzhongyi
2024-07-11 e7c1260db32209a078a962aaa0ad5492c35774fb
提交 | 用户 | 时间
e7c126 1 package com.xxl.job.admin.core.thread;
H 2
3 import com.xxl.job.admin.core.complete.XxlJobCompleter;
4 import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
5 import com.xxl.job.admin.core.model.XxlJobLog;
6 import com.xxl.job.admin.core.util.I18nUtil;
7 import com.xxl.job.core.biz.model.HandleCallbackParam;
8 import com.xxl.job.core.biz.model.ReturnT;
9 import com.xxl.job.core.util.DateUtil;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12
13 import java.util.Date;
14 import java.util.List;
15 import java.util.concurrent.*;
16
17 /**
18  * job lose-monitor instance
19  *
20  * @author xuxueli 2015-9-1 18:05:56
21  */
22 public class JobCompleteHelper {
23     private static Logger logger = LoggerFactory.getLogger(JobCompleteHelper.class);
24     
25     private static JobCompleteHelper instance = new JobCompleteHelper();
26     public static JobCompleteHelper getInstance(){
27         return instance;
28     }
29
30     // ---------------------- monitor ----------------------
31
32     private ThreadPoolExecutor callbackThreadPool = null;
33     private Thread monitorThread;
34     private volatile boolean toStop = false;
35     public void start(){
36
37         // for callback
38         callbackThreadPool = new ThreadPoolExecutor(
39                 2,
40                 20,
41                 30L,
42                 TimeUnit.SECONDS,
43                 new LinkedBlockingQueue<Runnable>(3000),
44                 new ThreadFactory() {
45                     @Override
46                     public Thread newThread(Runnable r) {
47                         return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode());
48                     }
49                 },
50                 new RejectedExecutionHandler() {
51                     @Override
52                     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
53                         r.run();
54                         logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now).");
55                     }
56                 });
57
58
59         // for monitor
60         monitorThread = new Thread(new Runnable() {
61
62             @Override
63             public void run() {
64
65                 // wait for JobTriggerPoolHelper-init
66                 try {
67                     TimeUnit.MILLISECONDS.sleep(50);
68                 } catch (InterruptedException e) {
69                     if (!toStop) {
70                         logger.error(e.getMessage(), e);
71                     }
72                 }
73
74                 // monitor
75                 while (!toStop) {
76                     try {
77                         // 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
78                         Date losedTime = DateUtil.addMinutes(new Date(), -10);
79                         List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
80
81                         if (losedJobIds!=null && losedJobIds.size()>0) {
82                             for (Long logId: losedJobIds) {
83
84                                 XxlJobLog jobLog = new XxlJobLog();
85                                 jobLog.setId(logId);
86
87                                 jobLog.setHandleTime(new Date());
88                                 jobLog.setHandleCode(ReturnT.FAIL_CODE);
89                                 jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );
90
91                                 XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
92                             }
93
94                         }
95                     } catch (Exception e) {
96                         if (!toStop) {
97                             logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
98                         }
99                     }
100
101                     try {
102                         TimeUnit.SECONDS.sleep(60);
103                     } catch (Exception e) {
104                         if (!toStop) {
105                             logger.error(e.getMessage(), e);
106                         }
107                     }
108
109                 }
110
111                 logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");
112
113             }
114         });
115         monitorThread.setDaemon(true);
116         monitorThread.setName("xxl-job, admin JobLosedMonitorHelper");
117         monitorThread.start();
118     }
119
120     public void toStop(){
121         toStop = true;
122
123         // stop registryOrRemoveThreadPool
124         callbackThreadPool.shutdownNow();
125
126         // stop monitorThread (interrupt and wait)
127         monitorThread.interrupt();
128         try {
129             monitorThread.join();
130         } catch (InterruptedException e) {
131             logger.error(e.getMessage(), e);
132         }
133     }
134
135
136     // ---------------------- helper ----------------------
137
138     public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
139
140         callbackThreadPool.execute(new Runnable() {
141             @Override
142             public void run() {
143                 for (HandleCallbackParam handleCallbackParam: callbackParamList) {
144                     ReturnT<String> callbackResult = callback(handleCallbackParam);
145                     logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}",
146                             (callbackResult.getCode()== ReturnT.SUCCESS_CODE?"success":"fail"), handleCallbackParam, callbackResult);
147                 }
148             }
149         });
150
151         return ReturnT.SUCCESS;
152     }
153
154     private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {
155         // valid log item
156         XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(handleCallbackParam.getLogId());
157         if (log == null) {
158             return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found.");
159         }
160         if (log.getHandleCode() > 0) {
161             return new ReturnT<String>(ReturnT.FAIL_CODE, "log repeate callback.");     // avoid repeat callback, trigger child job etc
162         }
163
164         // handle msg
165         StringBuffer handleMsg = new StringBuffer();
166         if (log.getHandleMsg()!=null) {
167             handleMsg.append(log.getHandleMsg()).append("<br>");
168         }
169         if (handleCallbackParam.getHandleMsg() != null) {
170             handleMsg.append(handleCallbackParam.getHandleMsg());
171         }
172
173         // success, save log
174         log.setHandleTime(new Date());
175         log.setHandleCode(handleCallbackParam.getHandleCode());
176         log.setHandleMsg(handleMsg.toString());
177         XxlJobCompleter.updateHandleInfoAndFinish(log);
178
179         return ReturnT.SUCCESS;
180     }
181
182
183
184 }