Netty EventLoopGroup ServerBootstrap 简单实现服务端与客户端的通信

Netty EventLoopGroup ServerBootstrap 简单实现服务端与客户端的通信,第1张

Netty EventLoopGroup ServerBootstrap 简单实现服务端与客户端的通信

Netty 简单实现服务端与客户端的通信

一、服务端与客户端设计

1.服务启动类 2.服务端管道处理器 3.客户端启动类 4.客户端管道处理器 5.启动验证

·服务端·客户端 二、示例分析

1.ServerBootstrap的Group方法2.ServerBootstrap的Channel方法

1.反射实现类ReflectiveChannelFactory2.非空校验并赋值channelFactory 3.ServerBootstrap的Option、ChildOption方法

1.ChannelConfig实现类DefaultServerSocketChannelConfig 4.ServerBootstrap的ChildHandler方法,实现类

一、服务端与客户端设计 1.服务启动类
package com.example.netty.server;

import com.example.netty.server.handler.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;


/**
 * Minimal Netty TCP server: binds to port 6668 and installs a
 * {@link NettyServerHandler} on every accepted connection.
 */
public class NettyServer {

    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting
     *                              for the bind or for the channel to close
     */
    public static void main(String[] args) throws InterruptedException {

        // Create the boss group (a single thread) and the worker group
        // (Netty's default thread count).
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // Bootstrap object used to configure and start the server.
            ServerBootstrap bootstrap = new ServerBootstrap();

            // Fluent configuration.
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    // The type argument is required: with a raw ChannelInitializer the
                    // initChannel(SocketChannel) method below would not override anything.
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        // Register the handler on the pipeline of each accepted channel.
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyServerHandler());
                        }
                    });

            System.out.println("Server is Ready");

            // Bind the port and wait for the bind to complete; this starts the server.
            ChannelFuture cf = bootstrap.bind(6668).sync();

            // Block until the server channel is closed.
            cf.channel().closeFuture().sync();
        } finally {
            // Always release the event-loop threads, even when bind/sync fails.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
2.服务端管道处理器
package com.example.netty.server.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;


/**
 * Server-side inbound handler: logs each message received from a client and
 * answers with a fixed greeting once the current read batch is complete.
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Logs the received message together with the handling thread, the handler
     * context and the client's remote address.
     *
     * @param ctx     handler context of the channel the message arrived on
     * @param message the inbound message; cast to {@link ByteBuf}
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {

        System.out.println("Server Read Thread :" + Thread.currentThread().getName());
        System.out.println("Server ctx=" + ctx);

        // View the message as a ByteBuf.
        ByteBuf buf = (ByteBuf) message;
        try {
            System.out.println("Client Send Message is:" + buf.toString(CharsetUtil.UTF_8));
            System.out.println("Client Ip Address is:" + ctx.channel().remoteAddress());
        } finally {
            // The message is consumed here and not passed further down the pipeline,
            // so this handler must release the reference-counted buffer itself.
            buf.release();
        }
    }

    /**
     * Sends the reply "Hello Client" to the peer and flushes it.
     *
     * @param ctx handler context of the channel whose read completed
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        // Write the reply into a buffer and flush it to the channel.
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Client",CharsetUtil.UTF_8));

    }

    /**
     * Reports the failure and closes the channel.
     *
     * @param ctx   handler context of the failing channel
     * @param cause the error raised in the pipeline
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Surface the cause instead of discarding it silently before closing.
        System.err.println("Server channel error, closing: " + cause);
        ctx.close();
    }
}
3.客户端启动类
package com.example.netty.client;

import com.example.netty.client.handler.NettyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;


/**
 * Minimal Netty TCP client: connects to 127.0.0.1:6668 and installs a
 * {@link NettyClientHandler} on the connection.
 */
public class NettyClient {

    /**
     * Connects to the server and blocks until the channel is closed.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting
     *                              for the connect or for the channel to close
     */
    public static void main(String[] args) throws InterruptedException {

        // The client only needs a single event-loop group.
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            // Bootstrap object used to configure and start the client.
            Bootstrap bootstrap = new Bootstrap();

            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    // The type argument is required: with a raw ChannelInitializer the
                    // initChannel(SocketChannel) method below would not override anything.
                    .handler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyClientHandler());
                        }
                    });

            System.out.println("Client OK ...");

            // Connect to the server and wait for the connection to be established.
            ChannelFuture cf = bootstrap.connect("127.0.0.1",6668).sync();

            // Block until the channel is closed.
            cf.channel().closeFuture().sync();
        } finally {
            // Always release the event-loop threads, even when connect/sync fails.
            group.shutdownGracefully();
        }
    }
}
4.客户端管道处理器
package com.example.netty.client.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;


