Netty in Action: Applying the Sentinel Framework (Part 2)

Preface:

The previous article covered how Sentinel creates its NettyClient; this one looks at how the NettyServer is created.

Compared with the NettyClient, the server side is somewhat simpler.

1. Defining the Server-Side Interface

As before, the code is written against an interface. The server side also starts from an interface, shown below:

public interface ClusterTokenServer {

    // Start the token server.
    void start() throws Exception;

    // Stop the token server.
    void stop() throws Exception;
}

Compared with the client-side interface this is much simpler: just a start() and a stop() method.

2. Defining the Netty-Based Server Implementation

Again, because the design is interface-based, it is easy to swap in a server implementation built on something else (such as Mina).
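As a purely hypothetical illustration (the class name and its internals below are mine, not part of Sentinel), an alternative transport could be plugged in simply by implementing the same contract:

// Hypothetical example only: a non-Netty transport that satisfies the same interface.
public class MinaTransportServer implements ClusterTokenServer {

    private final int port;

    public MinaTransportServer(int port) {
        this.port = port;
    }

    @Override
    public void start() throws Exception {
        // Bind a Mina (or any other framework's) acceptor to `port` here.
    }

    @Override
    public void stop() throws Exception {
        // Release the acceptor and any associated resources here.
    }
}

Callers that only depend on ClusterTokenServer never need to know which transport is behind it.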

2.1 Basic parameters of NettyTransportServer
public class NettyTransportServer implements ClusterTokenServer {

    // Number of EventLoop threads: by default tied to the machine's core count,
    // and overridable by the user via a system property
    private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1,
            SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
    // Retry support for failed startup attempts
    private static final int MAX_RETRY_TIMES = 3;
    private static final int RETRY_SLEEP_MS = 2000;

    // Port this server binds to
    private final int port;

    // The bossGroup and workerGroup used when building the Netty server
    private NioEventLoopGroup bossGroup;
    private NioEventLoopGroup workerGroup;

    // Manages the client connections that are connected to this server
    private final ConnectionPool connectionPool = new ConnectionPool();

    // As on the client side, a state flag prevents the server from being started more than once
    private final AtomicInteger currentState = new AtomicInteger(SERVER_STATUS_OFF);
    private final AtomicInteger failedTimes = new AtomicInteger(0);

    // Constructor: only a port is required
    public NettyTransportServer(int port) {
        this.port = port;
    }
    ...
}

Besides the usual port, bossGroup and workerGroup, a few of the fields here involve tricks worth learning from:

1) The state flag check, also used when creating the client, prevents the server from being created more than once.

2) Retry parameters for failed startup: a maximum retry count (MAX_RETRY_TIMES) and a sleep interval between retries (RETRY_SLEEP_MS).

3) The EventLoopGroup thread count can be customized by the user; otherwise a default derived from the machine's core count is used (see the sketch after this list).

4) A ConnectionPool is provided to manage the client connections that connect to this server.
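To illustrate point 3: the default of cores * 2 can be overridden through the io.netty.eventLoopThreads system property, which is read via Netty's SystemPropertyUtil when the NettyTransportServer class is initialized. A minimal, purely illustrative sketch (the launcher class and port number below are not part of Sentinel):

// Hypothetical launcher, shown only to illustrate overriding the event loop thread count.
public class EventLoopThreadsDemo {

    public static void main(String[] args) {
        // Must be set before NettyTransportServer is first loaded, because
        // DEFAULT_EVENT_LOOP_THREADS is computed in a static field initializer.
        System.setProperty("io.netty.eventLoopThreads", "8");

        // Any server created afterwards will size its workerGroup with 8 threads.
        ClusterTokenServer server = new NettyTransportServer(9999);
    }
}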

2.2 NettyTransportServer.start(): starting the server
public class NettyTransportServer implements ClusterTokenServer {

    @Override
    public void start() {
        // Flip the state flag first to avoid starting more than once
        if (!currentState.compareAndSet(SERVER_STATUS_OFF, SERVER_STATUS_STARTING)) {
            return;
        }

        ServerBootstrap b = new ServerBootstrap();
        // Reactor pattern: one boss thread for accepting, a configurable number of worker threads
        this.bossGroup = new NioEventLoopGroup(1);
        this.workerGroup = new NioEventLoopGroup(DEFAULT_EVENT_LOOP_THREADS);
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                // Basic TCP parameters for the server socket
                .option(ChannelOption.SO_BACKLOG, 128)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
                        p.addLast(new NettyRequestDecoder());
                        p.addLast(new LengthFieldPrepender(2));
                        p.addLast(new NettyResponseEncoder());
                        // The key handler is TokenServerHandler, which processes client requests
                        p.addLast(new TokenServerHandler(connectionPool));
                    }
                })

                // Basic TCP parameters for accepted client connections
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childOption(ChannelOption.SO_SNDBUF, 32 * 1024)
                .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                .childOption(ChannelOption.SO_TIMEOUT, 10)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_RCVBUF, 32 * 1024);
        b.bind(port).addListener(new GenericFutureListener<ChannelFuture>() {
            // Listen for the result of the bind; on failure, retry up to MAX_RETRY_TIMES
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.cause() != null) {
                    RecordLog.info("[NettyTransportServer] Token server start failed (port=" + port + "), failedTimes: " + failedTimes.get(),
                            future.cause());
                    currentState.compareAndSet(SERVER_STATUS_STARTING, SERVER_STATUS_OFF);
                    int failCount = failedTimes.incrementAndGet();
                    if (failCount > MAX_RETRY_TIMES) {
                        return;
                    }

                    try {
                        // Back off before retrying: the delay grows linearly with the failure count (2s, 4s, 6s)
                        Thread.sleep(failCount * RETRY_SLEEP_MS);
                        start();
                    } catch (Throwable e) {
                        RecordLog.info("[NettyTransportServer] Failed to start token server when retrying", e);
                    }
                } else {
                    RecordLog.info("[NettyTransportServer] Token server started success at port " + port);
                    currentState.compareAndSet(SERVER_STATUS_STARTING, SERVER_STATUS_STARTED);
                }
            }
        });
    }
}

The retry logic here is worth noting. Binding a local port rarely fails, but when it does for some reason, the code recovers by retrying automatically, waiting a little longer before each attempt (2s, 4s, then 6s with the defaults above). Strictly speaking this is a linear back-off rather than an exponential one.

2.3 NettyTransportServer.stop(): stopping the server
public class NettyTransportServer implements ClusterTokenServer {

    @Override
    public void stop() {
        // Check the state flag first: wait for an in-progress start to finish
        while (currentState.get() == SERVER_STATUS_STARTING) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                // Ignore.
            }
        }

        if (currentState.compareAndSet(SERVER_STATUS_STARTED, SERVER_STATUS_OFF)) {
            try {
                // Release the event loop threads
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
                // Also close every client connection
                connectionPool.shutdownAll();

                failedTimes.set(0);

                RecordLog.info("[NettyTransportServer] Sentinel token server stopped");
            } catch (Exception ex) {
                RecordLog.warn("[NettyTransportServer] Failed to stop token server (port=" + port + ")", ex);
            }
        }
    }
}
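With start() and stop() in place, driving the server's lifecycle from the caller's side is straightforward. A minimal usage sketch (the demo class and port number are mine, not Sentinel's):

// Hypothetical usage sketch; the port number is arbitrary.
public class TokenServerLifecycleDemo {

    public static void main(String[] args) throws Exception {
        ClusterTokenServer server = new NettyTransportServer(9999);

        // Bind asynchronously; on failure the server retries up to MAX_RETRY_TIMES.
        server.start();

        // ... serve cluster token requests ...

        // Shut down both event loop groups and close every client connection.
        server.stop();
    }
}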
2.4 Managing Client Connections

This is something I had not thought about before; reading this part shows that the server can also manage its client connections directly.

A quick thought: once we have a handle on every client connection, what can we do with it? We can close connections that have been idle for too long, or selectively close the connections coming from a particular IP, for example:
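As a rough sketch of the second idea, using only the ConnectionPool and Connection methods that appear later in this article (listAllConnection(), getRemoteIP() and close()):

// Hypothetical helper, not part of Sentinel: close every connection from a given IP.
public class ConnectionAdmin {

    public static void closeConnectionsFrom(ConnectionPool pool, String ip) throws Exception {
        for (Connection conn : pool.listAllConnection()) {
            if (conn.getRemoteIP().equals(ip)) {
                // close() is the same Connection method used by ScanIdleConnectionTask below.
                conn.close();
            }
        }
    }
}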

Now let's look at how the pool gets hold of all of these client connections in the first place.

2.4.1 Defining the ConnectionPool connection manager

public class ConnectionPool {

    @SuppressWarnings("PMD.ThreadPoolCreationRule")
    private static final ScheduledExecutorService TIMER = Executors.newScheduledThreadPool(2);

    // Essentially just a map of connection key -> connection
    private final Map<String, Connection> CONNECTION_MAP = new ConcurrentHashMap<>();

    ...
}

2.4.2 Adding a connection to the pool when a client connects

public class TokenServerHandler extends ChannelInboundHandlerAdapter {

    private final ConnectionPool globalConnectionPool;

    public TokenServerHandler(ConnectionPool globalConnectionPool) {
        this.globalConnectionPool = globalConnectionPool;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // The connection is registered with the pool here
        globalConnectionPool.createConnection(ctx.channel());
        String remoteAddress = getRemoteAddress(ctx);
    }
}

// ConnectionPool
public class ConnectionPool {

    @SuppressWarnings("PMD.ThreadPoolCreationRule")
    private static final ScheduledExecutorService TIMER = Executors.newScheduledThreadPool(2);

    // Essentially just a map of connection key -> connection
    private final Map<String, Connection> CONNECTION_MAP = new ConcurrentHashMap<>();

    public void createConnection(Channel channel) {
        if (channel != null) {
            // Wrap the channel in a NettyConnection and store it in the map
            Connection connection = new NettyConnection(channel, this);
            // The key is the client's "ip:port"
            String connKey = getConnectionKey(channel);
            CONNECTION_MAP.put(connKey, connection);
        }
    }

    private String getConnectionKey(Channel channel) {
        InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress();
        String remoteIp = socketAddress.getAddress().getHostAddress();
        int remotePort = socketAddress.getPort();
        return remoteIp + ":" + remotePort;
    }
}

// NettyConnection
public class NettyConnection implements Connection {

    // Basic connection attributes
    private String remoteIp;
    private int remotePort;
    private Channel channel;

    // The last time this channel was read from, used by the idle-scan logic later on
    private long lastReadTime;

    private ConnectionPool pool;
}

The NettyConnection created here wraps the channel together with its basic connection information.
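The Connection methods used later on (getRemoteIP(), getRemotePort(), getLastReadTime(), refreshLastReadTime() and close()) are not reproduced in the article. Based on the fields above and the call sites below, plausible bodies might look like this; treat it as an assumption rather than Sentinel's exact code:

// Sketch only: plausible method bodies inferred from the fields and the call sites.
public class NettyConnection implements Connection {

    private String remoteIp;
    private int remotePort;
    private Channel channel;
    private long lastReadTime;
    private ConnectionPool pool;

    @Override
    public String getRemoteIP() {
        return remoteIp;
    }

    @Override
    public int getRemotePort() {
        return remotePort;
    }

    @Override
    public long getLastReadTime() {
        return lastReadTime;
    }

    @Override
    public void refreshLastReadTime(long lastReadTime) {
        this.lastReadTime = lastReadTime;
    }

    @Override
    public void close() {
        // Close the underlying channel; the pool field hints that the entry is also
        // removed from the ConnectionPool, but that code is not shown in this article.
        channel.close();
    }
}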

2.4.3 ConnectionPool: cleaning up NettyConnections that have gone unread for too long

// TokenServerHandler: the request-handling handler
public class TokenServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // On every read, refresh the connection's lastReadTime first
        globalConnectionPool.refreshLastReadTime(ctx.channel());
        ...
    }
}

// ConnectionPool
public class ConnectionPool {

    public void refreshLastReadTime(Channel channel) {
        if (channel != null) {
            String connKey = getConnectionKey(channel);
            Connection connection = CONNECTION_MAP.get(connKey);
            if (connection != null) {
                // Update to the current time
                connection.refreshLastReadTime(System.currentTimeMillis());
            }
        }
    }

    // A scheduled task cleans up connections that have not been read for a long time
    private ScheduledFuture<?> scanTaskFuture = null;

    private synchronized void startScan() {
        if (scanTaskFuture == null
                || scanTaskFuture.isCancelled()
                || scanTaskFuture.isDone()) {
            // Start the timer; the actual work is delegated to ScanIdleConnectionTask
            scanTaskFuture = TIMER.scheduleAtFixedRate(
                    new ScanIdleConnectionTask(this), 10, 30, TimeUnit.SECONDS);
        }
    }
}

// ScanIdleConnectionTask
public class ScanIdleConnectionTask implements Runnable {

    // The ConnectionPool is passed in through the constructor
    private final ConnectionPool connectionPool;

    public ScanIdleConnectionTask(ConnectionPool connectionPool) {
        this.connectionPool = connectionPool;
    }

    @Override
    public void run() {
        try {
            int idleSeconds = ClusterServerConfigManager.getIdleSeconds();
            long idleTimeMillis = idleSeconds * 1000;
            if (idleTimeMillis < 0) {
                idleTimeMillis = ServerTransportConfig.DEFAULT_IDLE_SECONDS * 1000;
            }
            long now = System.currentTimeMillis();
            // Walk over every connection; if the time since its last read exceeds the
            // configured idle threshold, close the connection
            List<Connection> connections = connectionPool.listAllConnection();
            for (Connection conn : connections) {
                if ((now - conn.getLastReadTime()) > idleTimeMillis) {
                    RecordLog.info(
                            String.format("[ScanIdleConnectionTask] The connection <%s:%d> has been idle for <%d>s. "
                                    + "It will be closed now.", conn.getRemoteIP(), conn.getRemotePort(), idleSeconds)
                    );
                    conn.close();
                }
            }
        } catch (Throwable t) {
            RecordLog.warn("[ScanIdleConnectionTask] Failed to clean-up idle tasks", t);
        }
    }
}
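Two ConnectionPool methods referenced above, listAllConnection() (used by ScanIdleConnectionTask) and shutdownAll() (used by NettyTransportServer.stop()), are also not reproduced in the article. Given the CONNECTION_MAP shown earlier, they presumably amount to something like the following sketch (an assumption, not the exact source):

// Sketch only: plausible shapes of the ConnectionPool methods referenced but not shown.
public class ConnectionPool {

    private final Map<String, Connection> CONNECTION_MAP = new ConcurrentHashMap<>();

    // A snapshot of every live connection, as consumed by ScanIdleConnectionTask.
    public List<Connection> listAllConnection() {
        return new ArrayList<>(CONNECTION_MAP.values());
    }

    // Called from NettyTransportServer.stop(): close every connection and clear the pool.
    public void shutdownAll() throws Exception {
        for (Connection conn : CONNECTION_MAP.values()) {
            conn.close();
        }
        CONNECTION_MAP.clear();
    }
}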
Summary:

Even the creation of a simple NettyServer looks quite different after the framework's upgrades.

Read more source code, keep learning, and hats off to the masters!
