java 怎么实现http1.1中规定的pipeline的长连接

java 怎么实现http1.1中规定的pipeline的长连接,第1张

目前web上的消息通讯方式主要有以下几种。

轮询,长连接,websocket

轮询:隔一段时间访问服务器,服务器不管有没有新消息都立刻返回。

长连接:页面向服务器发出请求,由服务器决定什么时候返回。(如果有新消息则立刻返回,没有的话就保持连接,直到有新消息才返回)

websocket:类似Java Socket,由Http请求模拟实现的socket。

要实现长连接的关键就是: 由服务器端决定什么时候返回数据。比如在servlet中。

doGet(...){

...

Thread.sleep(30000)

return ...

}

这就是一个长连接的例子,只是没有任何意义而已。

你要做的就是在doGet中阻塞住,

while(!hasNewMsg){

sleep(500)

}

return newMsg...

当然你的ajax超时时间要设置长一点。

如果可以的话,最好可以使用websocket。

服务端

// 设置一个处理客户端消息和各种消息事件的类(Handler)bootstrap.setPipelineFactory(newChannelPipelineFactory() {@OverridepublicChannelPipeline getPipeline()throwsException {returnChannels.pipeline(newObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())),newObjectServerHandler()) }})

客户端

// 设置一个处理服务端消息和各种消息事件的类(Handler)

bootstrap.setPipelineFactory(newChannelPipelineFactory() {@OverridepublicChannelPipeline getPipeline()throwsException {returnChannels.pipeline(newObjectEncoder(),newObjectClientHandler()) }})

要传递对象,自然要有一个被传递模型,一个简单的Pojo,当然,实现序列化接口是必须的。

/** * @author lihzh * @alia OneCoder * @bloghttp://www.coderli.com */public class Command implementsSerializable { privatestaticfinallongserialVersionUID = 7590999461767050471LprivateString actionNamepublicString getActionName() {returnactionName } publicvoidsetActionName(String actionName) {this.actionName = actionName }}

服务端和客户端里,我们自定义的Handler实现如下:

ObjectServerHandler .java

/** * 对象传递服务端代码 * * @author lihzh * @alia OneCoder * @bloghttp://www.coderli.com */public class ObjectServerHandler extendsSimpleChannelHandler { /** * 当接受到消息的时候触发 */@OverridepublicvoidmessageReceived(ChannelHandlerContext ctx, MessageEvent e)throwsException {Command command = (Command) e.getMessage() // 打印看看是不是我们刚才传过来的那个System.out.println(command.getActionName()) }}

ObjectClientHandler .java

/** * 对象传递,客户端代码 * * @author lihzh * @alia OneCoder * @bloghttp://www.coderli.com */public class ObjectClientHandler extendsSimpleChannelHandler { /** * 当绑定到服务端的时候触发,给服务端发消息。 * * @author lihzh * @alia OneCoder */@OverridepublicvoidchannelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {// 向服务端发送Object信息sendObject(e.getChannel()) } /** * 发送Object * * @param channel * @author lihzh * @alia OneCoder */privatevoidsendObject(Channel channel) {Command command =newCommand() command.setActionName("Hello action.") channel.write(command) } }

启动后,服务端正常打印结果:Hello action.

简单梳理一下思路:

通过Netty传递,都需要基于流,以ChannelBuffer的形式传递。所以,Object ->ChannelBuffer.

Netty提供了转换工具,需要我们配置到Handler。

样例从客户端 ->服务端,单向发消息,所以在客户端配置了编码,服务端解码。如果双向收发,则需要全部配置Encoder和Decoder。

这里需要注意,注册到Server的Handler是有顺序的,如果你颠倒一下注册顺序:

bootstrap.setPipelineFactory(newChannelPipelineFactory() {

@OverridepublicChannelPipeline getPipeline()throwsException {returnChannels.pipeline(newObjectServerHandler(),newObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader()))) }})

结果就是,会先进入我们自己的业务,再进行解码。这自然是不行的,会强转失败。至此,你应该会用Netty传递对象了吧。

pipeline是netty里最重要的几个组件之一,我们从四个部分讲诉pipeline是怎么运作的

1、pipeline增加最开始先加了synchronized防止并发

2、然后检查这个handler是否已经被添加了,如果没有被添加,就添加

如果这个handler实例不是sharable的且已经被添加,就会报错,handler被加上Sharable,就是共享的handler,handler的实例就可以服用了

isSharable方法会使用ThreadLocal缓存已经判断是否共享的handler

添加节点到双向链表

3、创建一个新的context,如果没有传入名称,会生成一个

这里会创建一个DefaultChannelHandlerContext,创建的时候涉及到这个上下文的命名,如果创建的时候没有传入名称,会自动生成一个

这里会先从缓存里面获取是否这个handler已经缓存这个handler的名称,如果没有,就会用这个handler的类名称生成一个,放到缓存里

如果这个名称已经在这个pipeline里的某一个节点使用了,那就会把最后一位的数字加1,继续判断有没有,直到生成的名称还没有被使用

4、执行节点被添加的handlerAdded,这里执行有三种方式:1)如果节点还没有被注册到eventLoop,则创建一个任务后面触发,2)如果当前线程不是executor线程,则在executor线程触发handlerAdded,3)如果当前线程是executor线程,则立即触发handlerAdded

第一种方式,会在handlerRegistered时候触发创建的任务

第二种方式,在executor里执行

第三种方式,直接执行

我们看pipeline的构造函数,会发现创建pipeline的时候会创建一个头部节点一个尾部节点,一个新建的pipeline都是会有这两个节点的,那这两个节点是用来干嘛的呢,让我们分析一下

channel读到数据之后,会调用fireChannelRead让各个节点处理,这里第一个节点是headContext,netty的pipeline执行读取数据的流程是按添加的顺序。比如先后分别添加了ABC三个节点,那么读取数据流转的流程就是headContext->A->B->C->tailContext,如果channelRead的实现是ctx.fireChannelRead,那么就会按照这样的流程流转

写数据的时候,ctx.channel().writeAndFlush和ctx.writeAndFlush会有不同的实现

ctx.channel().writeAndFlush调用的是io.netty.channel.AbstractChannel#writeAndFlush(java.lang.Object),会调用到pipeline的writeAndFlush,然后调用到tail的writeAndFlush,此时调用的顺序是tailContext->C->B->A->headContext

ctx.writeAndFlush会调用io.netty.channel.AbstractChannelHandlerContext#writeAndFlush(java.lang.Object)

也就是当前上下文开始执行,如果当前执行到B节点,那么write执行的流程就是B->A->headContext


欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/yw/12166146.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-21
下一篇 2023-05-21

发表评论

登录后才能评论

评论列表(0条)

保存