Java网络编程Netty学习笔记2——Netty入门

Java网络编程Netty学习笔记2——Netty入门,第1张

1、概述 1.1、Netty是什么

Netty是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端。

1.2、Netty的地位

Netty在Java网络应用框架中的地位就好比Spring框架在JavaEE开发中的地位。以下的框架都使用了Netty,因为它们有网络通信要求:

  • Cassandra - nosql数据库
  • Spark - 大数据分布式计算框架
  • Hadoop - 大数据分布式存储框架
  • RocketMQ - ali开源的消息队列
  • ElasticSearch - 搜索引擎
  • gRPC - rpc框架
  • Dubbo - rpc框架
  • Spring 5.x - flux api完全抛弃了tomcat,使用netty作为服务器端
  • Zookeeper - 分布式协调框架
1.3、Netty的优势

Netty vs NIO ,基于NIO开发工作量大,bug多:

  • 需要自己构建协议;
  • 解决TCP传输问题,如粘包、半包;
  • epoll空轮询导致CPU 100%;
  • 对API进行增强,使之更易用,如FastThreadLocal => ThreadLocal,ByteBuf => ByteBuf。

Netty vs 其他网络应用框架:

  • Mina由apache维护,将来3.x版本可能会有较大重构,破坏API向下兼容性,Netty的开发迭代更迅速,API更简洁、文档更优秀。
  • 久经考验,16年,Netty版本:2.x 2004, 3.x 2008, 4.x 2013, 5.x 已废弃(没用明显的性能提升,维护成本高)。
2、Hello World 2.1、目标

开发一个简单的服务器端和客户端:

  • 客户端向服务器端发送hello,world;
  • 服务器仅接收,不返回。

加入依赖:

pom.xml:

        ...
        
            io.netty
            netty-all
        
        
        ...
2.2、服务器端
package com.clp.netty3;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

public class HelloServer {
    public static void main(String[] args) {
        //1、ServerBootstrap:服务器端启动器,负责组装netty组件,启动服务器
        new ServerBootstrap()
                //2、NioEventLoopGroup(selector, thread),EventLoop中包含了线程和选择器
                //16、由某个EventLoop处理read事件,接收到ByteBuf,然后交给后续的handler
                .group(new NioEventLoopGroup()) //监听 accept read .. 事件
                //3、选择服务器的ServerSocketChannel实现
                .channel(NioServerSocketChannel.class)
                //4、boss:负责处理连接;worker:处理读写,决定了worker(child)能执行哪些 *** 作(handler)
                .childHandler(
                        //5、Channel代表和客户端进行数据读写的通道,Initializer初始化器,负责添加别的handler
                        new ChannelInitializer() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //添加具体的handler
                        ch.pipeline().addLast(new StringDecoder()); //17、将ByteBuf转换为字符串,然后给下一个处理器
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { //自定义handler
                            @Override //读事件
                            //18、执行channelRead()方法,打印hello
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                //打印上一步转换好的字符串
                                System.out.println(msg);
                            }
                        });
                    }
                })
                //6、绑定监听端口
                .bind(8080);
    }
}
2.3、客户端
package com.clp.netty3;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;

public class HelloClient {
    public static void main(String[] args) throws InterruptedException {
        //7、创建启动器类
        new Bootstrap()
                //8、添加EventLoop
                .group(new NioEventLoopGroup())
                //9、选择客户端channel实现
                .channel(NioSocketChannel.class)
                //10、添加处理器
                .handler(new ChannelInitializer() {
                    @Override //12、在连接建立后被调用
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder()); //15、将字符串转为ByteBuf类型,然后发送给服务器端
                    }
                })
                //11、连接到服务器
                .connect(new InetSocketAddress("localhost", 8080))
                .sync() //13、阻塞方法,直到连接建立
                .channel() //代表连接对象
                //14、向服务器发送数据
                .writeAndFlush("hello world"); //发送数据
    }
}
2.4、注意的问题