/**
 * Client-side inbound handler: sends a greeting as soon as the channel becomes
 * active and logs whatever the server sends back.
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Sends "Hello Server " to the peer once the channel is active.
     *
     * @param ctx handler context of the newly active channel
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client :" + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Server ", CharsetUtil.UTF_8));
    }

    /**
     * Logs the server's reply and the server's remote address.
     *
     * @param ctx handler context of the channel the message arrived on
     * @param msg the inbound message; cast to {@link ByteBuf}
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        try {
            System.out.println("Server SendBack Message:" + buf.toString(CharsetUtil.UTF_8));
            System.out.println("Server Ip Address :" + ctx.channel().remoteAddress());
        } finally {
            // The message is consumed here and not passed further down the pipeline,
            // so this handler must release the reference-counted buffer itself.
            buf.release();
        }
    }

    /**
     * Reports the failure and closes the channel.
     *
     * @param ctx   handler context of the failing channel
     * @param cause the error raised in the pipeline
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Surface the cause instead of discarding it silently before closing.
        System.err.println("Client channel error, closing: " + cause);
        ctx.close();
    }
}
5.启动验证 ·服务端
Connected to the target VM, address: '127.0.0.1:60934', transport: 'socket'
Java HotSpot(TM) 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended
Server is Ready
Server Read Thread :nioEventLoopGroup-3-1
Server ctx=ChannelHandlerContext(NettyServerHandler#0, [id: 0x7748b291, L:/127.0.0.1:6668 - R:/127.0.0.1:60940])
Client Send Message is:Hello Server 
Client Ip Address is:/127.0.0.1:60940
·客户端
Connected to the target VM, address: '127.0.0.1:60937', transport: 'socket'
Java HotSpot(TM) 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended
Client OK ...
Client :ChannelHandlerContext(NettyClientHandler#0, [id: 0xb9a0d3d1, L:/127.0.0.1:60940 - R:/127.0.0.1:6668])
Server SendBack Message:Hello Client
Server Ip Address :/127.0.0.1:6668
二、示例分析 1.ServerBootstrap的Group方法
/**
 * Sets the parent and child event-loop groups of this bootstrap.
 *
 * @param parentGroup group handed to the superclass {@code group} method
 * @param childGroup  group stored as this bootstrap's child group; must not be null
 * @return this bootstrap, for call chaining
 * @throws IllegalStateException if a child group has already been set
 */
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    // The parent group is registered first, exactly as before any child checks.
    super.group(parentGroup);

    final boolean childAlreadyConfigured = this.childGroup != null;
    if (childAlreadyConfigured) {
        throw new IllegalStateException("childGroup set already");
    }

    final EventLoopGroup checkedChild = ObjectUtil.checkNotNull(childGroup, "childGroup");
    this.childGroup = checkedChild;
    return this;
}
2.ServerBootstrap的Channel方法
/**
 * Configures the channel implementation by wrapping {@code channelClass} in a
 * {@link ReflectiveChannelFactory} and passing it to {@code channelFactory}.
 *
 * <p>NOTE(review): {@code C} is assumed to be the enclosing bootstrap's channel
 * type parameter, as in Netty's {@code AbstractBootstrap<B, C>} — confirm
 * against the enclosing class declaration.
 *
 * @param channelClass the channel class to instantiate; must not be null
 * @return whatever {@code channelFactory} returns
 */
