Netty 编解码器详解

Netty 编解码器详解,第1张

什么是编解码器?

在网络中都是以字节码的数据形式来传输数据的,如何将其和目标应用程序的自定义消息对象数据格式进行相互转换。这种转换逻辑就需要编解码器处理,编解码器由编码器和解码器组成,它们每种都可以将字节流从一种格式转换为另一种格式。

一、Netty编解码器

Netty里面的编解码器(入站解码,出站编码):

  • 解码器:将消息从字节或其他序列形式转换成指定的消息对象。即负责处理“入站 InboundHandler”数据。
  • 编码器:将消息对象转换成字节或其他序列形式在网络上传输。即负责“出站 OutboundHandler” 数据。

Netty 的编解码器实现了 ·ChannelHandlerAdapter·,也是一种特殊的 ChannelHandler,所以,依赖于 hannelPipeline,可以将多个编解码器链接在一起,以实现复杂的转换逻辑。

服务器编码数据后发送到客户端,客户端需要对数据进行解码。
由于是双向通信,因此,在服务端和客户端的中均需要添加编解码器。

1、解码器 1.1 解码器

解码器:将消息从字节或其他序列形式转换成指定的消息对象。

解码器负责处理“入站 InboundHandler”数据。 解码器实现了 ChannelInboundHandler,需要将解码器放在 ChannelPipeline中。

Netty中主要提供了两个解码器抽象基类:

  • ByteToMessageDecoder: 用于将字节转为消息,需要检查缓冲区是否有足够的字节
    • ReplayingDecoder: 继承 ByteToMessageDecoder,不需要检查缓冲区是否有足够的字节,但是 ReplayingDecoder速度略慢于ByteToMessageDecoder,同时不是所有的 ByteBuf都支持。 项目复杂性高则使用 ReplayingDecoder,否则使用ByteToMessageDecoder
  • MessageToMessageDecoder: 用于从一种消息解码为另外一种消息

核心方法: decode()方法是必须实现的唯一抽象方法。

decode(ChannelHandlerContext ctx, ByteBuf msg, List out)

1.2 抽象类ByteToMessageDecoder

ByteToMessageDecoder:将字节解码为消息(或者另一个字节序列)。

由于我们不可能知道远程节点是否会一次性地发送一个完整的消息,所以这个类会对入站数据进行缓冲,需要我们检查缓冲区是否有足够的字节,直到它准备好处理。

decode()方法被调用时将会传入一个包含了传入数据的 ByteBuf,以及一个用来添加解码消息的 List。
对这个方法的调用将会重复进行,直到确定没有新的元素被添加到该 List,或者该 ByteBuf 中没有更多可读取的字节时为止。然后,如果该 List 不为空,那么它的内容将会被传递给 ChannelPipeline 中的下一个ChannelInboundHandler。

TooLongFrameException:

由于 Netty 是一个异步框架,所以需要在字节可以解码之前在内存中缓冲它们。因此,不能让解码器缓冲大量的数据以至于耗尽可用的内存。

为了解除这个常见的顾虑,Netty 提供了 TooLongFrameException 类,其将由解码器在帧超出指定的大小限制时抛出。

为了避免这种情况,你可以设置一个最大字节数的阈值,如果超出该阈值,则会导致抛出一个 TooLongFrameException(随后会被 ChannelHandler.exceptionCaught()方法捕获)。然后,如何处理该异常则完全取决于该解码器的用户。某些协议(如 HTTP)可能允许你返回一个特殊的响应。而在其他的情况下,唯一的选择可能就是关闭对应的连接。

1.3 抽象类MessageToMessageDecoder

MessageToMessageDecoder:将一种消息类型解码为另一种消息。

使用该抽象基类可以使消息在两个消息格式之间进行转换。例如,从 String->Integer,POJO到POJO等。

该抽象基类的完整声明:

public abstract class MessageToMessageDecoder extends ChannelInboundHandlerAdapter

  • MessageToMessageDecoder,T 代表源数据的类型
2、编码器 2.1 编码器

编码器:将消息对象转换成字节或其他序列形式在网络上传输。

编码器负责处理“出站 OutboundHandler” 数据。 编码器实现了 ChannelOutboundHandler,需要将解码器放在 ChannelPipeline中。它和上面的解码器的功能正好相反。

Netty中主要提供了两个编码器抽象基类:

  • MessageToByteEncoder:将消息编码为字节
  • MessageToMessageEncoder,:将消息编码为消息,T 代表源数据的类型

核心方法:encode()方法是你需要实现的唯一抽象方法。

2.2 抽象类MessageToByteEncoder

MessageToByteEncoder:将消息编码为字节

decode()方法被调用时将会传入要被该类编码为 ByteBuf 的出站消息(类型为 I 的)。该 ByteBuf 随后将会被转发给 ChannelPipeline 中的下一个 ChannelOutboundHandler

