public class DubboProtocol extends AbstractProtocol { public1.1 DubboProtocol.getClients()Invoker refer(Class type, URL url) throws RpcException { return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url)); } public Invoker protocolBindingRefer(Class serviceType, URL url) throws RpcException { optimizeSerialization(url); // 在这里调用getClients()方法来创建客户端连接,详见1.1 DubboInvoker invoker = new DubboInvoker (serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; } }
public class DubboProtocol extends AbstractProtocol { private ExchangeClient[] getClients(URL url) { boolean useShareConnect = false; int connections = url.getParameter(CONNECTIONS_KEY, 0); List1.2 DubboProtocol.getSharedClient()shareClients = null; // if not configured, connection is shared, otherwise, one connection for one service // 如果url没有设置connections属性,则默认对当前provider_url创建一个共享的Connection if (connections == 0) { useShareConnect = true; String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null); connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr); // 继续交由getSharedClient()方法来处理,具体见1.2 shareClients = getSharedClient(url, connections); } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (useShareConnect) { clients[i] = shareClients.get(i); } else { clients[i] = initClient(url); } } return clients; } }
public class DubboProtocol extends AbstractProtocol { // key=host:port value=Exchanger,用来缓存已创建的Client private final Map1.3 DubboProtocol.buildReferenceCountExchangeClientList()> referenceClientMap = new ConcurrentHashMap<>(); private List getSharedClient(URL url, int connectNum) { // 这里的key即provider的ip:port String key = url.getAddress(); // 获取缓存对象,第一次创建时默认都为空 List clients = referenceClientMap.get(key); if (checkClientCanUse(clients)) { batchClientRefIncr(clients); return clients; } locks.putIfAbsent(key, new Object()); synchronized (locks.get(key)) { clients = referenceClientMap.get(key); // dubbo check if (checkClientCanUse(clients)) { batchClientRefIncr(clients); return clients; } // connectNum must be greater than or equal to 1 connectNum = Math.max(connectNum, 1); // If the clients is empty, then the first initialization is if (CollectionUtils.isEmpty(clients)) { // 在这里创建client,具体见1.3 clients = buildReferenceCountExchangeClientList(url, connectNum); referenceClientMap.put(key, clients); } else { for (int i = 0; i < clients.size(); i++) { ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i); // If there is a client in the list that is no longer available, create a new one to replace him. if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) { clients.set(i, buildReferenceCountExchangeClient(url)); continue; } referenceCountExchangeClient.incrementAndGetCount(); } } locks.remove(key); return clients; } } }
public class DubboProtocol extends AbstractProtocol { private ListbuildReferenceCountExchangeClientList(URL url, int connectNum) { List clients = new ArrayList<>(); for (int i = 0; i < connectNum; i++) { // 继续交由buildReferenceCountExchangeClient处理 clients.add(buildReferenceCountExchangeClient(url)); } return clients; } private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) { // 交由initClient()处理 ExchangeClient exchangeClient = initClient(url); return new ReferenceCountExchangeClient(exchangeClient); } private ExchangeClient initClient(URL url) { // 设置客户端类型,默认为netty_client String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT)); ... ExchangeClient client; try { // 是否懒加载,默认为false if (url.getParameter(LAZY_CONNECT_KEY, false)) { client = new LazyConnectExchangeClient(url, requestHandler); } else { // 最终交由Exchangers来处理,具体见2、2.1 client = Exchangers.connect(url, requestHandler); } } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); } return client; } }
总结:经过这一大段代码,DubboProtocol.refer()方法,会根据provider_url中的connections参数(默认为1,针对当前provider创建一个client,当前应用对该provider的 *** 作都交由这一个client来处理),创建对应个数的Client,经过一系列的调用,最终交由Exchangers来创建。
2.Exchangers.connect()public class Exchangers { public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { ... url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); // 故最终还是交由HeaderExchanger.connect()方法,具体见2.1 return getExchanger(url).connect(url, handler); } // 根据url获取对应类型的Exchanger,默认使用HeaderExchanger public static Exchanger getExchanger(URL url) { String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); return getExchanger(type); } public static Exchanger getExchanger(String type) { return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type); } }
2.1 HeaderExchanger.connect()public class HeaderExchanger implements Exchanger { public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { // 与分析provider时类似,继续交由Transports来处理,具体见2.2 return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); } }2.2 Transporters.connect()
public class Transporters { public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } // 创建ChannelHandler,根据输入的handlers个数不同,创建不同的Handler对象 // 本例中默认为DecodeHandler ChannelHandler handler; if (handlers == null || handlers.length == 0) { handler = new ChannelHandlerAdapter(); } else if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } // Transporter默认为NettyTransporter,所以最终还是调用NettyTransporter.connect()进行处理,具体见2.3 return getTransporter().connect(url, handler); } public static Transporter getTransporter() { return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); } }2.3 NettyTransporter.connect()
public class NettyTransporter implements Transporter { public Client connect(URL url, ChannelHandler handler) throws RemotingException { // 最终创建NettyClient,具体见3 return new NettyClient(url, handler); } }
与我们之前分析服务端的创建过程差不多,也是Exchanger --> Transporter --> Netty(NettyServer、NettyClient)
3.NettyClientpublic class NettyClient extends AbstractClient { private Bootstrap bootstrap; private volatile Channel channel; public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException { // 调用父类的构造处理 super(url, wrapChannelHandler(url, handler)); } // 创建Netty Bootstrap protected void doOpen() throws Throwable { // 标准的创建Netty_client Bootstrap的过程 final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this); bootstrap = new Bootstrap(); bootstrap.group(NIO_EVENT_LOOP_GROUP) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .channel(socketChannelClass()); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout())); bootstrap.handler(new ChannelInitializer总结:() { @Override protected void initChannel(SocketChannel ch) throws Exception { int heartbeatInterval = UrlUtils.getHeartbeat(getUrl()); if (getUrl().getParameter(SSL_ENABLED_KEY, false)) { ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler)); } NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); // 添加Handler处理器 ch.pipeline() .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS)) .addLast("handler", nettyClientHandler); String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST); if(socksProxyHost != null) { int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT)); Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort)); ch.pipeline().addFirst(socks5ProxyHandler); } } }); } // 创建对服务端的连接 protected void doConnect() throws Throwable { long start = System.currentTimeMillis(); ChannelFuture future = bootstrap.connect(getConnectAddress()); try { boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS); if (ret && future.isSuccess()) { Channel newChannel = future.channel(); try { Channel oldChannel = NettyClient.this.channel; if (oldChannel != null) { try { if (logger.isInfoEnabled()) { logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel); } oldChannel.close(); } finally { NettyChannel.removeChannelIfDisconnected(oldChannel); } } } finally { ... } } ... } finally { ... } } } public abstract class AbstractClient extends AbstractEndpoint implements Client { public AbstractClient(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false); initExecutor(url); try { // 交由子类NettyClient来执行 doOpen(); } catch (Throwable t) { ... } try { // connect. connect(); ... } } ... } // 创建线程池,同NettyServer时的创建过程,只不过client最终的线程池为CachedThreadPool private void initExecutor(URL url) { // 设置客户端线程名称,本例中 threadname=DubboClientHandler-xxx:20880 url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME); url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL); executor = executorRepository.createExecutorIfAbsent(url); } }