public interface ClusterTokenServer { void start() throws Exception; void stop() throws Exception; }
2.1 NettyTransportServer基本参数public class NettyTransportServer implements ClusterTokenServer { // 定义EventLoop线程数,默认与当前服务器核数相关,支持用户自定义配置 private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2)); // 支持连接创建失败重试 private static final int MAX_RETRY_TIMES = 3; private static final int RETRY_SLEEP_MS = 2000; // 当前服务绑定端口 private final int port; // NettyServer创建时的bossGroup 和 workGroup private NioEventLoopGroup bossGroup; private NioEventLoopGroup workerGroup; // 这里是对连接上来的客户端连接的一个管理 private final ConnectionPool connectionPool = new ConnectionPool(); // 跟客户端一样,也是一个状态位的判断,防止多次启动 private final AtomicInteger currentState = new AtomicInteger(SERVER_STATUS_OFF); private final AtomicInteger failedTimes = new AtomicInteger(0); // 构造函数,只需要提供一个port端口信息即可 public NettyTransportServer(int port) { this.port = port; } ... }
除了常规的port和bossGroup、workerGroup之外,这里的参数还有些骚 *** 作是值得我们学习的。
2.2 NettyTransportServer.start() 启动服务端public class NettyTransportServer implements ClusterTokenServer { public void start() { // 首先进行状态位的设置,避免多次启动 if (!currentState.compareAndSet(SERVER_STATUS_OFF, SERVER_STATUS_STARTING)) { return; } ServerBootstrap b = new ServerBootstrap(); // reactor模式,设置workerGroup线程数 this.bossGroup = new NioEventLoopGroup(1); this.workerGroup = new NioEventLoopGroup(DEFAULT_EVENT_LOOP_THREADS); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // server tcp的基本参数 .option(ChannelOption.SO_BACKLOG, 128) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LengthFieldbasedframeDecoder(1024, 0, 2, 0, 2)); p.addLast(new NettyRequestDecoder()); p.addLast(new LengthFieldPrepender(2)); p.addLast(new NettyResponseEncoder()); // 最主要的是TokenServerHandler这个Handler,用于处理客户端请求 p.addLast(new TokenServerHandler(connectionPool)); } }) // 设置客户端连接的基本TCP参数 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.SO_SNDBUF, 32 * 1024) .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) .childOption(ChannelOption.SO_TIMEOUT, 10) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_RCVBUF, 32 * 1024); b.bind(port).addListener(new GenericFutureListener () { // 监听server绑定是否成功,如果失败,则继续重试,最大重试次数不超过MAX_RETRY_TIMES public void operationComplete(ChannelFuture future) { if (future.cause() != null) { RecordLog.info("[NettyTransportServer] Token server start failed (port=" + port + "), failedTimes: " + failedTimes.get(), future.cause()); currentState.compareAndSet(SERVER_STATUS_STARTING, SERVER_STATUS_OFF); int failCount = failedTimes.incrementAndGet(); if (failCount > MAX_RETRY_TIMES) { return; } try { // 这里的sleep,类似于指数避退的方式,很有意思 Thread.sleep(failCount * RETRY_SLEEP_MS); start(); } catch (Throwable e) { RecordLog.info("[NettyTransportServer] Failed to start token server when retrying", e); } } else { RecordLog.info("[NettyTransportServer] Token server started success at port " + port); currentState.compareAndSet(SERVER_STATUS_STARTING, SERVER_STATUS_STARTED); } } }); } }
这里的重试 *** 作,还是值得我们学习下的,一般来说绑定本地port应该不会失败,但如果因为某些原因失败时,我们应该在代码中可以自动重试
2.3 NettyTransportServer.stop() 停止服务端public class NettyTransportServer implements ClusterTokenServer { public void stop() { // 还是先判断状态位 while (currentState.get() == SERVER_STATUS_STARTING) { try { Thread.sleep(500); } catch (InterruptedException e) { // Ignore. } } if (currentState.compareAndSet(SERVER_STATUS_STARTED, SERVER_STATUS_OFF)) { try { // 关闭线程资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); // 这里还会关闭所有的客户端连接 connectionPool.shutdownAll(); failedTimes.set(0); RecordLog.info("[NettyTransportServer] Sentinel token server stopped"); } catch (Exception ex) { RecordLog.warn("[NettyTransportServer] Failed to stop token server (port=" + port + ")", ex); } } } }2.4 管理客户端连接
这个 *** 作是之前没有考虑过的,看了这里之后,发现还可以直接管理客户端连接。
2.4.1 定义ConnectionPool连接管理池
public class ConnectionPool { @SuppressWarnings("PMD.ThreadPoolCreationRule") private static final ScheduledExecutorService TIMER = Executors.newScheduledThreadPool(2); // 实际就是一个Map private final MapCONNECTION_MAP = new ConcurrentHashMap (); ... }
2.4.2 客户端连接创建时添加到连接池中
public class TokenServerHandler extends ChannelInboundHandlerAdapter { private final ConnectionPool globalConnectionPool; public TokenServerHandler(ConnectionPool globalConnectionPool) { this.globalConnectionPool = globalConnectionPool; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 在这里调用的 globalConnectionPool.createConnection(ctx.channel()); String remoteAddress = getRemoteAddress(ctx); } } // ConnectionPool public class ConnectionPool { @SuppressWarnings("PMD.ThreadPoolCreationRule") private static final ScheduledExecutorService TIMER = Executors.newScheduledThreadPool(2); // 实际就是一个Map private final MapCONNECTION_MAP = new ConcurrentHashMap (); public void createConnection(Channel channel) { if (channel != null) { // 创建完NettyConnection后,直接保存到Map中 Connection connection = new NettyConnection(channel, this); // 这里的key以客户端的ip:port String connKey = getConnectionKey(channel); CONNECTION_MAP.put(connKey, connection); } } private String getConnectionKey(Channel channel) { InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress(); String remoteIp = socketAddress.getAddress().getHostAddress(); int remotePort = socketAddress.getPort(); return remoteIp + ":" + remotePort; } } // NettyConnection public class NettyConnection implements Connection { // 基本属性 private String remoteIp; private int remotePort; private Channel channel; // 这里记录了当前channel最后一次read时间,用于后续 *** 作 private long lastReadTime; private ConnectionPool pool; }
2.4.2 ConnectionPool清理长期未read NettyConnection
// TokenServerHandler 请求处理类 public class TokenServerHandler extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 当发生read调用时,先将connection的 readTime更新下 globalConnectionPool.refreshLastReadTime(ctx.channel()); ... } } // ConnectionPool public class ConnectionPool { public void refreshLastReadTime(Channel channel) { if (channel != null) { String connKey = getConnectionKey(channel); Connection connection = CONNECTION_MAP.get(connKey); if (connection != null) { // 更新为当前时间 connection.refreshLastReadTime(System.currentTimeMillis()); } } } // 定时器清理长时间未read的connection private ScheduledFuture scanTaskFuture = null; private synchronized void startScan() { if (scanTaskFuture == null || scanTaskFuture.isCancelled() || scanTaskFuture.isDone()) { // 启动定时器,最终任务交由ScanIdleConnectionTask执行 scanTaskFuture = TIMER.scheduleAtFixedRate( new ScanIdleConnectionTask(this), 10, 30, TimeUnit.SECONDS); } } } // ScanIdleConnectionTask public class ScanIdleConnectionTask implements Runnable { // 将ConnectionPool当做入参传递过来 private final ConnectionPool connectionPool; public ScanIdleConnectionTask(ConnectionPool connectionPool) { this.connectionPool = connectionPool; } @Override public void run() { try { int idleSeconds = ClusterServerConfigManager.getIdleSeconds(); long idleTimeMillis = idleSeconds * 1000; if (idleTimeMillis < 0) { idleTimeMillis = ServerTransportConfig.DEFAULT_IDLE_SEConDS * 1000; } long now = System.currentTimeMillis(); // 获取所有连接,判断当前时间与连接的上次读时间的差值,如果超过我们规定的值,则直接关闭connection List总结:connections = connectionPool.listAllConnection(); for (Connection conn : connections) { if ((now - conn.getLastReadTime()) > idleTimeMillis) { RecordLog.info( String.format("[ScanIdleConnectionTask] The connection <%s:%d> has been idle for <%d>s. " + "It will be closed now.", conn.getRemoteIP(), conn.getRemotePort(), idleSeconds) ); conn.close(); } } } catch (Throwable t) { RecordLog.warn("[ScanIdleConnectionTask] Failed to clean-up idle tasks", t); } } }