这个类只有一个方法,而解码器有两个。
原因是解码器通常需要在 Channel 关闭之后产生最后一个消息(因此也就有了 decodeLast()方法。这显然不适用于编码器的场景(在连接被关闭之后仍然产生一个消息是毫无意义的)。

2.3 抽象类MessageToMessageEncoder

MessageToMessageEncoder:将消息编码为消息

每个通过 write()方法写入的消息都将会被传递给 encode()方法,以编码为一个或者多个出站消息。随后,这些出站消息将会被转发给 ChannelPipeline中的下一个 ChannelOutboundHandler。

3、编解码器类

上面将解码器和编码器作为单独的实体了解,但是有时在同一个类中管理入站和出站数据和消息的转换是很有用的。Netty 的抽象编解码器类正好用于这个目的,因为它们每个都将捆绑一个解码器/编码器对。这些类同时实现了 ChannelInboundHandler 和 ChannelOutboundHandler 接口。


相关的类:

  • 抽象类 ByteToMessageCodec
  • 抽象类 MessageToMessageCodec
3.1 抽象类ByteToMessageCodec

通过使用 ByteToMessageCodec ,我们可以在单个的类中实现将字节转为消息的往返过程。

ByteToMessageCodec 是一个参数化的类,定义如下:

public abstract class ByteToMessageCodec extends ChannelDuplexHandler

3.2 抽象类MessageToMessageCodec

通过使用 MessageToMessageCodec,我们可以在单个的类中实现将消息转为消息的往返过程。

MessageToMessageCodec 是一个参数化的类,定义如下:

public abstract class MessageToMessageCodec extends ChannelDuplexHandler

三、解码器/编码器示例 1、MessageToMessage

示例:将消息转为消息的往返过程。String -> String。通过单独的解码器/编码器实现。

1)解码器

public class StringMessageDecoder extends MessageToMessageDecoder<ByteBuf> {

	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
		System.out.println("StringMessageDecoder 消息正在进行解码...");
		//将 ByteBuf转换为 String,传递到下一个handler
		out.add(msg.toString(CharsetUtil.UTF_8));
	}

}

2)编码器

public class StringMessageEncoder extends MessageToMessageEncoder<String> {
	@Override
	protected void encode(ChannelHandlerContext ctx, String msg, List out) throws Exception {
		System.out.println("StringMessageEncoder 消息正在进行编码....");
		//将 String转换为 ByteBuf,传递到下一个handler
		out.add(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
	}
}

3)客户端Handler

public class MyStringClientHandler extends SimpleChannelInboundHandler {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //发送消息到服务端
        ChannelFuture future = ctx.writeAndFlush("Hello,我是 Netty客户端,发送 String类型数据。");
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    System.out.println("数据发送成功!");
                } else {
                    System.out.println("数据发送失败!");
                }
            }
        });
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收服务端发送过来的消息
        //因为我们使用了 String 编解码器,所以不会是 ByteBuf,字节信息
        System.out.println("Client Accept Server("+ ctx.channel().remoteAddress() +")消息 ->" + msg);
    }

}

4)服务端Handler

public class MyStringServerHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("MyServerHandler 连接已建立...");
		super.channelActive(ctx);
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		//获取客户端发送过来的消息
		System.out.println("Server Accept Client Context ("+ ctx.channel().remoteAddress() +")消息 ->" + msg);

	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		//发送消息给客户端
		//ByteBuf byteBuf = Unpooled.copiedBuffer("Server Receive Client msg", CharsetUtil.UTF_8);
		//ctx.writeAndFlush(byteBuf);
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		//发生异常,关闭通道
		//cause.printStackTrace();
		ctx.close();
	}
}

5)将解码器/编码器添加到 ChannelPipeline中

因为是双向通信,因此,在服务端和客户端的pipeline中均需要添加编解码器。

	private static class StringChannelInitializerImpl extends ChannelInitializer<SocketChannel> {

		@Override
		protected void initChannel(SocketChannel socketChannel) throws Exception {
			// 添加客户端通道的处理器
			socketChannel.pipeline()
					//添加解码器和编码器
					.addLast("stringMessageDecoder", new StringMessageDecoder())
					.addLast("stringMessageEncoder", new StringMessageEncoder())
					.addLast(new MyStringClientHandler());
		}
	}

6)测试

先启动服务端,再启动客户端,结果如下:

四、编解码器类示例 1、MessageToMessage

示例:将消息转为消息的往返过程。String -> String。通过单个的编解码器类实现。

将上面单独的解码器/编码器实现改造一下搞定。

1)编解码器类

/**
 * String 编码解码器, 出入站相对于服务端和客户端时相对的
* 继承 MessageToMessageCodec 时,可以加泛型
* */
public class StringMessageCodec extends MessageToMessageCodec{ /** * 编码 * @param ctx * @param msg * @param out * @throws Exception */ @Override protected void encode(ChannelHandlerContext ctx, Object msg, List out) throws Exception { System.out.println("StringMessageCodec 消息正在进行编码..."); String str = (String) msg; out.add(Unpooled.copiedBuffer(str, CharsetUtil.UTF_8));//传递到下一个handler } /** * 解码 * @param ctx * @param msg * @param out * @throws Exception */ @Override protected void decode(ChannelHandlerContext ctx, Object msg, List out) throws Exception { System.out.println("StringMessageCodec 正在进行消息解码..."); ByteBuf byteBuf = (ByteBuf) msg; out.add(byteBuf.toString(CharsetUtil.UTF_8));//传递到下一个handler } //@Override //protected void encode(ChannelHandlerContext channelHandlerContext, String msg, List out) throws Exception { // out.add(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));//传递到下一个handler //} // //@Override //protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List out) throws Exception { // out.add(byteBuf.toString(CharsetUtil.UTF_8));//传递到下一个handler //} }

2)将编解码器类添加到 ChannelPipeline中

socketChannel.pipeline()
					//添加解码器和编码器				
					.addLast(new StringMessageCodec())
					.addLast(new MyStringClientHandler());

handler同上,测试结果也同上。

– 求知若饥,虚心若愚。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/874417.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-13
下一篇 2022-05-13

发表评论

登录后才能评论

评论列表(0条)