public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(
            ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}
1.反射实现类ReflectiveChannelFactory
package io.netty.channel;

import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.StringUtil;

import java.lang.reflect.Constructor;


/**
 * A {@link ChannelFactory} that creates channels reflectively by invoking the
 * public no-argument constructor of a given class.
 *
 * @param <T> the type of channel produced by this factory
 */
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    // Resolved once in the constructor so that newChannel() does no lookup.
    private final Constructor<? extends T> constructor;

    /**
     * Looks up the public no-argument constructor of {@code clazz}.
     *
     * @param clazz the channel class to instantiate; must not be null
     * @throws IllegalArgumentException if the class has no public no-argument constructor
     */
    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        ObjectUtil.checkNotNull(clazz, "clazz");
        try {
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                    " does not have a public non-arg constructor", e);
        }
    }

    /**
     * Creates a new channel instance through the cached constructor.
     *
     * @return the newly created channel
     * @throws ChannelException wrapping any failure raised during instantiation
     */
    @Override
    public T newChannel() {
        try {
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }

    /**
     * Returns a description of the form {@code ReflectiveChannelFactory(Xxx.class)}.
     */
    @Override
    public String toString() {
        return StringUtil.simpleClassName(ReflectiveChannelFactory.class) +
                '(' + StringUtil.simpleClassName(constructor.getDeclaringClass()) + ".class)";
    }
}
2.非空校验并赋值channelFactory
/**
 * Accepts an {@code io.netty.channel.ChannelFactory} and delegates to the
 * overload that takes the unqualified {@code ChannelFactory} type.
 *
 * <p>NOTE(review): the unqualified {@code ChannelFactory} is presumably the
 * deprecated {@code io.netty.bootstrap.ChannelFactory}, and {@code C} the
 * enclosing bootstrap's channel type parameter — confirm against the
 * enclosing class and its imports.
 *
 * @param channelFactory the factory used to create channels
 * @return whatever the delegated overload returns
 */
@SuppressWarnings({ "unchecked", "deprecation" })
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
    // The cast only selects the other overload; the factory object is unchanged.
    return channelFactory((ChannelFactory<C>) channelFactory);
}


/**
 * Stores the factory used to create channels. The factory may be set only once.
 *
 * <p>NOTE(review): {@code C} is assumed to be the enclosing bootstrap's channel
 * type parameter — confirm against the enclosing class declaration.
 *
 * @param channelFactory the factory to store; must not be null
 * @return the result of {@code self()}
 * @throws IllegalStateException if a channel factory has already been set
 */
@Deprecated
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    ObjectUtil.checkNotNull(channelFactory, "channelFactory");
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }

    this.channelFactory = channelFactory;
    return self();
}
3.ServerBootstrap的Option、ChildOption方法
多个客户端的连接请求同时到达时,服务端需要一个队列暂存尚未处理的请求,以便依次处理;SO_BACKLOG 用于设置该队列的长度
Option      ---> 作用于 bossGroup(服务端监听通道)
ChildOption ---> 作用于 workerGroup(已接受的客户端连接通道)
1.ChannelConfig实现类DefaultServerSocketChannelConfig
/**
 * Applies a channel option. {@code SO_RCVBUF}, {@code SO_REUSEADDR} and
 * {@code SO_BACKLOG} are handled here; every other option is passed to the
 * superclass.
 *
 * @param option the option to set
 * @param value  the value to apply; checked by {@code validate} first
 * @param <T>    the value type of the option
 * @return {@code true} for the three options handled here, otherwise the
 *         result of the superclass call
 */
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
    validate(option, value);

    if (option == SO_RCVBUF) {
        setReceiveBufferSize((Integer) value);
    } else if (option == SO_REUSEADDR) {
        setReuseAddress((Boolean) value);
    } else if (option == SO_BACKLOG) {
        setBacklog((Integer) value);
    } else {
        return super.setOption(option, value);
    }

    return true;
}
4.ServerBootstrap的ChildHandler方法,实现类
DefaultChannelPipeline implements ChannelPipeline 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5716949.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存