先看公文。
之前我自己也看到了上面的文字,不是很懂。随着对分布式系统的深入接触和对DUBBO源代码的研究,我们有了更准确的认识。总结一下:阅读技术文献时,对某些语言的理解受到读者自身情况的限制,理解往往有深有浅。闲话少说。本文将逐一分析图中的组件。
1。调用者
Invoker是可以调用服务的提供者的抽象。Invoker封装了提供者地址和服务接口信息。
1.1API
1.2分层树形结构
分析表明有两个主要分支。
1.AbstractInvoker:主要具体的远程实现,与RPC协议相关,属于dubbo-rpc-api范畴。
2.AbstractClusterInvoicer:主要逻辑包括Invoker的选择,与高可用性相关,属于dubbo-cluster范畴。
1.3abstractinvokerVSAbstractClusterInvoker
1.3.1类图比较
通过下图不难发现:AbstractClusterInvoker相比AbstractInvoker有更多的select、doselect、doselect、reselect方法。这些方法的作用也是可以猜到的。
1.3.2对比两个实现类
选择AbstractInvoker的子类DubboInvoker,
以及FailoverClusterInvoker,AbstractClusterInvoker的子类。
FailoverClusterInvoker是dubbo集群容错的默认方案,Dubbo协议是默认协议。
先看AbstractInvoker分支。
public abstract class AbstractInvoker<T> implements Invoker<T> { public Result invoke(Invocation inv) throws RpcException { RpcInvocation invocation = (RpcInvocation) inv; invocation.setInvoker(this); Map<String, String> context = RpcContext.getContext().getAttachments(); try { return doInvoke(invocation); } catch (InvocationTargetException e) { // biz exception } catch (RpcException e) { } catch (Throwable e) { } } protected abstract Result doInvoke(Invocation invocation) throws Throwable; .. } public class DubboInvoker<T> extends AbstractInvoker<T> { @Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); logger.debug("doInvoke start -----------------------------"); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout) ; RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); }finally { logger.debug("doInvoke end-----------------------------"); } } ... }这里可以清楚的看到,远程调用有三种:
1。不需要返回值的调用(所谓的单向)
2。异步
3。同步。
在第一种情况下,客户机只发送请求,而不管返回的结果。
对于第二种情况,客户端除了发出请求之外,还需要将结果插入ThreadLocal变量,以便客户端可以获得返回值。
对于第三种情况,客户端除了发出请求之外,还会同步等待返回结果。
再次查看AbstractClusterInvoker分支。
public abstract class AbstractClusterInvoker<T> implements Invoker<T> { public Result invoke(final Invocation invocation) throws RpcException { checkWheatherDestoried(); LoadBalance loadbalance; List<Invoker<T>> invokers = list(invocation); if (invokers != null && invokers.size() > 0) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); } protected List<Invoker<T>> list(Invocation invocation) throws RpcException { List<Invoker<T>> invokers = directory.list(invocation); return invokers; } protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException; .. } public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> { public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. Set<String> providers = new HashSet<String>(len); for (int i = 0; i < len; i++) { //重试时,进行重新选择,避免重试时invoker列表已发生变化. //注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变 if (i > 0) { checkWheatherDestoried(); copyinvokers = list(invocation); //重新检查一下 checkInvokers(copyinvokers, invocation); } Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List)invoked); try { Result result = invoker.invoke(invocation); return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } } ... }综上所述,AbstractClusterInvoker和AbstractClusterInvoker都需要实现Invoker接口,并且都声明了doInvoke抽象方法。但是方法定义有所不同。
protected abstract Result doInvoke(Invocation invocation) throws Throwable; protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException;AbstractClusterInvoker需要一个Invoker列表,它们来自目录,而负载均衡可以理解为一种负载均衡策略。
两者的联系:AbstractClusterInvoker比AbstractInvoker多了一些选项和负载均衡的部分,最后会调用AbstractInvoker的分支,负责具体的RPC调用工作。
1.4常用容错方案的比较
优点
缺点
实现类[/br
无法自动切换。出现故障时,再次尝试另一台服务器,通常是为了读取 *** 作(推荐)
[
快速失败,只调用一次,立即报告失败。它通常用于非幂等写 *** 作
[
故障保护。出现异常时,直接忽略。它通常用于写审计日志和其他 *** 作
故障自动恢复,失败请求后台记录,定期重传,通常用于消息通知 *** 作
并行调用多个服务器,只要其中一个成功,就会返回。它通常用于实时读取 *** 作
[/h
串
广播调用所有提供者,逐个调用,任意一台报错则报错,通常用于更新提供方本地状态速度慢,任意一台报错则报错
BroadcastClusterInvoker
2.摘要
*ClusterInvoker使用路由器和目录来获取Invoker列表。
*ClusterInvoker然后在LoadBalance提供的负载平衡策略的帮助下,返回Invoker列表中的一个可用Invoker。
*Directory代表多个调用者,可以理解为调用者的逻辑集合,负责离线和在线调用者。
路由器和目录都给我提供了调用者列表。通过提供的API,很容易找出区别。
//Directory List<Invoker<T>> list(Invocation invocation) throws RpcException; //Route <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;目录:返回的调用者列表,代表当前正常对外提供服务的调用者列表。重点维护可用节点列表。
路由:返回的调用者列表在已有的可用调用者列表中,根据规则重新过滤,重点是规则匹配、过滤等。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)