在学习完了Netty的基本知识后,我们来实战下。
笔者之前有使用过Sentinel的相关技术,其中client与server端的交互就是使用Netty来完成的。
那本文我们就来分析下NettyClient的使用。
笔者分析的Sentinel源码版本为1.7.0
1.定义客户端交互接口面向接口编程,而不是面向类编程。作为一个框架设计者,需要时刻谨记该原则。
Sentinel也是,在创建客户端连接时,首先提出一个接口。
public interface ClusterTransportClient { void start() throws Exception; void stop() throws Exception; ClusterResponse sendRequest(ClusterRequest request) throws Exception; boolean isReady(); }
分析下其主要方法
start() 用于启动一个client
stop() 用于关闭一个client
isReady() 用于判断client的状态,若已创建完成连接,则判定为可用态,可以用于发送请求
sendRequest() client发送请求
2.定义基于Netty的实现类面向接口而不是实现类。如果以后我们需要切换为别的client创建方式(比如Mina),则可以直接基于(Mina)来创建客户端连接。
下面来看下基于Netty的ClusterTransportClient接口实现
2.1 NettyTransportClient的参数public class NettyTransportClient implements ClusterTransportClient { @SuppressWarnings("PMD.ThreadPoolCreationRule") private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1, new NamedThreadFactory("sentinel-cluster-transport-client-scheduler")); public static final int RECONNECT_DELAY_MS = 2000; // 用于连接的host port private final String host; private final int port; // 基于Netty创建完对Server连接后,返回的Channel,可用于后续的请求发送 private Channel channel; // Netty自带的 private NioEventLoopGroup eventLoopGroup; // 请求处理Handler private TokenClientHandler clientHandler; private final AtomicInteger idGenerator = new AtomicInteger(0); // 防止多次创建的状态位 private final AtomicInteger currentState = new AtomicInteger(ClientConstants.CLIENT_STATUS_OFF); // 失败连接次数 private final AtomicInteger failConnectedTime = new AtomicInteger(0); private final AtomicBoolean shouldRetry = new AtomicBoolean(true); // 构造方法中主要就是提供server的host port信息 public NettyTransportClient(String host, int port) { AssertUtil.assertNotBlank(host, "remote host cannot be blank"); AssertUtil.isTrue(port > 0, "port should be positive"); this.host = host; this.port = port; } ... }
从当前的构造方法可以看出,host和port都是通过外部调用创建的。其他参数都是比较正常的Netty client创建中所使用到的。
下面来看下start()方法
2.2 NettyTransportClient.start() 启动客户端连接public class NettyTransportClient implements ClusterTransportClient { public void start() throws Exception { shouldRetry.set(true); startInternal(); } private void startInternal() { connect(initClientBootstrap()); } // 用于创建Bootstrap private Bootstrap initClientBootstrap() { Bootstrap b = new Bootstrap(); eventLoopGroup = new NioEventLoopGroup(); b.group(eventLoopGroup) .channel(NioSocketChannel.class) // 配置底层TCP相关参数 .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClusterClientConfigManager.getConnectTimeout()) .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { clientHandler = new TokenClientHandler(currentState, disconnectCallback); // 设置请求执行相关Handler类 ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LengthFieldbasedframeDecoder(1024, 0, 2, 0, 2)); pipeline.addLast(new NettyResponseDecoder()); pipeline.addLast(new LengthFieldPrepender(2)); pipeline.addLast(new NettyRequestEncoder()); pipeline.addLast(clientHandler); } }); return b; } // connect方法用于真正创建远程连接 private void connect(Bootstrap b) { // 这里的currentState状态位,避免多次创建连接 if (currentState.compareAndSet(ClientConstants.CLIENT_STATUS_OFF, ClientConstants.CLIENT_STATUS_PENDING)) { b.connect(host, port) // 添加一个连接监听 .addListener(new GenericFutureListener () { @Override public void operationComplete(ChannelFuture future) { if (future.cause() != null) { // 连接失败,则打印日志,记录失败次数 RecordLog.warn( String.format("[NettyTransportClient] Could not connect to <%s:%d> after %d times", host, port, failConnectedTime.get()), future.cause()); failConnectedTime.incrementAndGet(); channel = null; } else { failConnectedTime.set(0); channel = future.channel(); RecordLog.info( "[NettyTransportClient] Successfully connect to server <" + host + ":" + port + ">"); } } }); } } }
有关于currentState状态位的使用,我们可以学习下,通过该状态位的使用,可以避免创建多个连接。这样NettyTransportClient对外提供的Channel一直都是同一个。
按照上面的start()方式,那么当连接失败时,客户端就不再重试了嘛?为什么没有看到重试方法呢?
答案是有重试的,只不过不是在当前startInternal()方法,而是启动了一个定时任务,来不停的执行重试。
2.3 创建连接重试public class NettyTransportClient implements ClusterTransportClient { private Runnable disconnectCallback = new Runnable() { @Override public void run() { // start()方法调用之后,shouldRetry=true,所以会直接执行到下面的调度方法 if (!shouldRetry.get()) { return; } // 通过指定频率来执行重连 SCHEDULER.schedule(new Runnable() { @Override public void run() { if (shouldRetry.get()) { RecordLog.info("[NettyTransportClient] Reconnecting to server <" + host + ":" + port + ">"); try { startInternal(); } catch (Exception e) { RecordLog.warn("[NettyTransportClient] Failed to reconnect to server", e); } } } }, RECONNECT_DELAY_MS * (failConnectedTime.get() + 1), TimeUnit.MILLISECONDS); cleanUp(); } }; }
disconnectCallback确实实现了连接的重试,那么这个RUNNABLE是什么时候被触发的呢?
全文搜索下,可以看到在Bootstrap创建的时候添加进去的
private Bootstrap initClientBootstrap() { Bootstrap b = new Bootstrap(); eventLoopGroup = new NioEventLoopGroup(); b.group(eventLoopGroup) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClusterClientConfigManager.getConnectTimeout()) .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { // 在创建TokenClientHandler的时候被使用到了,那么具体是如何使用的呢?具体见TokenClientHandler clientHandler = new TokenClientHandler(currentState, disconnectCallback); ... pipeline.addLast(clientHandler); } }); return b; }
2.3.1 TokenClientHandler
public class TokenClientHandler extends ChannelInboundHandlerAdapter { private final AtomicInteger currentState; private final Runnable disconnectCallback; public TokenClientHandler(AtomicInteger currentState, Runnable disconnectCallback) { this.currentState = currentState; this.disconnectCallback = disconnectCallback; } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { RecordLog.info("[TokenClientHandler] Client channel unregistered, remote address: " + getRemoteAddress(ctx)); currentState.set(ClientConstants.CLIENT_STATUS_OFF); // 在这里被调用到了 disconnectCallback.run(); } }
从这一系列的骚 *** 作,在channel断开时,也就是unregistered方法时,此时就需要进行重连了。
2.4 NettyTransportClient.stop() 关闭客户端连接public class NettyTransportClient implements ClusterTransportClient { public void stop() throws Exception { // Stop retrying for connection. shouldRetry.set(false); // 如果正在连接状态的,则sleep 200ms while (currentState.get() == ClientConstants.CLIENT_STATUS_PENDING) { try { Thread.sleep(200); } catch (Exception ex) { // Ignore. } } // 调用如下 cleanUp(); failConnectedTime.set(0); RecordLog.info("[NettyTransportClient] Cluster transport client stopped"); } private void cleanUp() { // 直接关闭channel if (channel != null) { channel.close(); channel = null; } if (eventLoopGroup != null) { eventLoopGroup.shutdownGracefully(); } } }
关闭客户端就是常规的方式,多了一个状态的判断。
2.5 NettyTransportClient.sendRequest() 发送请求public class NettyTransportClient implements ClusterTransportClient { @Override public ClusterResponse sendRequest(ClusterRequest request) throws Exception { if (!isReady()) { throw new SentinelClusterException(ClusterErrorMessages.CLIENT_NOT_READY); } if (!validRequest(request)) { throw new SentinelClusterException(ClusterErrorMessages.BAD_REQUEST); } // 获取一个唯一ID,标志当前这个唯一请求 int xid = getCurrentId(); try { // request设置完id之后,直接调用channel进行发送 request.setId(xid); channel.writeAndFlush(request); // 发送请求和接收响应是异步的,通过Future来实现 ChannelPromise promise = channel.newPromise(); TokenClientPromiseHolder.putPromise(xid, promise); // 在这里等待响应特定时间后,如果还没有获取到结果,则直接抛出异常 if (!promise.await(ClusterClientConfigManager.getRequestTimeout())) { throw new SentinelClusterException(ClusterErrorMessages.REQUEST_TIME_OUT); } // 这里通过TokenClientPromiseHolder来完成请求响应的异步封装,详细可参考2.5.1 SimpleEntryentry = TokenClientProseHolder.getEntry(xid); if (entry == null || entry.getValue() == null) { // Should not go through here. throw new SentinelClusterException(ClusterErrorMessages.UNEXPECTED_STATUS); } return entry.getValue(); } finally { TokenClientPromiseHolder.remove(xid); } } }
2.5.1 响应异步封装
笔者觉得这里还是比较重要的知识点,尤其是我们需要对响应进行异步接收的时候。
public final class TokenClientPromiseHolder { // 该Map主要用于存放ID对应的channelPromise private static final Map> PROMISE_MAP = new ConcurrentHashMap<>(); // 请求发送结束后,则调用该方法,将id对应的promise存放到map中 public static void putPromise(int xid, ChannelPromise promise) { PROMISE_MAP.put(xid, new SimpleEntry (promise, null)); } public static SimpleEntry getEntry(int xid) { return PROMISE_MAP.get(xid); } public static void remove(int xid) { PROMISE_MAP.remove(xid); } // 当接收到响应时,则调用该方法 public static boolean completePromise(int xid, ClusterResponse response) { if (!PROMISE_MAP.containsKey(xid)) { return false; } // 根据请求id查找到对应的ChannelPromise SimpleEntry entry = PROMISE_MAP.get(xid); if (entry != null) { ChannelPromise promise = entry.getKey(); if (promise.isDone() || promise.isCancelled()) { return false; } // 设置promise状态为success,并将值设置到entry中 entry.setValue(response); promise.setSuccess(); return true; } return false; } private TokenClientPromiseHolder() { } }
异步封装响应状态和响应内容,还是很重要的,那么这个completePromise()方法在何时被调用呢?
public class TokenClientHandler extends ChannelInboundHandlerAdapter { // 读到server端的响应时 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ClusterResponse) { ClusterResponse> response = (ClusterResponse) msg; if (response.getType() == ClusterConstants.MSG_TYPE_PING) { handlePingResponse(ctx, response); return; } // 接收到响应时,则调用该方法,server端也会将该id传回。 TokenClientPromiseHolder.completePromise(response.getId(), response); } } }总结:
从Sentienl创建客户端连接的过程中,我们还是能学到很多知识点的。
比如连接状态的判断,避免多次创建连接;
重试连接线程的执行,在channelUnregistered()时候调用;
每次请求都设置一个唯一的ID,这样可以异步接收响应;
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)