Netty实战之Sentinel框架应用(一)

Netty实战之Sentinel框架应用(一),第1张

Netty实战之Sentinel框架应用(一) 前言:

在学习完了Netty的基本知识后,我们来实战下。

笔者之前有使用过Sentinel的相关技术,其中client与server端的交互就是使用Netty来完成的。

那本文我们就来分析下NettyClient的使用。

笔者分析的Sentinel源码版本为1.7.0

1.定义客户端交互接口

面向接口编程,而不是面向类编程。作为一个框架设计者,需要时刻谨记该原则。

Sentinel也是,在创建客户端连接时,首先提出一个接口。

public interface ClusterTransportClient {

    
    void start() throws Exception;

    
    void stop() throws Exception;

    
    ClusterResponse sendRequest(ClusterRequest request) throws Exception;

    
    boolean isReady();
}

分析下其主要方法

start() 用于启动一个client

stop() 用于关闭一个client

isReady() 用于判断client的状态,若已创建完成连接,则判定为可用态,可以用于发送请求

sendRequest() client发送请求

2.定义基于Netty的实现类

面向接口而不是实现类。如果以后我们需要切换为别的client创建方式(比如Mina),则可以直接基于(Mina)来创建客户端连接。

下面来看下基于Netty的ClusterTransportClient接口实现

2.1 NettyTransportClient的参数
public class NettyTransportClient implements ClusterTransportClient {

    @SuppressWarnings("PMD.ThreadPoolCreationRule")
    private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
            new NamedThreadFactory("sentinel-cluster-transport-client-scheduler"));

    public static final int RECONNECT_DELAY_MS = 2000;

    // 用于连接的host port
    private final String host;
    private final int port;

    // 基于Netty创建完对Server连接后,返回的Channel,可用于后续的请求发送
    private Channel channel;
    
    // Netty自带的
    private NioEventLoopGroup eventLoopGroup;
    
    // 请求处理Handler
    private TokenClientHandler clientHandler;

    private final AtomicInteger idGenerator = new AtomicInteger(0);
    
    // 防止多次创建的状态位
    private final AtomicInteger currentState = new AtomicInteger(ClientConstants.CLIENT_STATUS_OFF);
    
    // 失败连接次数
    private final AtomicInteger failConnectedTime = new AtomicInteger(0);

    private final AtomicBoolean shouldRetry = new AtomicBoolean(true);

    // 构造方法中主要就是提供server的host port信息
    public NettyTransportClient(String host, int port) {
        AssertUtil.assertNotBlank(host, "remote host cannot be blank");
        AssertUtil.isTrue(port > 0, "port should be positive");
        this.host = host;
        this.port = port;
    }
    
    ...
}

从当前的构造方法可以看出,host和port都是通过外部调用创建的。其他参数都是比较正常的Netty client创建中所使用到的。

下面来看下start()方法

2.2 NettyTransportClient.start() 启动客户端连接
public class NettyTransportClient implements ClusterTransportClient {
    public void start() throws Exception {
        shouldRetry.set(true);
        startInternal();
    }

    private void startInternal() {
        connect(initClientBootstrap());
    }
    
    // 用于创建Bootstrap
    private Bootstrap initClientBootstrap() {
        Bootstrap b = new Bootstrap();
        eventLoopGroup = new NioEventLoopGroup();
        b.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
            	// 配置底层TCP相关参数
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClusterClientConfigManager.getConnectTimeout())
                .handler(new ChannelInitializer() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        clientHandler = new TokenClientHandler(currentState, disconnectCallback);

                        // 设置请求执行相关Handler类
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new LengthFieldbasedframeDecoder(1024, 0, 2, 0, 2));
                        pipeline.addLast(new NettyResponseDecoder());
                        pipeline.addLast(new LengthFieldPrepender(2));
                        pipeline.addLast(new NettyRequestEncoder());
                        pipeline.addLast(clientHandler);
                    }
                });

        return b;
    }
    
    // connect方法用于真正创建远程连接
    private void connect(Bootstrap b) {
        // 这里的currentState状态位,避免多次创建连接
        if (currentState.compareAndSet(ClientConstants.CLIENT_STATUS_OFF, ClientConstants.CLIENT_STATUS_PENDING)) {
            b.connect(host, port)
                	// 添加一个连接监听
                    .addListener(new GenericFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) {
                            if (future.cause() != null) {
                                // 连接失败,则打印日志,记录失败次数
                                RecordLog.warn(
                                        String.format("[NettyTransportClient] Could not connect to <%s:%d> after %d times",
                                                host, port, failConnectedTime.get()), future.cause());
                                failConnectedTime.incrementAndGet();
                                channel = null;
                            } else {
                                failConnectedTime.set(0);
                                channel = future.channel();
                                RecordLog.info(
                                        "[NettyTransportClient] Successfully connect to server <" + host + ":" + port + ">");
                            }
                        }
                    });
        }
    }
}

