潘志宝
9 天以前 11a7424fd4e119e5094764bf19cc359e2b3eb76d
提交 | 用户 | 时间
e7c126 1 package com.xxl.job.admin.core.thread;
H 2
3 import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
4 import com.xxl.job.admin.core.trigger.TriggerTypeEnum;
5 import com.xxl.job.admin.core.trigger.XxlJobTrigger;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8
9 import java.util.concurrent.*;
10 import java.util.concurrent.atomic.AtomicInteger;
11
12 /**
13  * job trigger thread pool helper
14  *
15  * @author xuxueli 2018-07-03 21:08:07
16  */
17 public class JobTriggerPoolHelper {
18     private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);
19
20
21     // ---------------------- trigger pool ----------------------
22
23     // fast/slow thread pool
24     private ThreadPoolExecutor fastTriggerPool = null;
25     private ThreadPoolExecutor slowTriggerPool = null;
26
27     public void start(){
28         fastTriggerPool = new ThreadPoolExecutor(
29                 10,
30                 XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
31                 60L,
32                 TimeUnit.SECONDS,
33                 new LinkedBlockingQueue<Runnable>(1000),
34                 new ThreadFactory() {
35                     @Override
36                     public Thread newThread(Runnable r) {
37                         return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
38                     }
39                 });
40
41         slowTriggerPool = new ThreadPoolExecutor(
42                 10,
43                 XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
44                 60L,
45                 TimeUnit.SECONDS,
46                 new LinkedBlockingQueue<Runnable>(2000),
47                 new ThreadFactory() {
48                     @Override
49                     public Thread newThread(Runnable r) {
50                         return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
51                     }
52                 });
53     }
54
55
56     public void stop() {
57         //triggerPool.shutdown();
58         fastTriggerPool.shutdownNow();
59         slowTriggerPool.shutdownNow();
60         logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
61     }
62
63
64     // job timeout count
65     private volatile long minTim = System.currentTimeMillis()/60000;     // ms > min
66     private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();
67
68
69     /**
70      * add trigger
71      */
72     public void addTrigger(final int jobId,
73                            final TriggerTypeEnum triggerType,
74                            final int failRetryCount,
75                            final String executorShardingParam,
76                            final String executorParam,
77                            final String addressList) {
78
79         // choose thread pool
80         ThreadPoolExecutor triggerPool_ = fastTriggerPool;
81         AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
82         if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
83             triggerPool_ = slowTriggerPool;
84         }
85
86         // trigger
87         triggerPool_.execute(new Runnable() {
88             @Override
89             public void run() {
90
91                 long start = System.currentTimeMillis();
92
93                 try {
94                     // do trigger
95                     XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
96                 } catch (Exception e) {
97                     logger.error(e.getMessage(), e);
98                 } finally {
99
100                     // check timeout-count-map
101                     long minTim_now = System.currentTimeMillis()/60000;
102                     if (minTim != minTim_now) {
103                         minTim = minTim_now;
104                         jobTimeoutCountMap.clear();
105                     }
106
107                     // incr timeout-count-map
108                     long cost = System.currentTimeMillis()-start;
109                     if (cost > 500) {       // ob-timeout threshold 500ms
110                         AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
111                         if (timeoutCount != null) {
112                             timeoutCount.incrementAndGet();
113                         }
114                     }
115
116                 }
117
118             }
119         });
120     }
121
122
123
124     // ---------------------- helper ----------------------
125
126     private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();
127
128     public static void toStart() {
129         helper.start();
130     }
131     public static void toStop() {
132         helper.stop();
133     }
134
135     /**
136      * @param jobId
137      * @param triggerType
138      * @param failRetryCount
139      *             >=0: use this param
140      *             <0: use param from job info config
141      * @param executorShardingParam
142      * @param executorParam
143      *          null: use job param
144      *          not null: cover job param
145      */
146     public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
147         helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
148     }
149
150 }