- 理论部分
- 总述
- 架构图+代码
- 泳道图
- 时序图
- 代码
- pom依赖
- controller
- manager
- nettyhandler
- po
- request
- service
- utils
- postman接口测试
见本人另一篇博客 基本是纯理论:
https://blog.csdn.net/GBS20200720/article/details/121189122?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522164103507016780255295345%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fblog.%2522%257D&request_id=164103507016780255295345&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2blogfirst_rank_ecpm_v1~rank_v31_ecpm-1-121189122.nonecase&utm_term=netty&spm=1018.2226.3001.4450
在之前netty的理论学习中,看了很多文档、图片,也费了些时间,但是没有代码实操,理论看了确实没什么效果。
试着看了看代码,没一会就劝退了,netty底层的源码确实很繁琐,没个牵头的地方,看起来很累,再加上nio异步通信,本身就很抽象了。对nio的抽象,当然更复杂了;又加上平时上班比较累,所以…一直拖着。
不过最近算是慢慢啃下来了,因为netty确实是搞软件开发,想要精进的话,越不过去的坎。
各种优秀的框架,只要涉及到系统通信的,就绕不开io,提到io,基本绕不开netty了。
不敢说熟悉了netty,但至少能完整地写出一个完全可用的demo,在现在的网上应该是比较少的。毕竟最近啃netty,网上看了很多资料,基本是相互抄的,好一些的,也就是能实现客户端单向发送,能实现服务端主动发送的,没找到。
而且代码也是东一点,西一点,看了其实也不太敢看,都不知道能用不能…
所以自己费了些力气,写了一套完全可用的,贡献在这里,供想学习netty的同学参考。
此处就不画所有rest的时序图,选比较重要的链路画下:
客户端启动链路:
/netty/client/clientstart:
其他链路都比较简单,就直接上代码吧!
而且亿图图示有点难用qvq 主要试用版,又买不起正式版。大家有什么好用的免费UML图绘制软件可以推荐下呀!
主要是一个netty包,其他的就是springboot-start包和一些工具类的包就可以了
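<dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId></dependency>
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.20.Final</version></dependency>
controller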
controller层主要有两个,一个客户端,一个服务端
ChatClientController:
package com.chat.controller;

import com.chat.manager.User;
import com.chat.manager.UserManager;
import com.chat.po.UserChannel;
import com.chat.request.ClientStartRequest;
import com.chat.request.ClientStopRequest;
import com.chat.request.SendMsgRequest;
import com.chat.service.ClientService;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Set;

/**
 * REST endpoints that drive the Netty chat clients: start a client, stop it,
 * purge dead client channels and send a message through a client's channel.
 */
@Slf4j
@RestController
@RequestMapping("/netty/client")
public class ChatClientController {

    public static final Integer SERVER_PORT = 7000;
    public static final String SERVER_ADDRESS = "127.0.0.1";

    /**
     * Upper bound on connection attempts per request. Without a bound the
     * request thread would retry forever while the server is unreachable.
     */
    private static final int MAX_START_ATTEMPTS = 3;

    @Autowired
    private ClientService clientService;

    /**
     * Starts a client bound to {@code host:clientPort} and connects it to the
     * chat server, retrying a limited number of times while the registered
     * channel is not active.
     *
     * @param request user name plus the local host/port the client binds to
     * @return a success message, a failure message once all attempts are used
     *         up, or a validation message for an incomplete request
     */
    @RequestMapping("/clientstart")
    public String clientStart(@RequestBody ClientStartRequest request) {
        if (request == null
                || StringUtils.isBlank(request.getNames())
                || StringUtils.isBlank(request.getHost())
                || request.getClientPort() == null) {
            return "入参不合法";
        }
        SocketAddress localAddress = new InetSocketAddress(request.getHost(), request.getClientPort());
        for (int attempt = 1; attempt <= MAX_START_ATTEMPTS; attempt++) {
            String result = clientService.clientStart(SERVER_ADDRESS, SERVER_PORT, request.getNames(), localAddress);
            // checkIsActive returns true only when a retry is required.
            if (!checkIsActive(request)) {
                return "客户端启动成功:" + request.getNames();
            }
            log.info("客户端未激活,第{}次尝试结果:{}", attempt, result);
        }
        return "客户端启动失败:" + request.getNames();
    }

    /**
     * Decides whether the client has to be started again.
     *
     * @return {@code true} when a channel is registered for the user but is
     *         not active (dead channels are purged first); {@code false} when
     *         the channel is active or no channel can be found at all
     */
    private boolean checkIsActive(ClientStartRequest request) {
        if (request == null || StringUtils.isBlank(request.getNames())) {
            log.info("所检测的用户names为空");
            return false;
        }
        UserChannel userChannel = UserManager.getNameChannelsByUserName(request.getNames());
        if (userChannel == null || userChannel.getChannel() == null) {
            log.info("所检测的用户channel为空");
            return false;
        }
        if (userChannel.getChannel().isActive()) {
            // Already active: nothing to restart.
            return false;
        }
        // Not active: drop the dead entries so the next attempt can register again.
        clientClear();
        return true;
    }

    /**
     * Removes every client channel that is no longer active from
     * {@link UserManager#nameChannels}.
     */
    @RequestMapping("/clientclear")
    public String clientClear() {
        Set<Map.Entry<User, Channel>> entries = UserManager.nameChannels.entrySet();
        entries.removeIf(entry -> !entry.getValue().isActive());
        return "已清除死亡的客户端";
    }

    /**
     * Closes the channel of the named client and removes it from the registry.
     *
     * @param request carries the user name of the client to stop
     */
    @RequestMapping("/clientstop")
    public String clientStop(@RequestBody ClientStopRequest request) {
        if (request == null || StringUtils.isBlank(request.getName())) {
            return "入参不合法";
        }
        String name = request.getName();
        UserChannel userChannel = UserManager.getNameChannelsByUserName(name);
        if (userChannel == null || userChannel.getChannel() == null) {
            return "所选用户的channel为空";
        }
        userChannel.getChannel().close();

        // The registry is keyed by (userName, socketAddress); rebuild that key to remove the entry.
        User key = new User();
        key.setUserName(userChannel.getUserName());
        key.setSocketAddress(userChannel.getSocketAddress());
        UserManager.nameChannels.remove(key);
        return "客户端关闭成功:" + name;
    }

    /**
     * Writes {@code words} to the server through the named client's channel.
     *
     * @param sendMsgRequest user name of the sending client and the text to send
     */
    @RequestMapping("/sendmsg")
    public String sendMsg(@RequestBody SendMsgRequest sendMsgRequest) {
        if (sendMsgRequest == null || StringUtils.isBlank(sendMsgRequest.getUserName())) {
            return "入参不合法";
        }
        UserChannel userChannel = UserManager.getNameChannelsByUserName(sendMsgRequest.getUserName());
        if (userChannel == null) {
            return "所选用户的channel为空";
        }
        Channel channel = userChannel.getChannel();
        // An inactive channel cannot deliver anything, so do not report success for it.
        if (channel == null || !channel.isActive()) {
            return "此用户不存在或不在线";
        }
        String words = sendMsgRequest.getWords();
        if (StringUtils.isEmpty(words)) {
            return "说话!";
        }
        channel.writeAndFlush(words);
        return "成功";
    }
}
ChatServerController:
package com.chat.controller; import com.alibaba.fastjson.JSON; import com.chat.manager.User; import com.chat.manager.UserManager; import com.chat.request.SendToAllRequest; import com.chat.service.ServerService; import com.chat.utils.ThreadPoolUtil; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Collection; import java.util.Map; @RestController @RequestMapping("/netty/server") @Slf4j public class ChatServerController { public static final Integer SERVER_PORT = 7000; public static final String SERVER_ADDRESS = "127.0.0.1"; @Autowired private ServerService serverService; @RequestMapping("/getchannels") public Object getChannels() { Mapmanagerchannels = UserManager.nameChannels; return JSON.toJSON(channels); } @RequestMapping("/serverstart") public String serverStart() { //log.info("服务器开始启动..."); ThreadPoolUtil.NETTY_POOL.submit(new Runnable() { @Override public void run() { serverService.serverStart(SERVER_PORT); } }); return "服务端启动成功"; } @RequestMapping("/sendtoall") public String sendToAll(@RequestBody SendToAllRequest request) { if (request == null || StringUtils.isBlank(request.getWords())) { return "入参不合法"; } Collection values = UserManager.nameServerChannels.values(); for (ChannelHandlerContext ctx : values) { ctx.writeAndFlush("服务端主动发消息咯:" + request.getWords()); } return "消息发送成功"; } }
User:
package com.chat.manager;

import lombok.Data;

import java.net.SocketAddress;

/**
 * Identity of a chat user: a display name plus the socket address it uses.
 *
 * <p>Instances serve as keys of the maps in {@code UserManager}; Lombok's
 * {@code @Data} derives {@code equals}/{@code hashCode} from both fields.
 * NOTE(review): the fields are mutable, so a key must not be modified after
 * it has been put into a map.
 */
@Data
public class User {

    /** Name the user chose when the client was started. */
    private String userName;

    /** Address (ip + port) associated with the user. */
    private SocketAddress socketAddress;
}
UserManager:
package com.chat.manager; import com.chat.po.UserChannel; import com.chat.po.UserChannelContext; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; import java.net.SocketAddress; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; //@Component public class UserManager { // 绑定username ip(从客户端channel中获取)和channel public static MapnettyhandlernameChannels = new ConcurrentHashMap<>(10); // 服务端用的 用于服务端给客户端主动发消息 public static Map nameServerChannels = new ConcurrentHashMap<>(10); // 根据ip+port获取nameServerChannels全部信息 public static UserChannelContext getNameServerChannelsByAddress(SocketAddress socketAddress) { if (socketAddress == null) { return null; } UserChannelContext result = new UserChannelContext(); Set > entries = nameServerChannels.entrySet(); for (Map.Entry entry : entries) { if (entry.getKey().getSocketAddress().toString().equals(socketAddress.toString())) { result.setChannelHandlerContext(entry.getValue()); result.setUserName(entry.getKey().getUserName()); result.setSocketAddress(entry.getKey().getSocketAddress()); return result; } } return null; } // 根据username获取nameServerChannels全部信息 public static UserChannelContext getNameServerChannelsByUserName(String userName) { if (StringUtils.isBlank(userName)) { return null; } UserChannelContext result = new UserChannelContext(); Set > entries = nameServerChannels.entrySet(); for (Map.Entry entry : entries) { if (entry.getKey().getUserName().equals(userName)) { result.setChannelHandlerContext(entry.getValue()); result.setUserName(entry.getKey().getUserName()); result.setSocketAddress(entry.getKey().getSocketAddress()); return result; } } return null; } // 根据ip+port获取nameChannels全部信息 public static UserChannel getNameChannelsByAddress(SocketAddress socketAddress) { if (socketAddress == null) { return null; } UserChannel result = new UserChannel(); Set > entries = 
nameChannels.entrySet(); for (Map.Entry entry : entries) { if (entry.getKey().getSocketAddress().toString().equals(socketAddress.toString())) { result.setChannel(entry.getValue()); result.setUserName(entry.getKey().getUserName()); result.setSocketAddress(entry.getKey().getSocketAddress()); return result; } } return null; } // 根据username获取nameChannels全部信息 public static UserChannel getNameChannelsByUserName(String userName) { if (StringUtils.isBlank(userName)) { return null; } UserChannel result = new UserChannel(); Set > entries = nameChannels.entrySet(); for (Map.Entry entry : entries) { if (entry.getKey().getUserName().equals(userName)) { result.setChannel(entry.getValue()); result.setUserName(entry.getKey().getUserName()); result.setSocketAddress(entry.getKey().getSocketAddress()); return result; } } return null; } }
GroupChatClientHandler:
package com.chat.nettyhandler;

import com.chat.manager.User;
import com.chat.manager.UserManager;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

import java.net.SocketAddress;

/**
 * Client-side inbound handler: registers the client's channel in
 * {@link UserManager#nameChannels} and logs every text message received
 * from the server.
 */
@Slf4j
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {

    private final String userName;
    private final SocketAddress socketAddress;

    /**
     * @param userName      name under which the channel is registered
     * @param socketAddress address stored together with the name as the registry key
     */
    public GroupChatClientHandler(String userName, SocketAddress socketAddress) {
        this.userName = userName;
        this.socketAddress = socketAddress;
    }

    /** Marked as step 1 by the original author; only delegates to the superclass. */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        super.handlerAdded(ctx);
    }

    /**
     * Marked as step 2 by the original author: stores the channel under
     * (userName, socketAddress) unless an entry for that key already exists.
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        if (ctx == null || ctx.channel() == null) {
            return;
        }
        super.channelRegistered(ctx);
        User user = new User();
        user.setUserName(userName);
        user.setSocketAddress(socketAddress);
        UserManager.nameChannels.putIfAbsent(user, ctx.channel());
    }

    /** Logs a message that arrived from the server. */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        log.info("来自客户端handler ---> {},收到消息:{}", this.userName, msg);
    }
}
GroupChatServerHandler:
package com.chat.nettyhandler; import com.chat.manager.User; import com.chat.manager.UserManager; import com.chat.po.UserChannel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; @Slf4j public class GroupChatServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { super.channelWritabilityChanged(ctx); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { } //断开连接, 将xx客户离开信息推送给当前在线的客户 @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { } //表示channel 处于活动状态, 提示 xx上线 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { if (ctx == null || ctx.channel() == null){ return; } UserChannel userChannel = UserManager.getNameChannelsByAddress(ctx.channel().remoteAddress()); log.info("来自服务端handler ---> {}已上线", userChannel.getUserName()); // 上线后把客户端对应的ChannelHandlerContext存入UserManager User user = new User(); user.setUserName(userChannel.getUserName()); user.setSocketAddress(userChannel.getSocketAddress()); UserManager.nameServerChannels.putIfAbsent(user, ctx); } //表示channel 处于不活动状态, 提示 xx离线了 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (ctx == null || ctx.channel() == null){ return; } UserChannel userChannel = UserManager.getNameChannelsByAddress(ctx.channel().remoteAddress()); log.info("来自服务端handler ---> {}已下线", userChannel.getUserName()); } // 服务端收到客户端信息 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (ctx == null || ctx.channel() == null){ return; } UserChannel userChannel = UserManager.getNameChannelsByAddress(ctx.channel().remoteAddress()); // 群聊 服务器接受 不给客户端转发 log.info("来自服务端handler ---> {} 说:{}", userChannel.getUserName(), msg.toString()); // 收到客户端消息后给客户端回信息 ctx.writeAndFlush("我是服务端,我已经收到你的消息了!"); } @Override public void 
exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { } }po
UserChannel:
package com.chat.po;

import io.netty.channel.Channel;
import lombok.Data;

import java.net.SocketAddress;

/**
 * Lookup result that bundles a user's name and address with the client-side
 * {@link Channel} registered for that user.
 */
@Data
public class UserChannel {

    /** Name of the user the channel belongs to. */
    private String userName;

    /** Address stored in the registry key for this user. */
    private SocketAddress socketAddress;

    /** The user's Netty channel. */
    private Channel channel;
}
UserChannelContext:
package com.chat.po;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.Data;

import java.net.SocketAddress;

/**
 * Lookup result that bundles a user's name and address with the server-side
 * {@link ChannelHandlerContext} stored for that user.
 */
@Data
public class UserChannelContext {

    /** Name of the user the context belongs to. */
    private String userName;

    /** Address stored in the registry key for this user. */
    private SocketAddress socketAddress;

    /** Server-side handler context of the user's connection. */
    private ChannelHandlerContext channelHandlerContext;
}
request
ClientStartRequest:
package com.chat.request;

import lombok.Data;

import java.util.List;

/**
 * Request body of the client-start endpoint.
 */
@Data
public class ClientStartRequest {

    /** User name of the client to start (a single name despite the plural field name). */
    private String names;

    /** Local port the client binds to. */
    private Integer clientPort;

    /** Local host the client binds to. */
    private String host;
}
ClientStopRequest:
package com.chat.request;

import lombok.Data;

/**
 * Request body of the client-stop endpoint.
 */
@Data
public class ClientStopRequest {

    /** User name of the client whose channel should be closed. */
    private String name;
}
SendMsgRequest:
package com.chat.request;

import lombok.Data;

/**
 * Request body of the send-message endpoint.
 */
@Data
public class SendMsgRequest {

    /** Name of the client that sends the message. */
    private String userName;

    /** NOTE(review): not read by any code visible alongside this class. */
    private String userId;

    /** Text to send. */
    private String words;
}
SendToAllRequest:
package com.chat.request;

import lombok.Data;

/**
 * Request body of the server broadcast endpoint.
 */
@Data
public class SendToAllRequest {

    /** Text the server pushes to the connected clients. */
    private String words;
}
service
ClientServiceImpl:
package com.chat.service.impl;

import com.chat.nettyhandler.GroupChatClientHandler;
import com.chat.service.ClientService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.stereotype.Service;

import java.net.SocketAddress;

/**
 * Starts Netty chat clients. Each call creates its own event loop group,
 * which is released when the connection attempt fails or when the
 * established channel is closed.
 */
@Service
public class ClientServiceImpl implements ClientService {

    /**
     * Binds a client to {@code socketAddress} and connects it to the server,
     * blocking until the connection attempt has finished.
     *
     * @param host          server host
     * @param serverPort    server port
     * @param userName      name handed to the client handler
     * @param socketAddress local address the client binds to
     * @return a success text containing the user name, or a failure text
     *         containing the exception message
     */
    @Override
    public String clientStart(String host, int serverPort, String userName, SocketAddress socketAddress) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .localAddress(socketAddress)
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Text codec first, then the chat handler.
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast(new GroupChatClientHandler(userName, socketAddress));
                        }
                    });
            // Release the event loop threads once this client's connection is closed.
            bootstrap.connect(host, serverPort)
                    .sync()
                    .channel()
                    .closeFuture()
                    .addListener(future -> group.shutdownGracefully());
            return "客户端已启动..." + userName;
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's thread.
            Thread.currentThread().interrupt();
            group.shutdownGracefully();
            return "启动异常..." + e.getMessage();
        } catch (Exception e) {
            // The connection never came up, so nothing else will release the group.
            group.shutdownGracefully();
            return "启动异常..." + e.getMessage();
        }
    }
}
ServerServiceImpl:
package com.chat.service.impl; import com.chat.nettyhandler.GroupChatServerHandler; import com.chat.service.ServerService; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import org.springframework.stereotype.Service; @Service public class ServerServiceImpl implements ServerService { @Override public void serverStart(Integer port) { //创建两个线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { //获取到pipeline ChannelPipeline pipeline = ch.pipeline(); //向pipeline加入解码器 pipeline.addLast("decoder", new StringDecoder()); //向pipeline加入编码器 pipeline.addLast("encoder", new StringEncoder()); //加入自己的业务处理handler pipeline.addLast(new GroupChatServerHandler()); } }); serverBootstrap.bind(port); }catch (Exception e){ System.out.println("netty服务器启动异常 error:"+e.getMessage()); } } }
ClientService:
package com.chat.service;

import java.net.SocketAddress;

/**
 * Starts chat clients.
 */
public interface ClientService {

    /**
     * Starts one client and connects it to the server.
     *
     * @param host       server host to connect to
     * @param port       server port to connect to
     * @param userName   name of the user the client represents
     * @param clientPort local address of the client (a full socket address,
     *                   despite the parameter name)
     * @return a human-readable status text
     */
    String clientStart(String host, int port, String userName, SocketAddress clientPort);
}
ServerService:
package com.chat.service;

/**
 * Starts the chat server.
 */
public interface ServerService {

    /**
     * Starts the server on the given port.
     *
     * @param port port to listen on
     */
    void serverStart(Integer port);
}
utils
ThreadPoolUtil:
package com.chat.utils; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import com.google.common.util.concurrent.ThreadFactoryBuilder; // 线程池工具 public class ThreadPoolUtil { public final static ExecutorService NETTY_POOL = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors() * 5, Runtime.getRuntime().availableProcessors() * 20, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), new ThreadFactoryBuilder() .setNameFormat("netty-user-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy()); }postman接口测试
这个就不贴了,参数都比较简单的。
正确的顺序是:
客户端发送消息使服务端接收:serverstart -> clientstart -> sendmsg(即 /netty/client/sendmsg)
服务端主动发消息给所有在线的客户端:serverstart -> clientstart -> sendtoall
返回所有的在线的客户端:getchannels
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)