有关于currentState状态位的使用,我们可以学习下,通过该状态位的使用,可以避免创建多个连接。这样NettyTransportClient对外提供的Channel一直都是同一个。

按照上面的start()方式,那么当连接失败时,客户端就不再重试了嘛?为什么没有看到重试方法呢?

答案是有重试的,只不过不是在当前startInternal()方法,而是启动了一个定时任务,来不停的执行重试。

2.3 创建连接重试
public class NettyTransportClient implements ClusterTransportClient {
	private Runnable disconnectCallback = new Runnable() {
        @Override
        public void run() {
            // start()方法调用之后,shouldRetry=true,所以会直接执行到下面的调度方法
            if (!shouldRetry.get()) {
                return;
            }
            // 通过指定频率来执行重连
            SCHEDULER.schedule(new Runnable() {
                @Override
                public void run() {
                    if (shouldRetry.get()) {
                        RecordLog.info("[NettyTransportClient] Reconnecting to server <" + host + ":" + port + ">");
                        try {
                            startInternal();
                        } catch (Exception e) {
                            RecordLog.warn("[NettyTransportClient] Failed to reconnect to server", e);
                        }
                    }
                }
            }, RECONNECT_DELAY_MS * (failConnectedTime.get() + 1), TimeUnit.MILLISECONDS);
            cleanUp();
        }
    };
}

disconnectCallback确实实现了连接的重试,那么这个RUNNABLE是什么时候被触发的呢?

全文搜索下,可以看到在Bootstrap创建的时候添加进去的

private Bootstrap initClientBootstrap() {
        Bootstrap b = new Bootstrap();
        eventLoopGroup = new NioEventLoopGroup();
        b.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClusterClientConfigManager.getConnectTimeout())
                .handler(new ChannelInitializer() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        // 在创建TokenClientHandler的时候被使用到了,那么具体是如何使用的呢?具体见TokenClientHandler
                        clientHandler = new TokenClientHandler(currentState, disconnectCallback);
                        ...
                        pipeline.addLast(clientHandler);
                    }
                });

        return b;
    }

2.3.1 TokenClientHandler

public class TokenClientHandler extends ChannelInboundHandlerAdapter {

    private final AtomicInteger currentState;
    private final Runnable disconnectCallback;

    public TokenClientHandler(AtomicInteger currentState, Runnable disconnectCallback) {
        this.currentState = currentState;
        this.disconnectCallback = disconnectCallback;
    }
    
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        RecordLog.info("[TokenClientHandler] Client channel unregistered, remote address: " + getRemoteAddress(ctx));
        currentState.set(ClientConstants.CLIENT_STATUS_OFF);

        // 在这里被调用到了
        disconnectCallback.run();
    }
}

从这一系列的骚 *** 作,在channel断开时,也就是unregistered方法时,此时就需要进行重连了。

2.4 NettyTransportClient.stop() 关闭客户端连接
public class NettyTransportClient implements ClusterTransportClient {
	public void stop() throws Exception {
        // Stop retrying for connection.
        shouldRetry.set(false);

        // 如果正在连接状态的,则sleep 200ms
        while (currentState.get() == ClientConstants.CLIENT_STATUS_PENDING) {
            try {
                Thread.sleep(200);
            } catch (Exception ex) {
                // Ignore.
            }
        }

        // 调用如下
        cleanUp();
        failConnectedTime.set(0);

        RecordLog.info("[NettyTransportClient] Cluster transport client stopped");
    }
    
