很久没更新博客了,主要是 最近发生的事情太多了。开始学习~
API使用简单,开发门槛低;功能强大,预置了多种编解码功能,支持多种主流协议;定制能力强,可以通过ChannelHandler对通信框架进行灵活地扩展;性能高,通过与其他业界主流的NIO框架对比, Netty的综合性能最优成熟、稳定, Netty修复了已经发现的所有JDK NIO BUG,业务开发人员不需要再为NIO的BUG而烦恼;社区活跃,版本迭代周期短,发现的BUG可以被及时修复,同时,更多的新功能会加入经历了大规模的商业应用考验,质量得到验证。在互联网、大数据、网络游戏、企业应用、电信软件等众多行业得到成功商用,证明了它已经完全能够满足不同行业的商业应用了。Netty是业界最流行的NIO框架之一,它的健壮性、功能、性能、可定制性和可扩展性在同类框架中都是首屈一指的,它已经得到成百上千的商用项目验证,例如Hadoop的RPC框架avro使用Netty作为底层通信框架;很多其他业界主流的RPC框架,也使用Netty来构建高性能的异步通信能力。通过对Netty的分析,我们将它的优点总结如下。
正是因为这些优点, Netty逐渐成为Java NIO编程的首选框架。 解码器介绍(需要了解的) 粘包/拆包
LineBasedFrameDecoder(行解码器)TCP是个“流”协议,所谓流,就是没有界限的一串数据。大家可以想想河里的流水,是连成一片的,其间并没有分界线。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。
DelimiterBasedFrameDecoder(特殊分隔符解码器)LineBasedFrameDecoder的工作原理是它依次遍历ByteBuf中的可读字节,判断看是否有"\n"或者"\r\n",如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。
FixedLengthFrameDecoder(固定长度解码器)DelimiterBasedFrameDecoder用于对使用分隔符结尾的消息进行自动解码
StringDecoder(字符串解码器)FixedLengthFrameDecoder用于对固定长度的消息进行自动解码。
备注StringDecoder的功能非常简单,就是将接收到的对象转换成字符串,然后继续调用后面的handlero LineBasedFrameDecoder + StringDecoder组合就是按行切换的文本解码器,它被设计用来支持TCP的粘包和拆包。
使用步骤(代码中集合了三种案例) 服务端 EchoServer1.class 准备工作类有了上述解码器,再结合其他的解码器,如字符串解码器等,可以轻松地完成对很多消息的自动解码,而且不再需要考虑TCP粘包/拆包导致的读半包问题,极大地提升了开发效率。应用DelimiterBasedFrameDecoder和FixedLengthFrameDecoder进行开发非常简单,在绝大数情况下,只要将DelimiterBasedFrameDecoder或FixedLengthFrameDecoder添加到对应ChannelPipeline的起始位即可。
jar 包:netty-all-5.0.0.Alpha2.jar
/**
* Netty 准备工作类,负责初始化,及一些参数配置
*/
public class EchoServer1 {
public void bind(int port) throws Exception{
//配置服务端的Nio线程组
//两个线程组,专门用于网络事件的处理 实际上是reactor线程组
//这里创建两个。一个用户服务端接受客户端的连接,一个用于进行SocketChannel的网络赌侠
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
System.out.println(" bind:");
try {
//辅助启动类
ServerBootstrap b = new ServerBootstrap();
//传入进来
b.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
//将bloglog 设置1024
//① 特殊分隔符 解码器
.option(ChannelOption.SO_BACKLOG,1024)
//② 固定长度解码器
// .option(ChannelOption.SO_BACKLOG,100)
.handler(new LoggingHandler(LogLevel.INFO))
//处理网络的IO事件
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// ① 特殊分隔符 解码器
// ByteBuf delimiter= Unpooled.copiedBuffer("$_".getBytes("UTF-8"));
// ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));//标识符解码器
//② 固定长度解码器
// ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
//③ 行 解码器
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoServerHandler());//处理
}
});
//绑定端口。同步等待成功
//同步阻塞方法
ChannelFuture f=b.bind(port).sync();
//等待服务端监听端口关闭 /主要用于异步 *** 作的回调通知 ,
f.channel().closeFuture().sync();
}finally {
System.out.println(" 退出:");
//退出
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
EchoServerHandler.class 处理发送 接收消息
/**
* EchoServerHandler 负责 处理发送 接收消息
*/
public class EchoServerHandler extends ChannelHandlerAdapter {
int counter = 0;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//接受也在这里
String body = (String) msg;
// 前面已经对消息进行了自动解码。
System.out.println("This is " + ++counter + "tiems recive client:" + body);
//① 特殊符号 解码器
// body += "$_";
// String sendClientMsg = "服務端過來得消息" + "$_";
//② 固定长度
// String sendClientMsg = "a b c d e f g h i j k i t" + "";
//③ 行 解码器
String line = "ASSSSSSS" + System.getProperty("line.separator");//换行
//这里是对数据的处理
ByteBuf echo = Unpooled.copiedBuffer(line.getBytes());
//这个是返回到 原始状态,把消息返还给 客户端。
ctx.writeAndFlush(echo);
System.out.println("发送给客户端:");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
cause.printStackTrace();
ctx.close();
}
}
启动
public class Main {
/**
* 开启
* @param args
*/
public static void main(String[] args) {
int port=8888;
try {
new EchoServer1().bind(port);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
客户端
EchoClient 客户端准备工作类
implementation 'io.netty:netty-all:5.0.0.Alpha2'
/**
* 客户端准备工作
*/
class EchoClient {
fun connet(port: Int, host: String) {
var group: EventLoopGroup = NioEventLoopGroup()
try {
var b: Bootstrap = Bootstrap()
b.group(group).channel(NioSocketChannel::class.java)
.option(ChannelOption.TCP_NODELAY, true)
.handler(LoggingHandler(LogLevel.INFO))
.handler(object : ChannelInitializer<SocketChannel>() {
override fun initChannel(ch: SocketChannel?) {
//①特殊标识 解码器
// var delimiter=Unpooled.copiedBuffer(("$"+"_").toByteArray())
// ch!!.pipeline().addLast(DelimiterBasedFrameDecoder(1024,delimiter))
//②长度 解码器
// ch!!.pipeline().addLast(FixedLengthFrameDecoder(20))
//③ 换行解码器
ch!!.pipeline().addLast(LineBasedFrameDecoder(1024))
ch!!.pipeline().addLast(StringDecoder(Charsets.UTF_8))
ch!!.pipeline().addLast(EchoClientHandler())
}
})
//发起异步连接 *** 作
var f: ChannelFuture = b.connect(host, port).sync()
//等待客户端链路关闭
f.channel().closeFuture().sync()
} finally {
//优雅关闭线程组
group.shutdownGracefully();
}
}
EchoClientHandler 客户端接受发送类
/**
* 客户端接受发送类
*/
class EchoClientHandler : ChannelHandlerAdapter() {
var counter: Int = 0
//① 固定长度 ② 特殊符号
// val ECHO_REQ = " 你好 , 我是客户端。啊$" + "_"
//③ 行 解析
val ECHO_REQ = ("你好 ,我是客户端" + System.getProperty("line.separator")).toByteArray()
override fun channelActive(ctx: ChannelHandlerContext?) {
var msg: ByteBuf
println("channelActive")
for (i in 1..100) {
//发送服务端
//① 固定长度 ② 特殊符号
// ctx!!.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.toByteArray()))
// ③ 行 解析
ctx!!.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ));
}
}
override fun channelRead(ctx: ChannelHandlerContext?, msg: Any?) {
counter++
//读取服务端消息
println("This is " + "$counter" + "times receive server:$msg")
}
override fun channelReadComplete(ctx: ChannelHandlerContext?) {
println("channelReadComplete")
//读取结束
ctx!!.flush()
}
override fun exceptionCaught(ctx: ChannelHandlerContext?, cause: Throwable?) {
cause!!.printStackTrace()
ctx!!.close()
}
}
//启动
startBtn.setOnClickListener {
var port =8888
EchoClient().connet(port,"192.168.1.28")
}
结尾
具体注释都在代码里 ①②③ 分别注释即可。
参考《Netty权威指南》
服务端采用JAVA 客户端 采用kotlin
后续增加 进阶篇
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)