一开始要树立正确的观念。

  • 把channel理解为数据的通道;
  • 把msg理解为流动的数据,最开始输入的是ByteBuf,但经过pipeline的加工,会变成其它类型对象,最后输出又变成ByteBuf;
  • 把handler理解为数据的处理工序。工序有多道,合在一起就是pipeline,pipeline负责发布事件(读、读取完成)传播给每个handler,handler对自己感兴趣的事件进行处理(重写了相应事件处理方法);handler分为Inbound(入站)和Outbound(出站)两类。
  • 把EventLoop理解为处理数据的工人。工人可以管理多个channel的IO *** 作,并且一旦工人负责了某个channel,就要负责到底(绑定);工人既可以执行IO *** 作,也可以进行任务处理,每位工人有任务队列,队列里可以有多个channel的待处理任务,任务分为普通任务、定时任务;工人按照pipeline顺序,依次按照handler的规划(代码)处理数据,可以为每道工序指定不同的工人。
3、组件 3.1、EventLoop

1、事件循环对象

EventLoop本质是一个单线程执行器(同时维护了一个Selector),里面有run()方法处理Channel上源源不断的IO事件。

它的关系比较复杂:

  • 一条线是继承j.u.c.ScheduledExecutorService,因此包含了线程池中所有的方法;
  • 另一条线是继承自netty自己的OrderedEventExecutor。提供了boolean inEventLoop(Thread thread)方法判断一个线程是否属于此EventLoop;提供了parent()方法来看看自己属于哪个EventLoopGroup。

2、事件循环组

EventLoopGroup是一组EventLoop,Channel一般会调用EventLoopGroup的register()方法来绑定其中一个EventLoop,后续这个Channel上的IO事件都由此EventLoop来处理(保证了IO事件处理时的线程安全)。

继承自netty自己的EventExecutorGroup:

  • 实现了iterable接口提供的EventLoop的能力;
  • 另有next()方法获取集合中下一个EventLoop。
package com.clp.netty3;

import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

@Slf4j
public class TestEventLoop {
    public static void main(String[] args) {
        //1、创建事件循环
        EventLoopGroup group = new NioEventLoopGroup(2); //处理 io 事件、普通任务、定时任务
//        EventLoopGroup group = new DefaultEventLoop(); //处理 普通任务、定时任务

        //2、获取下一个事件循环对象
        System.out.println(group.next());
        System.out.println(group.next());
        System.out.println(group.next());

        //3、执行普通任务
        group.next().submit(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("ok");
        });

        //4、执行定时任务
        group.next().scheduleAtFixedRate(() -> {
            log.debug("ok");
        }, 0, 1, TimeUnit.SECONDS); //初始时间为0,间隔时间为1,单位为秒

        log.debug("main");
    }
}
package com.clp.netty3;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.Charset;

@Slf4j
public class EventLoopServer {
    public static void main(String[] args) {
        //创建一个独立的EventGroup
        EventLoopGroup group = new DefaultEventLoopGroup(); //只能处理普通任务和定时任务
        new ServerBootstrap()
                //boss 和 worker
                //细分1:第一个参数:boss 只负责ServerSocketChannel上的accept事件;第二个参数:2个worker 只负责socketChannel上的读写
                .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(NioServerSocketChannel ch) throws Exception {
                        ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
                            @Override //ByteBuf
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                log.debug(buf.toString(Charset.defaultCharset()));
                                ctx.fireChannelRead(msg); //将消息传递给下一个handler
                            }
                        }).addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
                            @Override //ByteBuf
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                log.debug(buf.toString(Charset.defaultCharset()));
                            }
                        });
                    }
                })
                .bind(8080); //绑定端口
    }
}
package com.clp.netty3;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;

@Slf4j
public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {
        //创建启动器类
        Channel channel = new Bootstrap()
                //添加EventLoop
                .group(new NioEventLoopGroup())
                //选择客户端channel实现
                .channel(NioSocketChannel.class)
                //添加处理器
                .handler(new ChannelInitializer() {
                    @Override //12、在连接建立后被调用
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder()); //15、将字符串转为ByteBuf类型,然后发送给服务器端
                    }
                })
                //连接到服务器
                .connect(new InetSocketAddress("localhost", 8080))
                .sync() //阻塞方法,直到连接建立
                .channel();//代表连接对象
        System.out.println(channel);
        System.out.println("");
    }
}

3、handler执行中如何换人?

3.2、Channel

channel的主要作用:

  • close()可以用来关闭channel;
  • closeFuture()用来处理用来处理channel的关闭。① sync()方法作用是同步等待channel关闭;② 而addListener()方法是异步等待channel关闭。
  • pipeline()方法添加处理器;
  • write()方法将数据写入;
  • writeAndFlush()方法将数据写入并刷出。
