潘志宝
9 天以前 11a7424fd4e119e5094764bf19cc359e2b3eb76d
提交 | 用户 | 时间
e7c126 1 package com.xxl.job.admin.core.thread;
H 2
3 import com.xxl.job.admin.core.conf.XxlJobAdminConfig;
4 import com.xxl.job.admin.core.model.XxlJobGroup;
5 import com.xxl.job.admin.core.model.XxlJobRegistry;
6 import com.xxl.job.core.biz.model.RegistryParam;
7 import com.xxl.job.core.biz.model.ReturnT;
8 import com.xxl.job.core.enums.RegistryConfig;
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11 import org.springframework.util.StringUtils;
12
13 import java.util.*;
14 import java.util.concurrent.*;
15
16 /**
17  * job registry instance
18  * @author xuxueli 2016-10-02 19:10:24
19  */
20 public class JobRegistryHelper {
21     private static Logger logger = LoggerFactory.getLogger(JobRegistryHelper.class);
22
23     private static JobRegistryHelper instance = new JobRegistryHelper();
24     public static JobRegistryHelper getInstance(){
25         return instance;
26     }
27
28     private ThreadPoolExecutor registryOrRemoveThreadPool = null;
29     private Thread registryMonitorThread;
30     private volatile boolean toStop = false;
31
32     public void start(){
33
34         // for registry or remove
35         registryOrRemoveThreadPool = new ThreadPoolExecutor(
36                 2,
37                 10,
38                 30L,
39                 TimeUnit.SECONDS,
40                 new LinkedBlockingQueue<Runnable>(2000),
41                 new ThreadFactory() {
42                     @Override
43                     public Thread newThread(Runnable r) {
44                         return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
45                     }
46                 },
47                 new RejectedExecutionHandler() {
48                     @Override
49                     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
50                         r.run();
51                         logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
52                     }
53                 });
54
55         // for monitor
56         registryMonitorThread = new Thread(new Runnable() {
57             @Override
58             public void run() {
59                 while (!toStop) {
60                     try {
61                         // auto registry group
62                         List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
63                         if (groupList!=null && !groupList.isEmpty()) {
64
65                             // remove dead address (admin/executor)
66                             List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
67                             if (ids!=null && ids.size()>0) {
68                                 XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
69                             }
70
71                             // fresh online address (admin/executor)
72                             HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
73                             List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
74                             if (list != null) {
75                                 for (XxlJobRegistry item: list) {
76                                     if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
77                                         String appname = item.getRegistryKey();
78                                         List<String> registryList = appAddressMap.get(appname);
79                                         if (registryList == null) {
80                                             registryList = new ArrayList<String>();
81                                         }
82
83                                         if (!registryList.contains(item.getRegistryValue())) {
84                                             registryList.add(item.getRegistryValue());
85                                         }
86                                         appAddressMap.put(appname, registryList);
87                                     }
88                                 }
89                             }
90
91                             // fresh group address
92                             for (XxlJobGroup group: groupList) {
93                                 List<String> registryList = appAddressMap.get(group.getAppname());
94                                 String addressListStr = null;
95                                 if (registryList!=null && !registryList.isEmpty()) {
96                                     Collections.sort(registryList);
97                                     StringBuilder addressListSB = new StringBuilder();
98                                     for (String item:registryList) {
99                                         addressListSB.append(item).append(",");
100                                     }
101                                     addressListStr = addressListSB.toString();
102                                     addressListStr = addressListStr.substring(0, addressListStr.length()-1);
103                                 }
104                                 group.setAddressList(addressListStr);
105                                 group.setUpdateTime(new Date());
106
107                                 XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
108                             }
109                         }
110                     } catch (Exception e) {
111                         if (!toStop) {
112                             logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
113                         }
114                     }
115                     try {
116                         TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
117                     } catch (InterruptedException e) {
118                         if (!toStop) {
119                             logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
120                         }
121                     }
122                 }
123                 logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
124             }
125         });
126         registryMonitorThread.setDaemon(true);
127         registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
128         registryMonitorThread.start();
129     }
130
131     public void toStop(){
132         toStop = true;
133
134         // stop registryOrRemoveThreadPool
135         registryOrRemoveThreadPool.shutdownNow();
136
137         // stop monitir (interrupt and wait)
138         registryMonitorThread.interrupt();
139         try {
140             registryMonitorThread.join();
141         } catch (InterruptedException e) {
142             logger.error(e.getMessage(), e);
143         }
144     }
145
146
147     // ---------------------- helper ----------------------
148
149     public ReturnT<String> registry(RegistryParam registryParam) {
150
151         // valid
152         if (!StringUtils.hasText(registryParam.getRegistryGroup())
153                 || !StringUtils.hasText(registryParam.getRegistryKey())
154                 || !StringUtils.hasText(registryParam.getRegistryValue())) {
155             return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
156         }
157
158         // async execute
159         registryOrRemoveThreadPool.execute(new Runnable() {
160             @Override
161             public void run() {
162                 int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
163                 if (ret < 1) {
164                     XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
165
166                     // fresh
167                     freshGroupRegistryInfo(registryParam);
168                 }
169             }
170         });
171
172         return ReturnT.SUCCESS;
173     }
174
175     public ReturnT<String> registryRemove(RegistryParam registryParam) {
176
177         // valid
178         if (!StringUtils.hasText(registryParam.getRegistryGroup())
179                 || !StringUtils.hasText(registryParam.getRegistryKey())
180                 || !StringUtils.hasText(registryParam.getRegistryValue())) {
181             return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
182         }
183
184         // async execute
185         registryOrRemoveThreadPool.execute(new Runnable() {
186             @Override
187             public void run() {
188                 int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
189                 if (ret > 0) {
190                     // fresh
191                     freshGroupRegistryInfo(registryParam);
192                 }
193             }
194         });
195
196         return ReturnT.SUCCESS;
197     }
198
199     private void freshGroupRegistryInfo(RegistryParam registryParam){
200         // Under consideration, prevent affecting core tables
201     }
202
203
204 }