每次关闭服务器时,后台报线程未关闭

每次关闭服务器时,后台报线程未关闭,第1张

(一)handler处理篇
首先,是handler,初次接触netty的朋友要注意,handler不是一个单例即每个channel下都会有自己的一个handler实例
public class ServerHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = LoggergetLogger(
ServerHandlerclassgetName());
private final ThreadLocal<Boolean> COMMAND_FLAG = new ThreadLocal<Boolean>();
private final ServerChannelGroup serverChannelGroup = ServerChannelGroupnewInstance();
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (e instanceof ChannelStateEvent) {
loggerlog(LevelINFO, "Channel state changed: {0}", e);
}
superhandleUpstream(ctx, e);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
Systemoutprintln(this);
String request = (String) egetMessage();
//如果接受到客户端发送的bye指令,那么就给客户端回复一个bye指令,客户端接受到后,主动关闭连接
//服务器端通过ChannelFutureListenerCLOSE,当它认为客户端已经接受到服务器发送的bye后,也主动关闭连接
if (requesttoLowerCase()equals("bye")) {
ChannelFuture future = egetChannel()write("bye\r\n");
futureaddListener(ChannelFutureListenerCLOSE);
} else {
//以下是我初步解析客户端发送过来的数据,然后决定处理方式
RecevieData receivedaData = MessageDecoderdecode(request);
if (null != receivedaData) {
//服务器第5版
if (VersionCodeV5equals(receivedaDatagetVersion())) {
//然后判断命令是否存在
for (String s : CommandCodeCOMMANDS) {
if (sequals(receivedaDatagetActionType())) {
COMMAND_FLAGset(true);
if (sequals(CommandCodeKEEP_ALIVE)) {
serverChannelGroupaddChannel(egetChannel());
}
break;
} else {
COMMAND_FLAGset(false);
}
}
if (COMMAND_FLAGget()) {
COMMAND_FLAGset(false);
//将这个命令传递给下一个handler来处理
//这里的"下一个handler"即为用户自己定义的处理handler
ctxsendUpstream(e);
} else {
egetChannel()write(MessageEncoderencode(receivedaData, StatusCodeNOT_FOUND, StatusCodeNOT_FOUND_TEXT));
}
} else {
//版本错误
egetChannel()write(MessageEncoderencode(receivedaData, StatusCodeVERSION_NOT_SUPPORTED, StatusCodeVERSION_NOT_SUPPORTED_TXET));
}
} else {
//如果格式错误,那么直接返回
egetChannel()write(MessageEncoderencode(receivedaData, null, null));
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
loggerlog(LevelWARNING, "Server side Unexpected exception from downstream",
egetCause());
egetChannel()close();
ListenerManagergetListener(ConnectClosedByPeerListenerclass)connectClosedByPeer(egetCause());
}
}
在上面这个handler中,我使用了ctxsendUpstream(e);来处理,个人觉得此处为了要实现执行运行时代码,也可以使用接口等方式但既然netty提供了sendUpstream 的方法,我们用这个岂不是更方便^_^
下面是使用SSL连接的handler
public class ServerSSLHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = LoggergetLogger(
ServerSSLHandlerclassgetName());
private final ThreadLocal<Boolean> COMMAND_FLAG = new ThreadLocal<Boolean>();
private final ServerChannelGroup serverChannelGroup = ServerChannelGroupnewInstance();
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (e instanceof ChannelStateEvent) {
loggerlog(LevelINFO, "Channel state changed: {0}", e);
}
superhandleUpstream(ctx, e);
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
//ssl握手
SslHandler sslHandler = ctxgetPipeline()get(SslHandlerclass);
sslHandlerhandshake();
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
Systemoutprintln(this);
String request = (String) egetMessage();
//如果接受到客户端发送的bye指令,那么就给客户端回复一个bye指令,客户端接受到后,主动关闭连接
//服务器端通过ChannelFutureListenerCLOSE,当它认为客户端已经接受到服务器发送的bye后,也主动关闭连接
if (requesttoLowerCase()equals("bye")) {
ChannelFuture future = egetChannel()write("bye\r\n");
futureaddListener(ChannelFutureListenerCLOSE);
} else {
//以下是我初步解析客户端发送过来的数据,然后决定处理方式
RecevieData receivedaData = MessageDecoderdecode(request);
if (null != receivedaData) {
//服务器第5版
if (VersionCodeV5equals(receivedaDatagetVersion())) {
//然后判断命令是否存在
for (String s : CommandCodeCOMMANDS) {
if (sequals(receivedaDatagetActionType())) {
COMMAND_FLAGset(true);
if (sequals(CommandCodeKEEP_ALIVE)) {
serverChannelGroupaddChannel(egetChannel());
}
break;
} else {
COMMAND_FLAGset(false);
}
}
if (COMMAND_FLAGget()) {
COMMAND_FLAGset(false);
//将这个命令传递给下一个handler来处理
//这里的"下一个handler"即为用户自己定义的处理handler
ctxsendUpstream(e);
} else {
egetChannel()write(MessageEncoderencode(receivedaData, StatusCodeNOT_FOUND, StatusCodeNOT_FOUND_TEXT));
}
} else {
//版本错误
egetChannel()write(MessageEncoderencode(receivedaData, StatusCodeVERSION_NOT_SUPPORTED, StatusCodeVERSION_NOT_SUPPORTED_TXET));
}
} else {
//如果格式错误,那么直接返回
egetChannel()write(MessageEncoderencode(receivedaData, null, null));
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
loggerlog(LevelWARNING, "Server side Unexpected exception from downstream",
egetCause());
egetChannel()close();
ListenerManagergetListener(ConnectClosedByPeerListenerclass)connectClosedByPeer(egetCause());
}
}
当我们有了2个handler后,当然就是要把他们添加到我们的Pipeline中
public class ServerPipelineFactory implements
ChannelPipelineFactory {
public ChannelPipeline getPipeline() {
ChannelPipeline pipeline = pipeline();
ServerConfig config = ServerConfiggetInstance();
try {
if (configssl()) {
SSLEngine engine =
SecureSslContextFactorygetServerContext()createSSLEngine();
//说明是服务器端SslContext
enginesetUseClientMode(false);
pipelineaddLast("ssl", new SslHandler(engine));
}
//此Decoder可以自动解析一句以\r\n结束的命令,我为了方便,也用了这个Decoder
//使用这个Decoder,我不用刻意发送命令长度用于解析,只要没有收到\r\n说明数据还
//没有发送完毕这个Decoder会等到收到\r\n后调用下个handler
pipelineaddLast("framer", new DelimiterBasedFrameDecoder(
8192, DelimiterslineDelimiter()));
//字串解码,可以自己设置charset
pipelineaddLast("decoder", new StringDecoder());
//字串编码,可以自己设置charset
pipelineaddLast("encoder", new StringEncoder());
if (configssl()) {
//如果开启了SSL,那么使用sslhandler
pipelineaddLast("sslhandler", new ServerSSLHandler());
} else {
//如果没有开启SSL,那么使用普通handler
pipelineaddLast("handler", new ServerHandler());
}
//遍历配置文件中的服务器handler,将其添加进Pipeline链中
for (Element e : confighandler()) {
pipelineaddLast(eattribute(egetQName("id"))getValue()trim(),
(ChannelHandler) ClassforName(eattribute(egetQName("class"))getValue()trim())newInstance());
}
} catch (DocumentException ex) {
LoggergetLogger(ServerPipelineFactoryclassgetName())log(LevelSEVERE, exgetMessage(), ex);
} catch (InstantiationException ex) {
LoggergetLogger(ServerPipelineFactoryclassgetName())log(LevelSEVERE, exgetMessage(), ex);
} catch (IllegalAccessException ex) {
LoggergetLogger(ServerPipelineFactoryclassgetName())log(LevelSEVERE, exgetMessage(), ex);
} catch (ClassNotFoundException ex) {
LoggergetLogger(ServerPipelineFactoryclassgetName())log(LevelSEVERE, exgetMessage(), ex);
}
return pipeline;
}
}
serverxml,放到lib下即可,注意其中的handler 以及clienthandler 项,如果你新建了自己的handler,那么需要在此xml中配置一下
<xml version="10" encoding="UTF-8">
<root>
<!-- 配置主机地址 -->
<host>127001</host>
<!-- 配置服务端口 -->
<port>8080</port>
<!-- 是否启用ssl,1为启用,0为停用 -->
<ssl>0</ssl>
<!--服务器业务handler -->
<handler id="timeHandler" class="comchinatenetnioserverhandlerServerTimeHandler" />