    private void cleanUp() {
        // 直接关闭channel
        if (channel != null) {
            channel.close();
            channel = null;
        }
        if (eventLoopGroup != null) {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

关闭客户端就是常规的方式,多了一个状态的判断。

2.5 NettyTransportClient.sendRequest() 发送请求
public class NettyTransportClient implements ClusterTransportClient {
	@Override
    public ClusterResponse sendRequest(ClusterRequest request) throws Exception {
        if (!isReady()) {
            throw new SentinelClusterException(ClusterErrorMessages.CLIENT_NOT_READY);
        }
        if (!validRequest(request)) {
            throw new SentinelClusterException(ClusterErrorMessages.BAD_REQUEST);
        }
        
        // 获取一个唯一ID,标志当前这个唯一请求
        int xid = getCurrentId();
        try {
            // request设置完id之后,直接调用channel进行发送
            request.setId(xid);
            channel.writeAndFlush(request);

            // 发送请求和接收响应是异步的,通过Future来实现
            ChannelPromise promise = channel.newPromise();
            TokenClientPromiseHolder.putPromise(xid, promise);

            // 在这里等待响应特定时间后,如果还没有获取到结果,则直接抛出异常
            if (!promise.await(ClusterClientConfigManager.getRequestTimeout())) {
                throw new SentinelClusterException(ClusterErrorMessages.REQUEST_TIME_OUT);
            }

            // 这里通过TokenClientPromiseHolder来完成请求响应的异步封装,详细可参考2.5.1
            SimpleEntry entry = TokenClientProseHolder.getEntry(xid);
            if (entry == null || entry.getValue() == null) {
                // Should not go through here.
                throw new SentinelClusterException(ClusterErrorMessages.UNEXPECTED_STATUS);
            }
            return entry.getValue();
        } finally {
            TokenClientPromiseHolder.remove(xid);
        }
    }
}

2.5.1 响应异步封装

笔者觉得这里还是比较重要的知识点,尤其是我们需要对响应进行异步接收的时候。

public final class TokenClientPromiseHolder {

    // 该Map主要用于存放ID对应的channelPromise
    private static final Map> PROMISE_MAP = new ConcurrentHashMap<>();

    // 请求发送结束后,则调用该方法,将id对应的promise存放到map中
    public static void putPromise(int xid, ChannelPromise promise) {
        PROMISE_MAP.put(xid, new SimpleEntry(promise, null));
    }

    public static SimpleEntry getEntry(int xid) {
        return PROMISE_MAP.get(xid);
    }

    public static void remove(int xid) {
        PROMISE_MAP.remove(xid);
    }

    // 当接收到响应时,则调用该方法
    public static  boolean completePromise(int xid, ClusterResponse response) {
        if (!PROMISE_MAP.containsKey(xid)) {
            return false;
        }
        // 根据请求id查找到对应的ChannelPromise
        SimpleEntry entry = PROMISE_MAP.get(xid);
        if (entry != null) {
            ChannelPromise promise = entry.getKey();
            if (promise.isDone() || promise.isCancelled()) {
                return false;
            }
            // 设置promise状态为success,并将值设置到entry中
            entry.setValue(response);
            promise.setSuccess();
            return true;
        }
        return false;
    }

    private TokenClientPromiseHolder() {
    }
}

异步封装响应状态和响应内容,还是很重要的,那么这个completePromise()方法在何时被调用呢?

public class TokenClientHandler extends ChannelInboundHandlerAdapter {
    // 读到server端的响应时
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ClusterResponse) {
            ClusterResponse response = (ClusterResponse) msg;

            if (response.getType() == ClusterConstants.MSG_TYPE_PING) {
                handlePingResponse(ctx, response);
                return;
            }

            // 接收到响应时,则调用该方法,server端也会将该id传回。
            TokenClientPromiseHolder.completePromise(response.getId(), response);
        }
    }
}
总结:

从Sentienl创建客户端连接的过程中,我们还是能学到很多知识点的。

比如连接状态的判断,避免多次创建连接;

重试连接线程的执行,在channelUnregistered()时候调用;

每次请求都设置一个唯一的ID,这样可以异步接收响应;

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5687274.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存