houzhongyi
2024-07-11 e7c1260db32209a078a962aaa0ad5492c35774fb
提交 | 用户 | 时间
e7c126 1 package com.xxl.job.admin.core.route.strategy;
H 2
3 import com.xxl.job.admin.core.route.ExecutorRouter;
4 import com.xxl.job.core.biz.model.ReturnT;
5 import com.xxl.job.core.biz.model.TriggerParam;
6
7 import java.io.UnsupportedEncodingException;
8 import java.security.MessageDigest;
9 import java.security.NoSuchAlgorithmException;
10 import java.util.List;
11 import java.util.SortedMap;
12 import java.util.TreeMap;
13
14 /**
15  * 分组下机器地址相同,不同JOB均匀散列在不同机器上,保证分组下机器分配JOB平均;且每个JOB固定调度其中一台机器;
16  *      a、virtual node:解决不均衡问题
17  *      b、hash method replace hashCode:String的hashCode可能重复,需要进一步扩大hashCode的取值范围
18  * Created by xuxueli on 17/3/10.
19  */
20 public class ExecutorRouteConsistentHash extends ExecutorRouter {
21
22     private static int VIRTUAL_NODE_NUM = 100;
23
24     /**
25      * get hash code on 2^32 ring (md5散列的方式计算hash值)
26      * @param key
27      * @return
28      */
29     private static long hash(String key) {
30
31         // md5 byte
32         MessageDigest md5;
33         try {
34             md5 = MessageDigest.getInstance("MD5");
35         } catch (NoSuchAlgorithmException e) {
36             throw new RuntimeException("MD5 not supported", e);
37         }
38         md5.reset();
39         byte[] keyBytes = null;
40         try {
41             keyBytes = key.getBytes("UTF-8");
42         } catch (UnsupportedEncodingException e) {
43             throw new RuntimeException("Unknown string :" + key, e);
44         }
45
46         md5.update(keyBytes);
47         byte[] digest = md5.digest();
48
49         // hash code, Truncate to 32-bits
50         long hashCode = ((long) (digest[3] & 0xFF) << 24)
51                 | ((long) (digest[2] & 0xFF) << 16)
52                 | ((long) (digest[1] & 0xFF) << 8)
53                 | (digest[0] & 0xFF);
54
55         long truncateHashCode = hashCode & 0xffffffffL;
56         return truncateHashCode;
57     }
58
59     public String hashJob(int jobId, List<String> addressList) {
60
61         // ------A1------A2-------A3------
62         // -----------J1------------------
63         TreeMap<Long, String> addressRing = new TreeMap<Long, String>();
64         for (String address: addressList) {
65             for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
66                 long addressHash = hash("SHARD-" + address + "-NODE-" + i);
67                 addressRing.put(addressHash, address);
68             }
69         }
70
71         long jobHash = hash(String.valueOf(jobId));
72         SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
73         if (!lastRing.isEmpty()) {
74             return lastRing.get(lastRing.firstKey());
75         }
76         return addressRing.firstEntry().getValue();
77     }
78
79     @Override
80     public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
81         String address = hashJob(triggerParam.getJobId(), addressList);
82         return new ReturnT<String>(address);
83     }
84
85 }