<!--客户端业务handler -->
<clienthandler id="timeHandler" class="comchinatenetnioclienthandlerClientTimeHandler" />
</root>
到此,一个简单的可扩展handler的服务器雏形就出来了
下面,我们添加一个自定义的服务器处理handler进来
public class ServerTimeHandler extends SimpleChannelUpstreamHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
RecevieData receivedaData = MessageDecoderdecode((String) egetMessage());
if (CommandCodeGET_TIMEequals(receivedaDatagetActionType())
|| CommandCodeKEEP_ALIVEequals(receivedaDatagetActionType())) {
if (VersionCodeV5equals(receivedaDatagetVersion())) {
//回复客户端后,即可进行自己的业务当然这里可以根据需要,看
//是先回复再处理还是等处理结果出来后,将结果返回客户端
egetChannel()write(MessageEncoderencode(receivedaData, StatusCodeOK,
SystemcurrentTimeMillis() / 1000 + ""));
} else {
//版本错误
egetChannel()write(MessageEncoderencode(receivedaData, StatusCodeVERSION_NOT_SUPPORTED,
StatusCodeVERSION_NOT_SUPPORTED_TXET));
}
} else {
//如果不是此handler处理的命令,那么流下去
ctxsendUpstream(e);
}
}

}
最后测试一下
public class Server {
public static void main(String[] args) throws DocumentException {
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
ExecutorsnewCachedThreadPool(),
ExecutorsnewCachedThreadPool()));
bootstrapsetPipelineFactory(new ServerPipelineFactory());
//因为我要用到长连接
bootstrapsetOption("childtcpNoDelay", true);
bootstrapsetOption("childkeepAlive", true);

ServerConfig config = ServerConfiggetInstance();
bootstrapbind(new InetSocketAddress(IntegervalueOf(configport())));
}
}
总结:在整个服务器编码中,刚开始会遇到"远程主机强迫关闭了一个现有的连接。"等这类错误,最后修改成"相互告知对方我要关闭了"再进行关闭就可以了


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zz/13479207.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-08-15
下一篇 2023-08-15

发表评论

登录后才能评论

评论列表(0条)

保存