3.2.1、ChannelFuture

1、ChannelFuture处理结果

package com.clp.netty3;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;

@Slf4j
public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {
        //创建启动器类
        //带有Future、Promise的类型都是和异步方法配套使用,用来处理结果
        ChannelFuture channelFuture = new Bootstrap()
                //添加EventLoop
                .group(new NioEventLoopGroup())
                //选择客户端channel实现
                .channel(NioSocketChannel.class)
                //添加处理器
                .handler(new ChannelInitializer() {
                    @Override //在连接建立后被调用
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder()); //将字符串转为ByteBuf类型,然后发送给服务器端
                    }
                })
                //连接到服务器
                //异步非阻塞,主线程发起了调用,但是真正执行connect的是nio线程。
                .connect(new InetSocketAddress("localhost", 8080)); //假设1s 后连接成功

//        //方法1:使用sync()方法同步处理结果
//        channelFuture.sync(); //阻塞住当前线程,直到nio线程连接建立完毕
        //若不调用sync()方法,则会无阻塞向下执行,获取channel
//        Channel channel = channelFuture.channel();//代表连接对象
//        System.out.println(channel);

        //方法2:使用addListener(回调对象)方法,异步处理结果(交给nio线程处理)
        channelFuture.addListener(new ChannelFutureListener() {
            //在nio线程连接建立好之后,会调用这个方法
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                Channel channel = channelFuture.channel();
                System.out.println(channel);
            }
        });
    }
}

2、ChannelFuture关闭问题

package com.clp.netty3;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.Scanner;

@Slf4j
public class CloseFutureClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));
        channelFuture.sync(); //等待连接建立
        Channel channel = channelFuture.channel();

        new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            while(true) {
                String line = scanner.nextLine();
                if("q".equals(line)) {
                    channel.close(); //异步 *** 作,交给其他线程执行关闭 *** 作
//                    log.debug("处理关闭之后的 *** 作"); //不能在这里善后
                    break;
                }
                channel.writeAndFlush(line);
            }
        }, "input").start();

        //获取ClosedFuture对象:1)同步处理关闭;2)异步处理关闭
        ChannelFuture closeFuture = channel.closeFuture();
//        //1、同步处理关闭
//        log.debug("waiting close...");
//        closeFuture.sync(); //当channel调用close()方法之后,sync()才会继续往下执行
//        log.debug("处理关闭之后的 *** 作");
        //2、异步处理关闭
        closeFuture.addListener(new ChannelFutureListener() {
            @Override
            //谁(线程)关闭谁调用
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                log.debug("处理关闭之后的 *** 作"); //通道channel关闭后...
                group.shutdownGracefully(); //关闭正在运行的线程
            }
        });

    }
}
3.2.2、为什么要异步

要点:

  • 单线程没法异步提高效率,必须配合多线程、多核CPU才能发挥异步的优势;
  • 异步并没有缩短响应时间,反而有所增加;
  • 合理进行任务拆分,也是利用异步的关键。
3.3、Future & Promise

在异步处理时,经常用到这两个接口。

首先要说明netty中的Future与jdk中的Future同名,但是是两个接口,netty的Future继承自jdk的Future,而Promise又对netty Future进行了扩展。

  • jdk Future只能同步等待任务结束(或成功、或失败)才能得到结果;
  • netty Future可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束;
  • netty Promise不仅有netty Future的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器。
功能/名称jdk Futurenetty FuturePromise
cancel取消任务--
isCanceled任务是否取消--
isDone任务是否完成,不能区分成功失败--
get获取任务结果,阻塞等待--
getNow-获取任务结果,非阻塞,还未产生结果时返回null-
await-等待任务结束,如果任务失败,不会抛异常,而是通过isSuccess判断-
sync-等待任务结束,如果任务失败,抛出异常-
isSuccess-判断任务是否成功-
cause-获取失败信息,非阻塞,如果没有失败,返回null-
addListener-添加回调,异步接收结果-
setSuccess--设置成功结果
setFailure--设置失败结果
3.3.1、JDK Future

3.3.2、

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/726035.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-26
下一篇 2022-04-26

发表评论

登录后才能评论

评论列表(0条)

保存