潘志宝
2024-09-06 c06f48bded461209f117167fbf89ed57a3f37ef4
提交 | 用户 | 时间
e7c126 1 package com.xxl.job.admin.core.thread;
H 2
3 import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
4 import com.xxl.job.admin.core.cron.CronExpression;
5 import com.xxl.job.admin.core.model.XxlJobInfo;
6 import com.xxl.job.admin.core.scheduler.MisfireStrategyEnum;
7 import com.xxl.job.admin.core.scheduler.ScheduleTypeEnum;
8 import com.xxl.job.admin.core.trigger.TriggerTypeEnum;
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11
12 import java.sql.Connection;
13 import java.sql.PreparedStatement;
14 import java.sql.SQLException;
15 import java.util.*;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.TimeUnit;
18
19 /**
20  * @author xuxueli 2019-05-21
21  */
22 public class JobScheduleHelper {
23     private static Logger logger = LoggerFactory.getLogger(JobScheduleHelper.class);
24
25     private static JobScheduleHelper instance = new JobScheduleHelper();
26     public static JobScheduleHelper getInstance(){
27         return instance;
28     }
29
30     public static final long PRE_READ_MS = 5000;    // pre read
31
32     private Thread scheduleThread;
33     private Thread ringThread;
34     private volatile boolean scheduleThreadToStop = false;
35     private volatile boolean ringThreadToStop = false;
36     private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
37
38     public void start(){
39
40         // schedule thread
41         scheduleThread = new Thread(new Runnable() {
42             @Override
43             public void run() {
44
45                 try {
46                     TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
47                 } catch (InterruptedException e) {
48                     if (!scheduleThreadToStop) {
49                         logger.error(e.getMessage(), e);
50                     }
51                 }
52                 logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
53
54                 // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
55                 int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
56
57                 while (!scheduleThreadToStop) {
58
59                     // Scan Job
60                     long start = System.currentTimeMillis();
61
62                     Connection conn = null;
63                     Boolean connAutoCommit = null;
64                     PreparedStatement preparedStatement = null;
65
66                     boolean preReadSuc = true;
67                     try {
68
69                         conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
70                         connAutoCommit = conn.getAutoCommit();
71                         conn.setAutoCommit(false);
72
73                         preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
74                         preparedStatement.execute();
75
76                         // tx start
77
78                         // 1、pre read
79                         long nowTime = System.currentTimeMillis();
80                         List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
81                         if (scheduleList!=null && scheduleList.size()>0) {
82                             // 2、push time-ring
83                             for (XxlJobInfo jobInfo: scheduleList) {
84
85                                 // time-ring jump
86                                 if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
87                                     // 2.1、trigger-expire > 5s:pass && make next-trigger-time
88                                     logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
89
90                                     // 1、misfire match
91                                     MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
92                                     if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
93                                         // FIRE_ONCE_NOW 》 trigger
94                                         JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
95                                         logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
96                                     }
97
98                                     // 2、fresh next
99                                     refreshNextValidTime(jobInfo, new Date());
100
101                                 } else if (nowTime > jobInfo.getTriggerNextTime()) {
102                                     // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
103
104                                     // 1、trigger
105                                     JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
106                                     logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
107
108                                     // 2、fresh next
109                                     refreshNextValidTime(jobInfo, new Date());
110
111                                     // next-trigger-time in 5s, pre-read again
112                                     if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
113
114                                         // 1、make ring second
115                                         int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
116
117                                         // 2、push time ring
118                                         pushTimeRing(ringSecond, jobInfo.getId());
119
120                                         // 3、fresh next
121                                         refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
122
123                                     }
124
125                                 } else {
126                                     // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
127
128                                     // 1、make ring second
129                                     int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
130
131                                     // 2、push time ring
132                                     pushTimeRing(ringSecond, jobInfo.getId());
133
134                                     // 3、fresh next
135                                     refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
136
137                                 }
138
139                             }
140
141                             // 3、update trigger info
142                             for (XxlJobInfo jobInfo: scheduleList) {
143                                 XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
144                             }
145
146                         } else {
147                             preReadSuc = false;
148                         }
149
150                         // tx stop
151
152
153                     } catch (Exception e) {
154                         if (!scheduleThreadToStop) {
155                             logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
156                         }
157                     } finally {
158
159                         // commit
160                         if (conn != null) {
161                             try {
162                                 conn.commit();
163                             } catch (SQLException e) {
164                                 if (!scheduleThreadToStop) {
165                                     logger.error(e.getMessage(), e);
166                                 }
167                             }
168                             try {
169                                 conn.setAutoCommit(connAutoCommit);
170                             } catch (SQLException e) {
171                                 if (!scheduleThreadToStop) {
172                                     logger.error(e.getMessage(), e);
173                                 }
174                             }
175                             try {
176                                 conn.close();
177                             } catch (SQLException e) {
178                                 if (!scheduleThreadToStop) {
179                                     logger.error(e.getMessage(), e);
180                                 }
181                             }
182                         }
183
184                         // close PreparedStatement
185                         if (null != preparedStatement) {
186                             try {
187                                 preparedStatement.close();
188                             } catch (SQLException e) {
189                                 if (!scheduleThreadToStop) {
190                                     logger.error(e.getMessage(), e);
191                                 }
192                             }
193                         }
194                     }
195                     long cost = System.currentTimeMillis()-start;
196
197
198                     // Wait seconds, align second
199                     if (cost < 1000) {  // scan-overtime, not wait
200                         try {
201                             // pre-read period: success > scan each second; fail > skip this period;
202                             TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
203                         } catch (InterruptedException e) {
204                             if (!scheduleThreadToStop) {
205                                 logger.error(e.getMessage(), e);
206                             }
207                         }
208                     }
209
210                 }
211
212                 logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
213             }
214         });
215         scheduleThread.setDaemon(true);
216         scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
217         scheduleThread.start();
218
219
220         // ring thread
221         ringThread = new Thread(new Runnable() {
222             @Override
223             public void run() {
224
225                 while (!ringThreadToStop) {
226
227                     // align second
228                     try {
229                         TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
230                     } catch (InterruptedException e) {
231                         if (!ringThreadToStop) {
232                             logger.error(e.getMessage(), e);
233                         }
234                     }
235
236                     try {
237                         // second data
238                         List<Integer> ringItemData = new ArrayList<>();
239                         int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
240                         for (int i = 0; i < 2; i++) {
241                             List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
242                             if (tmpData != null) {
243                                 ringItemData.addAll(tmpData);
244                             }
245                         }
246
247                         // ring trigger
248                         logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
249                         if (ringItemData.size() > 0) {
250                             // do trigger
251                             for (int jobId: ringItemData) {
252                                 // do trigger
253                                 JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
254                             }
255                             // clear
256                             ringItemData.clear();
257                         }
258                     } catch (Exception e) {
259                         if (!ringThreadToStop) {
260                             logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
261                         }
262                     }
263                 }
264                 logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
265             }
266         });
267         ringThread.setDaemon(true);
268         ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
269         ringThread.start();
270     }
271
272     private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception {
273         Date nextValidTime = generateNextValidTime(jobInfo, fromTime);
274         if (nextValidTime != null) {
275             jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
276             jobInfo.setTriggerNextTime(nextValidTime.getTime());
277         } else {
278             jobInfo.setTriggerStatus(0);
279             jobInfo.setTriggerLastTime(0);
280             jobInfo.setTriggerNextTime(0);
281             logger.warn(">>>>>>>>>>> xxl-job, refreshNextValidTime fail for job: jobId={}, scheduleType={}, scheduleConf={}",
282                     jobInfo.getId(), jobInfo.getScheduleType(), jobInfo.getScheduleConf());
283         }
284     }
285
286     private void pushTimeRing(int ringSecond, int jobId){
287         // push async ring
288         List<Integer> ringItemData = ringData.get(ringSecond);
289         if (ringItemData == null) {
290             ringItemData = new ArrayList<Integer>();
291             ringData.put(ringSecond, ringItemData);
292         }
293         ringItemData.add(jobId);
294
295         logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
296     }
297
298     public void toStop(){
299
300         // 1、stop schedule
301         scheduleThreadToStop = true;
302         try {
303             TimeUnit.SECONDS.sleep(1);  // wait
304         } catch (InterruptedException e) {
305             logger.error(e.getMessage(), e);
306         }
307         if (scheduleThread.getState() != Thread.State.TERMINATED){
308             // interrupt and wait
309             scheduleThread.interrupt();
310             try {
311                 scheduleThread.join();
312             } catch (InterruptedException e) {
313                 logger.error(e.getMessage(), e);
314             }
315         }
316
317         // if has ring data
318         boolean hasRingData = false;
319         if (!ringData.isEmpty()) {
320             for (int second : ringData.keySet()) {
321                 List<Integer> tmpData = ringData.get(second);
322                 if (tmpData!=null && tmpData.size()>0) {
323                     hasRingData = true;
324                     break;
325                 }
326             }
327         }
328         if (hasRingData) {
329             try {
330                 TimeUnit.SECONDS.sleep(8);
331             } catch (InterruptedException e) {
332                 logger.error(e.getMessage(), e);
333             }
334         }
335
336         // stop ring (wait job-in-memory stop)
337         ringThreadToStop = true;
338         try {
339             TimeUnit.SECONDS.sleep(1);
340         } catch (InterruptedException e) {
341             logger.error(e.getMessage(), e);
342         }
343         if (ringThread.getState() != Thread.State.TERMINATED){
344             // interrupt and wait
345             ringThread.interrupt();
346             try {
347                 ringThread.join();
348             } catch (InterruptedException e) {
349                 logger.error(e.getMessage(), e);
350             }
351         }
352
353         logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop");
354     }
355
356
357     // ---------------------- tools ----------------------
358     public static Date generateNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception {
359         ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(jobInfo.getScheduleType(), null);
360         if (ScheduleTypeEnum.CRON == scheduleTypeEnum) {
361             Date nextValidTime = new CronExpression(jobInfo.getScheduleConf()).getNextValidTimeAfter(fromTime);
362             return nextValidTime;
363         } else if (ScheduleTypeEnum.FIX_RATE == scheduleTypeEnum /*|| ScheduleTypeEnum.FIX_DELAY == scheduleTypeEnum*/) {
364             return new Date(fromTime.getTime() + Integer.valueOf(jobInfo.getScheduleConf())*1000 );
365         }
366         return null;
367     }
368
369 }