Netty是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端。
1.2、Netty的地位Netty在Java网络应用框架中的地位就好比Spring框架在JavaEE开发中的地位。以下的框架都使用了Netty,因为它们有网络通信要求:
- Cassandra - nosql数据库
- Spark - 大数据分布式计算框架
- Hadoop - 大数据分布式存储框架
- RocketMQ - ali开源的消息队列
- ElasticSearch - 搜索引擎
- gRPC - rpc框架
- Dubbo - rpc框架
- Spring 5.x - flux api完全抛弃了tomcat,使用netty作为服务器端
- Zookeeper - 分布式协调框架
Netty vs NIO ,基于NIO开发工作量大,bug多:
- 需要自己构建协议;
- 解决TCP传输问题,如粘包、半包;
- epoll空轮询导致CPU 100%;
- 对API进行增强,使之更易用,如FastThreadLocal => ThreadLocal,ByteBuf => ByteBuf。
Netty vs 其他网络应用框架:
- Mina由apache维护,将来3.x版本可能会有较大重构,破坏API向下兼容性,Netty的开发迭代更迅速,API更简洁、文档更优秀。
- 久经考验,16年,Netty版本:2.x 2004, 3.x 2008, 4.x 2013, 5.x 已废弃(没用明显的性能提升,维护成本高)。
开发一个简单的服务器端和客户端:
- 客户端向服务器端发送hello,world;
- 服务器仅接收,不返回。
加入依赖:
pom.xml:
...
io.netty
netty-all
...
2.2、服务器端
package com.clp.netty3;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
public class HelloServer {
public static void main(String[] args) {
//1、ServerBootstrap:服务器端启动器,负责组装netty组件,启动服务器
new ServerBootstrap()
//2、NioEventLoopGroup(selector, thread),EventLoop中包含了线程和选择器
//16、由某个EventLoop处理read事件,接收到ByteBuf,然后交给后续的handler
.group(new NioEventLoopGroup()) //监听 accept read .. 事件
//3、选择服务器的ServerSocketChannel实现
.channel(NioServerSocketChannel.class)
//4、boss:负责处理连接;worker:处理读写,决定了worker(child)能执行哪些 *** 作(handler)
.childHandler(
//5、Channel代表和客户端进行数据读写的通道,Initializer初始化器,负责添加别的handler
new ChannelInitializer() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//添加具体的handler
ch.pipeline().addLast(new StringDecoder()); //17、将ByteBuf转换为字符串,然后给下一个处理器
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { //自定义handler
@Override //读事件
//18、执行channelRead()方法,打印hello
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//打印上一步转换好的字符串
System.out.println(msg);
}
});
}
})
//6、绑定监听端口
.bind(8080);
}
}
2.3、客户端
package com.clp.netty3;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
//7、创建启动器类
new Bootstrap()
//8、添加EventLoop
.group(new NioEventLoopGroup())
//9、选择客户端channel实现
.channel(NioSocketChannel.class)
//10、添加处理器
.handler(new ChannelInitializer() {
@Override //12、在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder()); //15、将字符串转为ByteBuf类型,然后发送给服务器端
}
})
//11、连接到服务器
.connect(new InetSocketAddress("localhost", 8080))
.sync() //13、阻塞方法,直到连接建立
.channel() //代表连接对象
//14、向服务器发送数据
.writeAndFlush("hello world"); //发送数据
}
}
2.4、注意的问题
一开始要树立正确的观念。
- 把channel理解为数据的通道;
- 把msg理解为流动的数据,最开始输入的是ByteBuf,但经过pipeline的加工,会变成其它类型对象,最后输出又变成ByteBuf;
- 把handler理解为数据的处理工序。工序有多道,合在一起就是pipeline,pipeline负责发布事件(读、读取完成)传播给每个handler,handler对自己感兴趣的事件进行处理(重写了相应事件处理方法);handler分为Inbound(入站)和Outbound(出站)两类。
- 把EventLoop理解为处理数据的工人。工人可以管理多个channel的IO *** 作,并且一旦工人负责了某个channel,就要负责到底(绑定);工人既可以执行IO *** 作,也可以进行任务处理,每位工人有任务队列,队列里可以有多个channel的待处理任务,任务分为普通任务、定时任务;工人按照pipeline顺序,依次按照handler的规划(代码)处理数据,可以为每道工序指定不同的工人。
1、事件循环对象
EventLoop本质是一个单线程执行器(同时维护了一个Selector),里面有run()方法处理Channel上源源不断的IO事件。
它的关系比较复杂:
- 一条线是继承j.u.c.ScheduledExecutorService,因此包含了线程池中所有的方法;
- 另一条线是继承自netty自己的OrderedEventExecutor。提供了boolean inEventLoop(Thread thread)方法判断一个线程是否属于此EventLoop;提供了parent()方法来看看自己属于哪个EventLoopGroup。
2、事件循环组
EventLoopGroup是一组EventLoop,Channel一般会调用EventLoopGroup的register()方法来绑定其中一个EventLoop,后续这个Channel上的IO事件都由此EventLoop来处理(保证了IO事件处理时的线程安全)。
继承自netty自己的EventExecutorGroup:
- 实现了iterable接口提供的EventLoop的能力;
- 另有next()方法获取集合中下一个EventLoop。
package com.clp.netty3;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@Slf4j
public class TestEventLoop {
public static void main(String[] args) {
//1、创建事件循环
EventLoopGroup group = new NioEventLoopGroup(2); //处理 io 事件、普通任务、定时任务
// EventLoopGroup group = new DefaultEventLoop(); //处理 普通任务、定时任务
//2、获取下一个事件循环对象
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
//3、执行普通任务
group.next().submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("ok");
});
//4、执行定时任务
group.next().scheduleAtFixedRate(() -> {
log.debug("ok");
}, 0, 1, TimeUnit.SECONDS); //初始时间为0,间隔时间为1,单位为秒
log.debug("main");
}
}
package com.clp.netty3;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
@Slf4j
public class EventLoopServer {
public static void main(String[] args) {
//创建一个独立的EventGroup
EventLoopGroup group = new DefaultEventLoopGroup(); //只能处理普通任务和定时任务
new ServerBootstrap()
//boss 和 worker
//细分1:第一个参数:boss 只负责ServerSocketChannel上的accept事件;第二个参数:2个worker 只负责socketChannel上的读写
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(NioServerSocketChannel ch) throws Exception {
ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
@Override //ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
ctx.fireChannelRead(msg); //将消息传递给下一个handler
}
}).addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
@Override //ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080); //绑定端口
}
}
package com.clp.netty3;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
@Slf4j
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
//创建启动器类
Channel channel = new Bootstrap()
//添加EventLoop
.group(new NioEventLoopGroup())
//选择客户端channel实现
.channel(NioSocketChannel.class)
//添加处理器
.handler(new ChannelInitializer() {
@Override //12、在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder()); //15、将字符串转为ByteBuf类型,然后发送给服务器端
}
})
//连接到服务器
.connect(new InetSocketAddress("localhost", 8080))
.sync() //阻塞方法,直到连接建立
.channel();//代表连接对象
System.out.println(channel);
System.out.println("");
}
}
3、handler执行中如何换人?
3.2、Channelchannel的主要作用:
- close()可以用来关闭channel;
- closeFuture()用来处理用来处理channel的关闭。① sync()方法作用是同步等待channel关闭;② 而addListener()方法是异步等待channel关闭。
- pipeline()方法添加处理器;
- write()方法将数据写入;
- writeAndFlush()方法将数据写入并刷出。
1、ChannelFuture处理结果
package com.clp.netty3;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
@Slf4j
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
//创建启动器类
//带有Future、Promise的类型都是和异步方法配套使用,用来处理结果
ChannelFuture channelFuture = new Bootstrap()
//添加EventLoop
.group(new NioEventLoopGroup())
//选择客户端channel实现
.channel(NioSocketChannel.class)
//添加处理器
.handler(new ChannelInitializer() {
@Override //在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder()); //将字符串转为ByteBuf类型,然后发送给服务器端
}
})
//连接到服务器
//异步非阻塞,主线程发起了调用,但是真正执行connect的是nio线程。
.connect(new InetSocketAddress("localhost", 8080)); //假设1s 后连接成功
// //方法1:使用sync()方法同步处理结果
// channelFuture.sync(); //阻塞住当前线程,直到nio线程连接建立完毕
//若不调用sync()方法,则会无阻塞向下执行,获取channel
// Channel channel = channelFuture.channel();//代表连接对象
// System.out.println(channel);
//方法2:使用addListener(回调对象)方法,异步处理结果(交给nio线程处理)
channelFuture.addListener(new ChannelFutureListener() {
//在nio线程连接建立好之后,会调用这个方法
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
Channel channel = channelFuture.channel();
System.out.println(channel);
}
});
}
}
2、ChannelFuture关闭问题
package com.clp.netty3;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.Scanner;
@Slf4j
public class CloseFutureClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
channelFuture.sync(); //等待连接建立
Channel channel = channelFuture.channel();
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while(true) {
String line = scanner.nextLine();
if("q".equals(line)) {
channel.close(); //异步 *** 作,交给其他线程执行关闭 *** 作
// log.debug("处理关闭之后的 *** 作"); //不能在这里善后
break;
}
channel.writeAndFlush(line);
}
}, "input").start();
//获取ClosedFuture对象:1)同步处理关闭;2)异步处理关闭
ChannelFuture closeFuture = channel.closeFuture();
// //1、同步处理关闭
// log.debug("waiting close...");
// closeFuture.sync(); //当channel调用close()方法之后,sync()才会继续往下执行
// log.debug("处理关闭之后的 *** 作");
//2、异步处理关闭
closeFuture.addListener(new ChannelFutureListener() {
@Override
//谁(线程)关闭谁调用
public void operationComplete(ChannelFuture channelFuture) throws Exception {
log.debug("处理关闭之后的 *** 作"); //通道channel关闭后...
group.shutdownGracefully(); //关闭正在运行的线程
}
});
}
}
3.2.2、为什么要异步
要点:
- 单线程没法异步提高效率,必须配合多线程、多核CPU才能发挥异步的优势;
- 异步并没有缩短响应时间,反而有所增加;
- 合理进行任务拆分,也是利用异步的关键。
在异步处理时,经常用到这两个接口。
首先要说明netty中的Future与jdk中的Future同名,但是是两个接口,netty的Future继承自jdk的Future,而Promise又对netty Future进行了扩展。
- jdk Future只能同步等待任务结束(或成功、或失败)才能得到结果;
- netty Future可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束;
- netty Promise不仅有netty Future的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器。
功能/名称 | jdk Future | netty Future | Promise |
---|---|---|---|
cancel | 取消任务 | - | - |
isCanceled | 任务是否取消 | - | - |
isDone | 任务是否完成,不能区分成功失败 | - | - |
get | 获取任务结果,阻塞等待 | - | - |
getNow | - | 获取任务结果,非阻塞,还未产生结果时返回null | - |
await | - | 等待任务结束,如果任务失败,不会抛异常,而是通过isSuccess判断 | - |
sync | - | 等待任务结束,如果任务失败,抛出异常 | - |
isSuccess | - | 判断任务是否成功 | - |
cause | - | 获取失败信息,非阻塞,如果没有失败,返回null | - |
addListener | - | 添加回调,异步接收结果 | - |
setSuccess | - | - | 设置成功结果 |
setFailure | - | - | 设置失败结果 |
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)