netty是一个高效的NIO框架,用netty封装的websocket服务器不仅稳定性能也非常优秀,由于netty会用线程绑定连接(通道),在游戏开发中可以利用这种特性进行单用户无锁开发,可以大量的减少对锁的使用,提高游戏的吞吐量,这里对netty进行简单封装,方便使用
git地址
https://gitee.com/ichiva/netty-websocket-server.git介绍
封装netty用于快速创建websocket服务器快速入门
创建FastNettyWebSocket实例,传入消息处理器,并开启监听
public static void main(String[] args) { new FastNettyWebSocketServer(new WebSocket() { @Override public void onMessage(WebSocketSession session, String message) { System.out.println("收到:" + message); send(session,"收到,over"); } }).listener(8080); }实现细节
定义WebSocketServer接口
public interface WebSocketServer { default void onOpen(WebSocketSession session){ } void onMessage(WebSocketSession session, String message); default void onMessage(WebSocketSession session, byte[] message) { } default void send(WebSocketSession session, String message){ session.getChannel().writeAndFlush( new TextWebSocketframe(message) ); } default void send(WebSocketSession session, byte[] message){ session.getChannel().writeAndFlush(new BinaryWebSocketframe( Unpooled.buffer().writeBytes(message) )); } default void onClose(WebSocketSession session) { } default void onError(WebSocketSession session, Throwable e){ } }
定义配置文件
public interface NettyWebsocketServerConfig { default NioEventLoopGroup getWorkerGroup(){ return new NioEventLoopGroup(); } default NioEventLoopGroup getBoosGroup(){ return new NioEventLoopGroup(1); } ChannelHandler getChildHandler(); default int getPort(){ return 8080; } }
WebSocketServer (核心)实现,用于启动netty和关闭netty
public abstract class FastNettyWebSocketServer implements WebSocketServer { private Channel serverChannel; private NettyWebsocketServerConfig config; public void start(NettyWebsocketServerConfig config) { this.config = config; ServerBootstrap server = new ServerBootstrap(); server.group(config.getBoosGroup(), config.getWorkerGroup()); server.channel(NioServerSocketChannel.class); server.childHandler(config.getChildHandler()); ChannelFuture future = server.bind(config.getPort()); future.addListener(f -> { if (f.isDone() && f.isSuccess()) { this.serverChannel = future.channel(); log.info("Start ws server success"); log.info("boos group thread number {}", config.getBoosGroup().executorCount()); log.info("worker group thread number {}", config.getWorkerGroup().executorCount()); } if (f.isDone() && f.cause() != null) { log.error("Start ws server fail throw={}", f.cause().getMessage()); future.channel().close(); } }); } public void start(final int port) { start(new NettyWebsocketServerConfig() { @Override public ChannelHandler getChildHandler() { return new WebSocketChannelInitializer(FastNettyWebSocketServer.this); } @Override public int getPort() { return port; } }); } public void start() { start(8080); } public void stop() { if (serverChannel != null && serverChannel.isOpen()) { final int waitSec = 10; CountDownLatch latch = new CountDownLatch(1); serverChannel.close().addListener(f -> { config.getWorkerGroup().schedule(() -> { log.info("Shutdown dispatcher success..."); config.getWorkerGroup().shutdownGracefully(); latch.countDown(); }, waitSec - 2, TimeUnit.SECONDS); log.info("Close ws server socket success={}", f.isSuccess()); config.getBoosGroup().shutdownGracefully(); }); try { boolean flag = latch.await(waitSec, TimeUnit.SECONDS); if(!flag){ log.warn("Shutdown ws server timeout"); } } catch (InterruptedException e) { log.warn("Shutdown ws server interrupted exception={}", e.getMessage()); } } } }
默认的通道实现
public class WebSocketChannelInitializer extends ChannelInitializer{ private final WebSocketServer server; public WebSocketChannelInitializer(WebSocketServer server){ this.server = server; } @Override protected void initChannel(SocketChannel ch) { //二进制流在通道中被处理 ChannelPipeline pipeline = ch.pipeline(); // HttpRequestDecoder和HttpResponseEncoder的一个组合,针对http协议进行编解码 pipeline.addLast("httpServerCodec", new HttpServerCodec());//设置解码器 //分块向客户端写数据,防止发送大文件时导致内存溢出, channel.write(new ChunkedFile(new File("bigFile.mkv"))) pipeline.addLast(new ChunkedWriteHandler());//用于大数据的分区传输 // 将HttpMessage和HttpContents聚合到一个完成的 FullHttpRequest或FullHttpResponse中 // 具体是FullHttpRequest对象还是FullHttpResponse对象取决于是请求还是响应 // 需要放到HttpServerCodec这个处理器后面 pipeline.addLast(new HttpObjectAggregator(1024 * 2));//聚合器,使用websocket会用到 // webSocket 数据压缩扩展,当添加这个的时候WebSocketServerProtocolHandler的第三个参数需要设置成true pipeline.addLast(new WebSocketServerCompressionHandler()); // 服务器端向外暴露的 web socket 端点,当客户端传递比较大的对象时,maxframeSize参数的值需要调大 pipeline.addLast(new WebSocketServerAuthProtocolHandler("/", null, true, 65536,server)); pipeline.addLast(new LengthFieldPrepender(4)); // 业务代码 pipeline.addLast(new WebSocketServerChannelInboundHandler(server)); } }
提供session支持
public class WebSocketServerChannelInboundHandler extends SimpleChannelInboundHandler
扩展uri支持
public class WebSocketServerAuthProtocolHandler extends WebSocketServerProtocolHandler { private final WebSocketServer webSocketServer; public WebSocketServerAuthProtocolHandler(String websocketPath, WebSocketServer webSocketServer) { this(websocketPath, null, false,webSocketServer); } public WebSocketServerAuthProtocolHandler(String websocketPath, String subprotocols, WebSocketServer webSocketServer) { this(websocketPath, subprotocols, false,webSocketServer); } public WebSocketServerAuthProtocolHandler(String websocketPath, String subprotocols, boolean allowExtensions, WebSocketServer webSocketServer) { this(websocketPath, subprotocols, allowExtensions, 65536,webSocketServer); } public WebSocketServerAuthProtocolHandler(String websocketPath, String subprotocols, boolean allowExtensions, int maxframeSize, WebSocketServer webSocketServer) { this(websocketPath, subprotocols, allowExtensions, maxframeSize, false,webSocketServer); } public WebSocketServerAuthProtocolHandler(String websocketPath, String subprotocols, boolean allowExtensions, int maxframeSize, boolean allowMaskMismatch, WebSocketServer webSocketServer) { super(websocketPath, subprotocols, allowExtensions, maxframeSize, allowMaskMismatch); this._webSocketPathPrefix = websocketPath; this._subprotocols =subprotocols; this._allowExtensions = allowExtensions; this._maxframeSize = maxframeSize; this._allowMaskMismatch = allowMaskMismatch; this.webSocketServer =webSocketServer; } String _webSocketPathPrefix; String _subprotocols; boolean _allowExtensions; int _maxframeSize; boolean _allowMaskMismatch; @Override public void handlerAdded(ChannelHandlerContext ctx) { ChannelPipeline cp = ctx.pipeline(); if (cp.get(WebSocketServerAuthHandshakeHandler.class) == null) { // Add the WebSocketHandshakeHandler before this one. ctx.pipeline().addBefore(ctx.name(), WebSocketServerAuthHandshakeHandler.class.getName(), new WebSocketServerAuthHandshakeHandler(_webSocketPathPrefix, _subprotocols, _allowExtensions, _maxframeSize, _allowMaskMismatch, webSocketServer)); } if (cp.get(Utf8framevalidator.class) == null) { // Add the UFT8 checking before this one. ctx.pipeline().addBefore(ctx.name(), Utf8framevalidator.class.getName(), new Utf8framevalidator()); } } } public class WebSocketServerAuthHandshakeHandler extends ChannelInboundHandlerAdapter { private final String websocketPath; private final String subprotocols; private final boolean allowExtensions; private final int maxframePayloadSize; private final boolean allowMaskMismatch; private final WebSocketServer webSocketServer; public WebSocketServerAuthHandshakeHandler(String websocketPath, String subprotocols, boolean allowExtensions, int maxframeSize, boolean allowMaskMismatch, WebSocketServer webSocketServer) { this.websocketPath = websocketPath; this.subprotocols = subprotocols; this.allowExtensions = allowExtensions; this.maxframePayloadSize = maxframeSize; this.allowMaskMismatch = allowMaskMismatch; this.webSocketServer = webSocketServer; } @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) { FullHttpRequest req = (FullHttpRequest) msg; if (req.uri().indexOf(websocketPath) != 0) { ctx.fireChannelRead(msg); return; } try { if (req.method() != GET) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN)); return; } WebSocketSession session = Sessions.createSession(ctx); session.setChannel(ctx.channel()); session.setId(ctx.channel().hashCode()); session.setUri(req.uri()); UrlEntity entity = UrlEntity.parse(req.uri()); session.setUribase(entity.getbaseUrl()); session.setParams(entity.getParams()); webSocketServer.onOpen(session); final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( getWebSocketLocation(ctx.pipeline(), req, websocketPath), subprotocols, allowExtensions, maxframePayloadSize, allowMaskMismatch); final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req); handshakeFuture.addListener((ChannelFutureListener) future -> { if (!future.isSuccess()) { ctx.fireExceptionCaught(future.cause()); } else { ctx.fireUserEventTriggered( WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE); } }); setHandshaker(ctx.channel(), handshaker); ctx.pipeline().replace(this, "WS403Responder", forbiddenHttpRequestResponder()); } } finally { req.release(); } } private static final AttributeKeyHANDSHAKER_ATTR_KEY = AttributeKey.valueOf(WebSocketServerHandshaker.class, "HANDSHAKER"); static void setHandshaker(Channel channel, WebSocketServerHandshaker handshaker) { channel.attr(HANDSHAKER_ATTR_KEY).set(handshaker); } static ChannelHandler forbiddenHttpRequestResponder() { return new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof FullHttpRequest) { ((FullHttpRequest) msg).release(); FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.FORBIDDEN); ctx.channel().writeAndFlush(response); } else { ctx.fireChannelRead(msg); } } }; } private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) { ChannelFuture f = ctx.channel().writeAndFlush(res); if (!isKeepAlive(req) || res.status().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } } private static String getWebSocketLocation(ChannelPipeline cp, HttpRequest req, String path) { String protocol = "ws"; if (cp.get(SslHandler.class) != null) { // SSL in use so use Secure WebSockets protocol = "wss"; } return protocol + "://" + req.headers().get(HttpHeaderNames.HOST) + path; } }
git地址
https://gitee.com/ichiva/netty-websocket-server.git
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)