1 三组概念欢迎大家关注公众号「JAVA前线」查看更多精彩分享文章,主要包括源码分析、实际应用、架构思维、职场分享、产品思考等等,同时欢迎大家加我个人微信「java_front」一起交流学习
负载均衡、集群容错、服务降级这三个概念在DUBBO中非常重要,同理其它分布式框架也都有相同或者相近之概念。
从调用顺序角度分析,调用顺序依次是负载均衡、集群容错、服务降级。从解决问题角度分析,负载均衡解决了「选哪一个」问题,集群容错解决了「换哪一个」问题,服务降级解决了「全错怎么办」问题。
假设有1个服务消费者面对10个提供者,这时面临第一个问题就是「选哪一个」进行调用,所以负载均衡最先调用,假设选定了5号服务提供者进行服务调用。
假设消费者调用5号提供者发生了超时异常,这时面临第二个问题就是「换哪一个」进行调用:5号超时要不要换1号试一试,或者直接返回不进行重试,所以集群容错第二个调用。
假设已经重试了1号、3号、6号提供者全部超时,这时面临「全错怎么办」这第三个问题,这时可以直接返回一个固定值或者提示文案,所以服务降级第三个调用。
负载均衡作为整个调用链路第一个节点非常重要,本文结合DUBBO源码分析以下七种负载均衡策略:
简单随机加权随机简单轮询简单加权轮询平滑加权轮询一致性哈希最少活跃数
2 简单随机
简单随机含义是服务消费者每次会任意访问一个服务提供者,并且从概率角度看每个提供者被访问概率一致,可以通过指定范围随机数实现。第一步编写服务器代码
public class MyServer { private String ip; public MyServer(String ip) { this.ip = ip; } public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } }
第二步编写基础负载均衡策略,其它策略可以复用
public abstract class AbstractLoadBalance { public MyServer select(ListserverList) { return doSelect(serverList); } public abstract MyServer doSelect(List serverList); }
第三步编写简单随机策略
public class RandomBalance extends AbstractLoadBalance { @Override public MyServer doSelect(ListserverList) { // 随机数范围[0,serverListSize) int index = ThreadLocalRandom.current().nextInt(serverList.size()); return serverList.get(index); } }
第四步编写测试代码
public class LoadBalanceTest { public static void main(String[] args) { ListserverList = buildData(); testRandomBalance(serverList); } public static void testRandomBalance(List serverList) { AbstractLoadBalance randomBalance = new RandomBalance(); for (int i = 0; i < 10; i++) { MyServer server = randomBalance.select(serverList); System.out.println("RandomBalance route server=" + server); } } public static List buildData() { List serverList = new ArrayList (); MyServer server1 = new MyServer("192.1.1.1"); MyServer server2 = new MyServer("192.1.1.2"); MyServer server3 = new MyServer("192.1.1.3"); serverList.add(server1); serverList.add(server2); serverList.add(server3); return serverList; } }
第五步输出结果,循环次数越多结果越准确
RandomBalance route server=MyServer(ip=192.1.1.2) RandomBalance route server=MyServer(ip=192.1.1.1) RandomBalance route server=MyServer(ip=192.1.1.3) RandomBalance route server=MyServer(ip=192.1.1.2) RandomBalance route server=MyServer(ip=192.1.1.1) RandomBalance route server=MyServer(ip=192.1.1.1) RandomBalance route server=MyServer(ip=192.1.1.2) RandomBalance route server=MyServer(ip=192.1.1.2) RandomBalance route server=MyServer(ip=192.1.1.3) RandomBalance route server=MyServer(ip=192.1.1.3)
3 加权随机 3.1 设计思路
加权随机新增了权重这个概念,假设服务器A权重等于1,服务器B权重等于5,从概率角度看B服务器被访问概率5倍于A服务器。实现按照权重访问有很多种方式,我们选择使用概率区间这个思路。
假设现在有三台服务器,服务器权重分别是3、5、2,那么三者构成如下图概率区间:
概率区间计算步骤如下图:
3.2 代码实例
第一步编写服务器代码
public class MyServer { private String ip; private int weight; public MyServer(String ip) { this.ip = ip; } public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } public int getWeight() { return weight; } public void setWeight(int weight) { this.weight = weight; } }
第二步编写加权随机策略
public class RandomWeightBalance extends AbstractLoadBalance { @Override public MyServer doSelect(ListserverList) { // 所有服务器总权重 int totalWeight = 0; // 第一个服务器权重 int firstWeight = serverList.get(0).getWeight(); // 所有服务器权重相等 boolean sameWeight = true; // 遍历所有服务器 for (MyServer server : serverList) { // 计算总权重 totalWeight += server.getWeight(); // 任意一个invoker权重不等于第一个权重则设置sameWeight=false if (sameWeight && server.getWeight() != firstWeight) { sameWeight = false; } } // 权重不相等则根据权重选择 if (!sameWeight) { // 在总区间范围[0,totalWeight)生成随机数A Integer offset = ThreadLocalRandom.current().nextInt(totalWeight); // 遍历所有服务器区间 for (MyServer server : serverList) { // 如果A在server区间直接返回 if (offset < server.getWeight()) { return server; } // 如果A不在server区间则减去此区间范围并继续匹配其它区间 offset -= server.getWeight(); } } // 所有服务器权重相等则随机选择 return serverList.get(ThreadLocalRandom.current().nextInt(serverList.size())); } }
第三步编写测试代码
public class LoadBalanceTest { public static void main(String[] args) { ListserverList = buildData(); testRandomWeightBalance(serverList); } public static void testRandomWeightBalance(List serverList) { AbstractLoadBalance randomBalance = new RandomWeightBalance(); for (int i = 0; i < 10; i++) { MyServer server = randomBalance.select(serverList); System.out.println("RandomWeightBalance route server=" + server); } } public static List buildData() { List serverList = new ArrayList (); MyServer server1 = new MyServer("192.1.1.1", 3); MyServer server2 = new MyServer("192.1.1.2", 5); MyServer server3 = new MyServer("192.1.1.3", 2); serverList.add(server1); serverList.add(server2); serverList.add(server3); return serverList; } }
第四步输出结果,循环次数越多结果越准确
RandomWeightBalance route server=MyServer(ip=192.1.1.2, weight=2) RandomWeightBalance route server=MyServer(ip=192.1.1.2, weight=3) RandomWeightBalance route server=MyServer(ip=192.1.1.1, weight=3) RandomWeightBalance route server=MyServer(ip=192.1.1.1, weight=3) RandomWeightBalance route server=MyServer(ip=192.1.1.3, weight=2) RandomWeightBalance route server=MyServer(ip=192.1.1.3, weight=2) RandomWeightBalance route server=MyServer(ip=192.1.1.2, weight=5) RandomWeightBalance route server=MyServer(ip=192.1.1.1, weight=3) RandomWeightBalance route server=MyServer(ip=192.1.1.2, weight=5) RandomWeightBalance route server=MyServer(ip=192.1.1.2, weight=5)
3.3 DUBBO源码
public class RandomLoadBalance extends AbstractLoadBalance { public static final String NAME = "random"; @Override protectedInvoker doSelect(List > invokers, URL url, Invocation invocation) { // invoker数量 int length = invokers.size(); // 所有权重是否相等 boolean sameWeight = true; // 权重数组 int[] weights = new int[length]; // 第一个权重 int firstWeight = getWeight(invokers.get(0), invocation); weights[0] = firstWeight; // 权重之和 int totalWeight = firstWeight; // 遍历所有invoker for (int i = 1; i < length; i++) { // 获取权重 int weight = getWeight(invokers.get(i), invocation); weights[i] = weight; // 计算总权重 totalWeight += weight; // 任意一个invoker权重不等于第一个权重则设置sameWeight=false if (sameWeight && weight != firstWeight) { sameWeight = false; } } // 权重不相等则根据权重选择 if (totalWeight > 0 && !sameWeight) { int offset = ThreadLocalRandom.current().nextInt(totalWeight); for (int i = 0; i < length; i++) { offset -= weights[i]; if (offset < 0) { return invokers.get(i); } } } // 所有服务权重相等则随机选择 return invokers.get(ThreadLocalRandom.current().nextInt(length)); } }
4 简单轮询
简单轮询含义是服务消费者每次会依次访问一个服务提供者,并且从概率角度看每个提供者被访问概率一致,可以通过原子变量累加实现。第一步编写简单轮询策略
public class RoundRobinBalance extends AbstractLoadBalance { private AtomicInteger atomicIndex = new AtomicInteger(0); @Override public MyServer doSelect(ListserverList) { // atomicIndex自增大于服务器数量时取余 int index = atomicIndex.getAndIncrement() % serverList.size(); return serverList.get(index); } }
第二步编写测试代码
public class LoadBalanceTest { public static void main(String[] args) { ListserverList = buildData(); testRoundRobinBalance(serverList); } public static void testRoundRobinBalance(List serverList) { AbstractLoadBalance roundRobinBalance = new RoundRobinBalance(); for (int i = 0; i < 10; i++) { MyServer server = roundRobinBalance.select(serverList); System.out.println("RoundRobinBalance route server=" + server); } } public static List buildData() { List serverList = new ArrayList (); MyServer server1 = new MyServer("192.1.1.1"); MyServer server2 = new MyServer("192.1.1.2"); MyServer server3 = new MyServer("192.1.1.3"); serverList.add(server1); serverList.add(server2); serverList.add(server3); return serverList; } }
第三步输出结果
RoundRobinBalance route server=MyServer(ip=192.1.1.1) RoundRobinBalance route server=MyServer(ip=192.1.1.2) RoundRobinBalance route server=MyServer(ip=192.1.1.3) RoundRobinBalance route server=MyServer(ip=192.1.1.1) RoundRobinBalance route server=MyServer(ip=192.1.1.2) RoundRobinBalance route server=MyServer(ip=192.1.1.3) RoundRobinBalance route server=MyServer(ip=192.1.1.1) RoundRobinBalance route server=MyServer(ip=192.1.1.2) RoundRobinBalance route server=MyServer(ip=192.1.1.3) RoundRobinBalance route server=MyServer(ip=192.1.1.1)
5 简单加权轮询
简单加权轮询新增了权重这个概念,假设服务器A权重等于1,服务器B权重等于5,从概率角度看B服务器被访问概率5倍于A服务器,我们还是使用概率区间这个思路,相较于加权随机会有一些变化。
第一步编写简单加权轮询策略
public class RoundRobinWeightBalance1 extends AbstractLoadBalance { private AtomicInteger atomicIndex = new AtomicInteger(0); @Override public MyServer doSelect(ListserverList) { int totalWeight = 0; int firstWeight = serverList.get(0).getWeight(); boolean sameWeight = true; for (MyServer server : serverList) { totalWeight += server.getWeight(); if (sameWeight && server.getWeight() != firstWeight) { sameWeight = false; } } if (!sameWeight) { // 自增方式计算offset int offset = atomicIndex.getAndIncrement() % totalWeight; for (MyServer server : serverList) { if (offset < server.getWeight()) { return server; } offset -= server.getWeight(); } } int index = atomicIndex.getAndIncrement() % serverList.size(); return serverList.get(index); } }
第二步编写测试代码
public class LoadBalanceTest { public static void main(String[] args) { ListserverList = buildData(); testRoundRobinWeightBalance1(serverList); } public static void testRoundRobinWeightBalance1(List serverList) { AbstractLoadBalance roundRobinBalance = new RoundRobinWeightBalance1(); for (int i = 0; i < 10; i++) { MyServer server = roundRobinBalance.select(serverList); System.out.println("RoundRobinWeightBalance1 route server=" + server); } } public static List buildData() { List serverList = new ArrayList (); MyServer server1 = new MyServer("192.1.1.1", 3); MyServer server2 = new MyServer("192.1.1.2", 5); MyServer server3 = new MyServer("192.1.1.3", 2); serverList.add(server1); serverList.add(server2); serverList.add(server3); return serverList; } }
第三步输出结果
RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.1, weight=3) RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.1, weight=3) RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.1, weight=3) RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.2, weight=5) RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.2, weight=5) RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.2, weight=5) RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.2, weight=5) RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.2, weight=5) RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.3, weight=2) RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.3, weight=2)
6 平滑加权轮询 6.1 设计思路
简单加权轮询有什么问题?我们分析其输出结果发现,连续3次访问服务器1,连续5次访问服务器2,连续2次访问服务器3,所以简单加权轮询策略会导致请求集中问题。
RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.1, weight=3) RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.1, weight=3) RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.1, weight=3) RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.2, weight=5) RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.2, weight=5) RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.2, weight=5) RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.2, weight=5) RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.2, weight=5) RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.3, weight=2) RoundRobinWeightBalance1 route server=MyServer(ip=192.1.1.3, weight=2)
所以需要使用平滑加权轮询策略,将请求比较均匀地分散至各个服务器,计算步骤如下:
6.2 代码实例
第一步编写服务器代码
public class MyServer { private String ip; private int weight; private int currentWeight = 0; public MyServer(String ip) { this.ip = ip; } public MyServer(String ip, int weight) { this.ip = ip; this.weight = weight; } public int getWeight() { return weight; } public void setWeight(int weight) { this.weight = weight; } public int getCurrentWeight() { return currentWeight; } public void setCurrentWeight(int currentWeight) { this.currentWeight = currentWeight; } public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } }
第二步编写平滑加权轮询策略
public class RoundRobinWeightBalance2 extends AbstractLoadBalance { private AtomicInteger atomicIndex = new AtomicInteger(0); @Override public MyServer doSelect(ListserverList) { int totalWeight = 0; int firstWeight = serverList.get(0).getWeight(); boolean sameWeight = true; for (MyServer server : serverList) { totalWeight += server.getWeight(); if (sameWeight && server.getWeight() != firstWeight) { sameWeight = false; } // 设置动态权重 -> currentWeight += weight server.setCurrentWeight(server.getCurrentWeight() + server.getWeight()); } if (!sameWeight) { // 最大动态权重服务器 -> max(currentWeight) MyServer maxCurrentWeightServer = serverList.stream().max((s1, s2) -> s1.getCurrentWeight() - s2.getCurrentWeight()).get(); // 设置最大动态权重 -> max(currentWeight) - totalWeight maxCurrentWeightServer.setCurrentWeight(maxCurrentWeightServer.getCurrentWeight() - totalWeight); // 返回最大动态权重服务器 return maxCurrentWeightServer; } // 权重相同依次轮询 int index = atomicIndex.getAndIncrement() % serverList.size(); return serverList.get(index); } }
第三步编写测试代码
public class LoadBalanceTest { public static void main(String[] args) { ListserverList = buildData(); testRoundRobinWeightBalance2(serverList); } public static void testRoundRobinWeightBalance2(List serverList) { AbstractLoadBalance roundRobinBalance = new RoundRobinWeightBalance2(); for (int i = 0; i < 10; i++) { MyServer server = roundRobinBalance.select(serverList); System.out.println("RoundRobinWeightBalance2 route server=" + server); } } public static List buildData() { List serverList = new ArrayList (); MyServer server1 = new MyServer("192.1.1.1", 3); MyServer server2 = new MyServer("192.1.1.2", 5); MyServer server3 = new MyServer("192.1.1.3", 2); serverList.add(server1); serverList.add(server2); serverList.add(server3); return serverList; } }
第四步输出结果
RoundRobinWeightBalance2 route server=MyServer(ip=192.1.1.2, weight=5, currentWeight=-5) RoundRobinWeightBalance2 route server=MyServer(ip=192.1.1.1, weight=3, currentWeight=-4) RoundRobinWeightBalance2 route server=MyServer(ip=192.1.1.3, weight=2, currentWeight=-4) RoundRobinWeightBalance2 route server=MyServer(ip=192.1.1.2, weight=5, currentWeight=0) RoundRobinWeightBalance2 route server=MyServer(ip=192.1.1.1, weight=3, currentWeight=-5) RoundRobinWeightBalance2 route server=MyServer(ip=192.1.1.2, weight=5, currentWeight=0) RoundRobinWeightBalance2 route server=MyServer(ip=192.1.1.2, weight=5, currentWeight=-5) RoundRobinWeightBalance2 route server=MyServer(ip=192.1.1.3, weight=2, currentWeight=-4) RoundRobinWeightBalance2 route server=MyServer(ip=192.1.1.1, weight=3, currentWeight=-3) RoundRobinWeightBalance2 route server=MyServer(ip=192.1.1.2, weight=5, currentWeight=0)
6.3 DUBBO源码
public class RoundRobinLoadBalance extends AbstractLoadBalance { public static final String NAME = "roundrobin"; private static int RECYCLE_PERIOD = 60000; protected static class WeightedRoundRobin { // 权重 private int weight; // 动态权重 private AtomicLong current = new AtomicLong(0); // 更新时间 private long lastUpdate; public int getWeight() { return weight; } public void setWeight(int weight) { this.weight = weight; current.set(0); } public long increaseCurrent() { return current.addAndGet(weight); } public void sel(int total) { current.addAndGet(-1 * total); } public long getLastUpdate() { return lastUpdate; } public void setLastUpdate(long lastUpdate) { this.lastUpdate = lastUpdate; } } private ConcurrentMap> methodWeightMap = new ConcurrentHashMap >(); private AtomicBoolean updateLock = new AtomicBoolean(); protected Collection getInvokerAddrList(List > invokers, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); Map map = methodWeightMap.get(key); if (map != null) { return map.keySet(); } return null; } @Override protected Invoker doSelect(List > invokers, URL url, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); ConcurrentMap map = methodWeightMap.get(key); if (map == null) { methodWeightMap.putIfAbsent(key, new ConcurrentHashMap ()); map = methodWeightMap.get(key); } // 总权重 int totalWeight = 0; // 最大当前权重 long maxCurrent = Long.MIN_VALUE; // 当前时间 long now = System.currentTimeMillis(); // 选中提供者 Invoker selectedInvoker = null; // 选中提供者权重对象 WeightedRoundRobin selectedWRR = null; // 遍历所有提供者 for (Invoker invoker : invokers) { String identifyString = invoker.getUrl().toIdentityString(); WeightedRoundRobin weightedRoundRobin = map.get(identifyString); // 获取权重 int weight = getWeight(invoker, invocation); if (weightedRoundRobin == null) { weightedRoundRobin = new WeightedRoundRobin(); weightedRoundRobin.setWeight(weight); map.putIfAbsent(identifyString, weightedRoundRobin); } if (weight != weightedRoundRobin.getWeight()) { weightedRoundRobin.setWeight(weight); } // 选择动态权重最大提供者 long cur = weightedRoundRobin.increaseCurrent(); weightedRoundRobin.setLastUpdate(now); if (cur > maxCurrent) { maxCurrent = cur; selectedInvoker = invoker; selectedWRR = weightedRoundRobin; } // 计算总权重 totalWeight += weight; } // 更新负载均衡容器 if (!updateLock.get() && invokers.size() != map.size()) { if (updateLock.compareAndSet(false, true)) { try { ConcurrentMap newMap = new ConcurrentHashMap (); newMap.putAll(map); Iterator > it = newMap.entrySet().iterator(); while (it.hasNext()) { Entry item = it.next(); if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) { it.remove(); } } methodWeightMap.put(key, newMap); } finally { updateLock.set(false); } } } if (selectedInvoker != null) { // 最大动态权重减去总权重 selectedWRR.sel(totalWeight); return selectedInvoker; } return invokers.get(0); } }
7 一致性哈希
一致性哈希策略具有三个特性:第一是在不新增或者删除提供者情况下,同一个客户端总是可以访问同一个提供者。第二是一致性哈希可以有效分散新增或者删除提供者带来的波动性。第三是一致性哈希虚拟节点可以更加有效分散特性二之波动性。
7.1 特性分析 (1) 特性一
在不新增或者删除提供者情况下,同一个客户端总是可以访问同一个提供者。第一步6个提供者分布在哈希环中,第二步client1发起访问请求,此时计算客户端哈希值位于哈希环位置,第三步沿着哈希环顺时针旋转,找到与客户端哈希值最近提供者server5。当哈希环结构不发生改变时,client1总是路由到server5。
哈希环结构应该选择什么数据结构呢?我们可以选择TreeMap构建哈希环,其底层使用了红黑树。
public class TreeMapTest { public static void main(String[] args) { // 构建哈希环 TreeMaptreeMap = new TreeMap (); treeMap.put(1, new MyServer("1")); treeMap.put(2, new MyServer("2")); treeMap.put(3, new MyServer("3")); treeMap.put(4, new MyServer("4")); treeMap.put(5, new MyServer("5")); treeMap.put(6, new MyServer("6")); // 找到第一个大于客户端哈希值的服务器 Integer clientHashCode = 5; SortedMap tailMap = treeMap.tailMap(clientHashCode, false); MyServer server = tailMap.get(tailMap.firstKey()); // MyServer(ip=6) System.out.println(server); } }
(2) 特性二
一致性哈希可以有效分散新增或者删除提供者带来的波动性,例如新增服务器server7,但是并不影响client1路由结果:
服务器server5发生宕机只会影响client1路由结果,并不会影响其它客户端路由结果:
(3) 特性三
一致性哈希虚拟节点可以更加有效分散特性二之波动性,例如我们可以为每个服务器节点新增一个虚拟节点,使得服务器分散得更加均匀。
7.2 代码实例
第一步编写基础负载均衡策略
public abstract class AbstractConsistentHashLoadBalance { public MyServer select(String clientIP, ListserverList) { return doSelect(clientIP, serverList); } public abstract MyServer doSelect(String clientIP, List serverList); }
第二步编写一致性哈希策略
public class ConsistentHashBalance1 extends AbstractConsistentHashLoadBalance { private ConsistentHashSelector consistentHashSelector; @Override public MyServer doSelect(String clientIP, ListserverList) { initialConsistentHashSelector(serverList); return consistentHashSelector.select(clientIP); } private class ConsistentHashSelector { private Integer identityHashCode; private TreeMap serverNodes = new TreeMap (); // 构建哈希环 public ConsistentHashSelector(Integer identityHashCode, List serverList) { this.identityHashCode = identityHashCode; TreeMap newServerNodes = new TreeMap (); for (MyServer server : serverList) { newServerNodes.put(hashCode(server.getIp()), server); } serverNodes = newServerNodes; } // 根据客户端IP路由 public MyServer select(String clientIP) { // 计算客户端哈希值 int clientHashCode = hashCode(clientIP); // 找到第一个大于客户端哈希值的服务器 SortedMap tailMap = serverNodes.tailMap(clientHashCode, false); if (CollectionUtils.isEmpty(tailMap)) { Integer firstKey = serverNodes.firstKey(); return serverNodes.get(firstKey); } // 找不到表示在最后一个节点和第一个节点之间 ->选择第一个节点 Integer firstKey = tailMap.firstKey(); return tailMap.get(firstKey); } // 计算哈希值 private int hashCode(String key) { return Objects.hashCode(key); } // 提供者列表哈希值 -> 如果新增或者删除提供者会发生变化 public Integer getIdentityHashCode() { return identityHashCode; } } private void initialConsistentHashSelector(List serverList) { // 计算提供者列表哈希值 Integer newIdentityHashCode = System.identityHashCode(serverList); // 提供者列表哈希值没有变化则无需重新构建哈希环 if (null != consistentHashSelector && (null != consistentHashSelector.getIdentityHashCode() && newIdentityHashCode == consistentHashSelector.getIdentityHashCode())) { return; } // 提供者列表哈希值发生变化则重新构建哈希环 consistentHashSelector = new ConsistentHashSelector(newIdentityHashCode, serverList); } }
第三步编写测试代码
public class LoadBalanceTest { public static void main(String[] args) { testConsistentHashBalance1(); } public static void testConsistentHashBalance1() { ListserverList = new ArrayList (); MyServer server1 = new MyServer("1"); MyServer server2 = new MyServer("2"); MyServer server3 = new MyServer("3"); MyServer server4 = new MyServer("4"); MyServer server5 = new MyServer("5"); MyServer server6 = new MyServer("6"); serverList.add(server1); serverList.add(server2); serverList.add(server3); serverList.add(server4); serverList.add(server5); serverList.add(server6); AbstractConsistentHashLoadBalance consistentHashBalance = new ConsistentHashBalance1(); for (int i = 0; i < 10; i++) { String clientIP = "5"; MyServer server = consistentHashBalance.select(clientIP, serverList); System.out.println("clientIP=" + clientIP + ",consistentHashBalance1 route server=" + server); } } }
第四步输出结果
clientIP=5,consistentHashBalance1 route server=MyServer(ip=6) clientIP=5,consistentHashBalance1 route server=MyServer(ip=6) clientIP=5,consistentHashBalance1 route server=MyServer(ip=6) clientIP=5,consistentHashBalance1 route server=MyServer(ip=6) clientIP=5,consistentHashBalance1 route server=MyServer(ip=6) clientIP=5,consistentHashBalance1 route server=MyServer(ip=6) clientIP=5,consistentHashBalance1 route server=MyServer(ip=6) clientIP=5,consistentHashBalance1 route server=MyServer(ip=6) clientIP=5,consistentHashBalance1 route server=MyServer(ip=6) clientIP=5,consistentHashBalance1 route server=MyServer(ip=6)
如果新增虚拟节点参看以下代码
public class ConsistentHashBalance2 extends AbstractConsistentHashLoadBalance { private ConsistentHashSelector consistentHashSelector; @Override public MyServer doSelect(String clientIP, ListserverList) { initialSelector(serverList); return consistentHashSelector.select(clientIP); } private class ConsistentHashSelector { private Integer identityHashCode; private Integer VIRTUAL_NODES_NUM = 16; private TreeMap serverNodes = new TreeMap (); public ConsistentHashSelector(Integer identityHashCode, List serverList) { this.identityHashCode = identityHashCode; TreeMap newServerNodes = new TreeMap (); for (MyServer server : serverList) { // 虚拟节点 for (int i = 0; i < VIRTUAL_NODES_NUM; i++) { int virtualKey = hashCode(server.getIp() + "_" + i); newServerNodes.put(virtualKey, server); } } serverNodes = newServerNodes; } public MyServer select(String clientIP) { int clientHashCode = hashCode(clientIP); SortedMap tailMap = serverNodes.tailMap(clientHashCode, false); if (CollectionUtils.isEmpty(tailMap)) { Integer firstKey = serverNodes.firstKey(); return serverNodes.get(firstKey); } Integer firstKey = tailMap.firstKey(); return tailMap.get(firstKey); } private int hashCode(String key) { return Objects.hashCode(key); } public Integer getIdentityHashCode() { return identityHashCode; } } private void initialSelector(List serverList) { Integer newIdentityHashCode = System.identityHashCode(serverList); if (null != consistentHashSelector && (null != consistentHashSelector.getIdentityHashCode() && newIdentityHashCode == consistentHashSelector.getIdentityHashCode())) { return; } consistentHashSelector = new ConsistentHashSelector(newIdentityHashCode, serverList); } }
7.3 DUBBO源码
public class ConsistentHashLoadBalance extends AbstractLoadBalance { public static final String NAME = "consistenthash"; private final ConcurrentMap> selectors = new ConcurrentHashMap >(); @Override protected Invoker doSelect(List > invokers, URL url, Invocation invocation) { String methodName = RpcUtils.getMethodName(invocation); String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName; int identityHashCode = System.identityHashCode(invokers); ConsistentHashSelector selector = (ConsistentHashSelector ) selectors.get(key); // 提供者列表哈希值发生变化则重新构建哈希环 if (selector == null || selector.identityHashCode != identityHashCode) { selectors.put(key, new ConsistentHashSelector (invokers, methodName, identityHashCode)); selector = (ConsistentHashSelector ) selectors.get(key); } return selector.select(invocation); } private static final class ConsistentHashSelector { private final TreeMap > virtualInvokers; private final int replicaNumber; private final int identityHashCode; private final int[] argumentIndex; // 构建哈希环 ConsistentHashSelector(List > invokers, String methodName, int identityHashCode) { this.virtualInvokers = new TreeMap >(); this.identityHashCode = identityHashCode; URL url = invokers.get(0).getUrl(); this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160); String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0")); argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i++) { argumentIndex[i] = Integer.parseInt(index[i]); } for (Invoker invoker : invokers) { String address = invoker.getUrl().getAddress(); // 新增虚拟节点(默认160个) for (int i = 0; i < replicaNumber / 4; i++) { byte[] digest = md5(address + i); for (int h = 0; h < 4; h++) { long m = hash(digest, h); virtualInvokers.put(m, invoker); } } } } // 负载均衡 public Invoker select(Invocation invocation) { String key = toKey(invocation.getArguments()); byte[] digest = md5(key); return selectForKey(hash(digest, 0)); } private String toKey(Object[] args) { StringBuilder buf = new StringBuilder(); for (int i : argumentIndex) { if (i >= 0 && i < args.length) { buf.append(args[i]); } } return buf.toString(); } private Invoker selectForKey(long hash) { Map.Entry > entry = virtualInvokers.ceilingEntry(hash); if (entry == null) { entry = virtualInvokers.firstEntry(); } return entry.getValue(); } // 哈希运算 private long hash(byte[] digest, int number) { return (((long) (digest[3 + number * 4] & 0xFF) << 24) | ((long) (digest[2 + number * 4] & 0xFF) << 16) | ((long) (digest[1 + number * 4] & 0xFF) << 8) | (digest[number * 4] & 0xFF)) & 0xFFFFFFFFL; } private byte[] md5(String value) { MessageDigest md5; try { md5 = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException(e.getMessage(), e); } md5.reset(); byte[] bytes = value.getBytes(StandardCharsets.UTF_8); md5.update(bytes); return md5.digest(); } } }
8 最少活跃数策略
每个提供者维护并发处理的任务个数,任务个数越大活跃度越高。在服务消费者进行负载均衡时,第一查询提供者负载情况,第二选择活跃度最低的提供者,我们直接分析DUBBO源码:
public class LeastActiveLoadBalance extends AbstractLoadBalance { public static final String NAME = "leastactive"; @Override protectedInvoker doSelect(List > invokers, URL url, Invocation invocation) { // invoker数量 int length = invokers.size(); // 最小调用次数 int leastActive = -1; // 调用次数等于最小次数invoker数量 int leastCount = 0; // 调用次数等于最小调用次数invoker下标集合 int[] leastIndexes = new int[length]; // 每个服务提供者权重 int[] weights = new int[length]; // 总权重 int totalWeight = 0; // 第一个调用者权重 int firstWeight = 0; // 权重值是否相同 boolean sameWeight = true; // 遍历invokers for (int i = 0; i < length; i++) { Invoker invoker = invokers.get(i); // 调用次数 int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // 获取权重 int afterWarmup = getWeight(invoker, invocation); // 设置权重 weights[i] = afterWarmup; // 第一个invoker或者调用次数小于最小调用次数 if (leastActive == -1 || active < leastActive) { leastActive = active; leastCount = 1; leastIndexes[0] = i; totalWeight = afterWarmup; firstWeight = afterWarmup; sameWeight = true; } // 当前服务提供者调用次数等于最小调用次数 else if (active == leastActive) { // 记录下标 leastIndexes[leastCount++] = i; // 新增总权重值 totalWeight += afterWarmup; // 权重值是否相同 if (sameWeight && i > 0 && afterWarmup != firstWeight) { sameWeight = false; } } } // 只有一个invoker调用次数等于最小调用次数直接返回 if (leastCount == 1) { return invokers.get(leastIndexes[0]); } // 多个invoker调用次数等于最小调用次数并且权重值不相同->根据权重值选择 if (!sameWeight && totalWeight > 0) { int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexes[i]; offsetWeight -= weights[leastIndex]; if (offsetWeight < 0) { return invokers.get(leastIndex); } } } // 多个invoker调用次数等于最小调用次数并且权重值相同->随机选择 return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]); } }
9 文章总结
第一本文首先分析了负载均衡、集群容错、服务降级这三组概念,第二结合代码分析了简单随机,加权随机,简单轮询,简单加权轮询,平滑加权轮询,一致性哈希,最少活跃数七种负载均衡策略,其中权重计算、平滑加权轮询,一致性哈希算法值得注意,希望本文对大家有所帮助。
欢迎大家关注公众号「JAVA前线」查看更多精彩分享文章,主要包括源码分析、实际应用、架构思维、职场分享、产品思考等等,同时欢迎大家加我个人微信「java_front」一起交流学习
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)