https://www.zsythink.net/archives/1182
https://hub.fastgit.org/apache/dubbo/blob/master/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/loadbalance/ConsistentHashLoadBalance.java
package org.apache.dubbo.rpc.cluster.loadbalance; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.io.Bytes; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.support.RpcUtils; import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN; public class ConsistentHashLoadBalance extends AbstractLoadBalance { public static final String NAME = "consistenthash"; public static final String HASH_NODES = "hash.nodes"; public static final String HASH_ARGUMENTS = "hash.arguments"; private final ConcurrentMap> selectors = new ConcurrentHashMap >(); @SuppressWarnings("unchecked") @Override protected Invoker doSelect(List > invokers, URL url, Invocation invocation) { String methodName = RpcUtils.getMethodName(invocation); String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName; // using the hashcode of list to compute the hash only pay attention to the elements in the list int invokersHashCode = invokers.hashCode(); ConsistentHashSelector selector = (ConsistentHashSelector ) selectors.get(key); if (selector == null || selector.identityHashCode != invokersHashCode) { selectors.put(key, new ConsistentHashSelector (invokers, methodName, invokersHashCode)); selector = (ConsistentHashSelector ) selectors.get(key); } return selector.select(invocation); } private static final class ConsistentHashSelector { private final TreeMap > virtualInvokers; private final int replicaNumber; private final int identityHashCode; private final int[] argumentIndex; ConsistentHashSelector(List > invokers, String methodName, int identityHashCode) { this.virtualInvokers = new TreeMap >(); this.identityHashCode = identityHashCode; URL url = invokers.get(0).getUrl(); this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160); String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0")); argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i++) { argumentIndex[i] = Integer.parseInt(index[i]); } // 设置replicaNumber数量的虚拟节点,以防止节点偏斜现象 for (Invoker invoker : invokers) { String address = invoker.getUrl().getAddress(); for (int i = 0; i < replicaNumber / 4; i++) { byte[] digest = Bytes.getMD5(address + i); for (int h = 0; h < 4; h++) { long m = hash(digest, h); virtualInvokers.put(m, invoker); } } } } public Invoker select(Invocation invocation) { String key = toKey(invocation.getArguments()); byte[] digest = Bytes.getMD5(key); return selectForKey(hash(digest, 0)); } private String toKey(Object[] args) { StringBuilder buf = new StringBuilder(); for (int i : argumentIndex) { if (i >= 0 && i < args.length) { buf.append(args[i]); } } return buf.toString(); } private Invoker selectForKey(long hash) { // 返回给定key大于或等于的最小entry Map.Entry > entry = virtualInvokers.ceilingEntry(hash); if (entry == null) { entry = virtualInvokers.firstEntry(); } return entry.getValue(); } private long hash(byte[] digest, int number) { return (((long) (digest[3 + number * 4] & 0xFF) << 24) | ((long) (digest[2 + number * 4] & 0xFF) << 16) | ((long) (digest[1 + number * 4] & 0xFF) << 8) | (digest[number * 4] & 0xFF)) & 0xFFFFFFFFL; } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)