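Hands-on Netty (asynchronous I/O framework) at the source level: client and server exchange messages, either side can initiate a message; architecture diagrams + full code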


Contents
  • Theory
  • Overview
  • Architecture diagrams + code
    • Swimlane diagram
    • Sequence diagram
    • Code
      • pom dependencies
      • controller
      • manager
      • nettyhandler
      • po
      • request
      • service
      • utils
    • Postman API tests

Theory

See my other blog post, which is almost pure theory:
https://blog.csdn.net/GBS20200720/article/details/121189122?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522164103507016780255295345%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fblog.%2522%257D&request_id=164103507016780255295345&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2blogfirst_rank_ecpm_v1~rank_v31_ecpm-1-121189122.nonecase&utm_term=netty&spm=1018.2226.3001.4450

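Overview

In my earlier study of Netty theory I read a lot of documents and diagrams and spent a fair amount of time on it, but without hands-on coding the theory alone did not stick.
I tried reading the code and gave up before long. Netty's internals are intricate and have no obvious entry point, so reading them is tiring. NIO-style asynchronous communication is already abstract, and an abstraction on top of NIO is naturally more complex still. Work has also been tiring, so... I kept putting it off.
Recently, though, I have slowly worked through it, because for anyone who wants to get better at software development, Netty is a hurdle that cannot be skipped.
Any good framework that involves communication between systems has to deal with I/O, and once I/O comes up, Netty is hard to avoid.
I would not claim to know Netty well, but I can at least write a complete, fully working demo, which seems fairly rare online at the moment. While working through Netty I read a lot of material online, and most of it copies from one another. The better examples manage one-way sending from the client; I did not find one where the server sends on its own initiative.
The code is also scattered, a bit here and a bit there, so I hardly dared rely on it without knowing whether it even works...
So I put in some effort and wrote a fully working set of code, shared here as a reference for anyone who wants to learn Netty.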

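Architecture diagrams + code

Swimlane diagram

Sequence diagram

I won't draw sequence diagrams for every REST endpoint here, only for the more important flow:
Client startup flow:
/netty/client/clientstart:

The other flows are fairly simple, so let's go straight to the code!
Also, EdrawMax (亿图图示) is a bit awkward to use qvq, mainly because I'm on the trial version and can't afford the full one. If you know a good free UML drawing tool, please recommend it!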

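Code

pom dependencies

The main dependency is the netty package; besides that, the Spring Boot starter packages and a few utility libraries are all that is needed. The code below also imports fastjson (com.alibaba.fastjson) and Guava (com.google.common), so those need to be on the classpath as well.

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.20.Final</version>
        </dependency>

controller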

There are two controllers, one for the client and one for the server.
ChatClientController:

package com.chat.controller;

import com.chat.manager.User;
import com.chat.manager.UserManager;
import com.chat.po.UserChannel;
import com.chat.request.ClientStartRequest;
import com.chat.request.ClientStopRequest;
import com.chat.request.SendMsgRequest;
import com.chat.service.ClientService;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Set;


@Slf4j
@RestController
@RequestMapping("/netty/client")
public class ChatClientController {
    public static final Integer SERVER_PORT = 7000;
    public static final String SERVER_ADDRESS = "127.0.0.1";
    @Autowired
    private ClientService clientService;

    @RequestMapping("/clientstart")
    public String clientStart(@RequestBody ClientStartRequest request) {
        if (request == null || StringUtils.isBlank(request.getNames()) || StringUtils.isBlank(request.getHost())
                || request.getClientPort() == null) {
            return "Invalid request parameters";
        }
        // Register the client, and register again for as long as its channel is found but not active
        do {
            SocketAddress sa = new InetSocketAddress(request.getHost(), request.getClientPort());
            clientService.clientStart(SERVER_ADDRESS, SERVER_PORT, request.getNames(), sa);
        } while (checkIsActive(request));
        return "Client started: " + request.getNames();
    }

    // Returns true only when the user's channel exists but is not active, i.e. when registration should be retried
    private boolean checkIsActive(ClientStartRequest request) {
        if (request == null || StringUtils.isBlank(request.getNames())) {
            log.info("The names of the user being checked is blank");
            return false;
        }
        UserChannel userChannel = UserManager.getNameChannelsByUserName(request.getNames());
        if (userChannel == null || userChannel.getChannel() == null) {
            log.info("The channel of the user being checked is null");
            return false;
        }
        boolean active = userChannel.getChannel().isActive();
        if (active) {
            // Already active, so there is no need to activate again
            return false;
        }
        // Not active: clear dead entries once, then loop to activate again
        clientClear();
        return true;

    }

    @RequestMapping("/clientclear")
    public String clientClear() {
        Set<Map.Entry<User, Channel>> entries = UserManager.nameChannels.entrySet();
        // Remove entries whose client channel is dead
        entries.removeIf(entry -> !entry.getValue().isActive());
        return "Dead clients cleared";
    }

    @RequestMapping("/clientstop")
    public String clientStop(@RequestBody ClientStopRequest request) {
        if (request == null || StringUtils.isBlank(request.getName())) {
            return "Invalid request parameters";
        }
        String name = request.getName();
        UserChannel userChannel = UserManager.getNameChannelsByUserName(name);
        if (userChannel == null || userChannel.getChannel() == null) {
            return "The selected user's channel is null";
        }
        Channel channel = userChannel.getChannel();
        channel.close();
        // Rebuild the map key from the stored user name and address, then drop the entry
        User user = new User();
        user.setUserName(userChannel.getUserName());
        user.setSocketAddress(userChannel.getSocketAddress());
        UserManager.nameChannels.remove(user);
        return "Client closed: " + name;
    }


    @RequestMapping("/sendmsg")
    public String sendMsg(@RequestBody SendMsgRequest sendMsgRequest) {
        if (sendMsgRequest == null || StringUtils.isBlank(sendMsgRequest.getUserName())) {
            return "Invalid request parameters";
        }
        String userName = sendMsgRequest.getUserName();
        UserChannel nameChannelsByUserName = UserManager.getNameChannelsByUserName(userName);
        if (nameChannelsByUserName == null) {
            return "The selected user's channel is null";
        }
        Channel channel = nameChannelsByUserName.getChannel();
        if (channel == null) {
            return "This user does not exist or is offline";
        }
        String words = sendMsgRequest.getWords();
        if (StringUtils.isEmpty(words)) {
            return "Say something!";
        }
        // Write through the client's own channel, so the message travels from client to server
        channel.writeAndFlush(words);
        return "Success";
    }
}
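
The do/while in clientStart above keeps re-registering for as long as the channel is found but inactive, with no upper limit on the number of attempts. A minimal sketch of a bounded variant is shown here. It is plain Java with no Netty dependency, and BoundedRetry, retryUntil and maxAttempts are hypothetical names that are not part of the project.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration only; it does not exist in the original project.
final class BoundedRetry {
    private BoundedRetry() {
    }

    /**
     * Runs action, then evaluates check; repeats until check passes or maxAttempts is used up.
     * Returns the number of attempts that were made, or -1 if check never passed.
     */
    static int retryUntil(Runnable action, BooleanSupplier check, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            action.run();
            if (check.getAsBoolean()) {
                return attempt;
            }
        }
        return -1;
    }
}
```

In clientStart, action would wrap the clientService.clientStart(...) call and check would test whether the user's channel is active. That wiring is an assumption about how the helper could be used, not something the original code does.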

ChatServerController:

package com.chat.controller;

import com.alibaba.fastjson.JSON;
import com.chat.manager.User;
import com.chat.manager.UserManager;
import com.chat.request.SendToAllRequest;
import com.chat.service.ServerService;
import com.chat.utils.ThreadPoolUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Collection;
import java.util.Map;


@RestController
@RequestMapping("/netty/server")
@Slf4j
public class ChatServerController {

    public static final Integer SERVER_PORT = 7000;
    public static final String SERVER_ADDRESS = "127.0.0.1";

    @Autowired
    private ServerService serverService;


    @RequestMapping("/getchannels")
    public Object getChannels() {
        Map<User, Channel> channels = UserManager.nameChannels;
        return JSON.toJSON(channels);
    }

    @RequestMapping("/serverstart")
    public String serverStart() {
        //log.info("Server is starting...");
        // Start the Netty server on a pool thread so the REST call returns immediately
        ThreadPoolUtil.NETTY_POOL.submit(new Runnable() {
            @Override
            public void run() {
                serverService.serverStart(SERVER_PORT);
            }
        });

        return "Server started";
    }

    @RequestMapping("/sendtoall")
    public String sendToAll(@RequestBody SendToAllRequest request) {
        if (request == null || StringUtils.isBlank(request.getWords())) {
            return "Invalid request parameters";
        }
        // Each stored context belongs to one connected client; writing to it pushes from server to client
        Collection<ChannelHandlerContext> values = UserManager.nameServerChannels.values();
        for (ChannelHandlerContext ctx : values) {
            ctx.writeAndFlush("Message pushed by the server: " + request.getWords());
        }
        return "Message sent";
    }

}

manager

User:

package com.chat.manager;

import lombok.Data;

import java.net.SocketAddress;


// User PO class
//    user name + socket address (ip and port)
@Data
public class User {
    private String userName;
    private SocketAddress socketAddress;
}
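
User is used as the map key in UserManager, and clientStop removes an entry by building a fresh User with the same field values. That works because Lombok's @Data generates equals and hashCode from the fields. The sketch below shows the same idea with a hand-written key class so that it runs without Lombok. UserKey and UserKeyDemo are hypothetical stand-ins, and a String stands in for the Netty Channel.

```java
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for User, with equals/hashCode written out by hand.
final class UserKey {
    final String userName;
    final SocketAddress socketAddress;

    UserKey(String userName, SocketAddress socketAddress) {
        this.userName = userName;
        this.socketAddress = socketAddress;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof UserKey)) {
            return false;
        }
        UserKey other = (UserKey) o;
        return Objects.equals(userName, other.userName)
                && Objects.equals(socketAddress, other.socketAddress);
    }

    @Override
    public int hashCode() {
        return Objects.hash(userName, socketAddress);
    }
}

final class UserKeyDemo {
    // Returns true if a second key object with equal fields removes the stored entry.
    static boolean removeWithFreshKey() {
        Map<UserKey, String> channels = new ConcurrentHashMap<>();
        channels.put(new UserKey("alice", new InetSocketAddress("127.0.0.1", 9001)), "channel-1");
        String removed = channels.remove(new UserKey("alice", new InetSocketAddress("127.0.0.1", 9001)));
        return "channel-1".equals(removed) && channels.isEmpty();
    }
}
```

Without value-based equals and hashCode, the second key object would not match the stored one and the entry would stay in the map.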

UserManager:

package com.chat.manager;

import com.chat.po.UserChannel;
import com.chat.po.UserChannelContext;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import java.net.SocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;


//@Component
public class UserManager {
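    // Maps user name + address (taken from the client channel) to the client-side channel
    public static Map<User, Channel> nameChannels = new ConcurrentHashMap<>(10);

    // Used on the server side so that the server can push messages to clients on its own initiative
    public static Map<User, ChannelHandlerContext> nameServerChannels = new ConcurrentHashMap<>(10);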

    // Look up the full nameServerChannels record by ip + port
    public static UserChannelContext getNameServerChannelsByAddress(SocketAddress socketAddress) {
        if (socketAddress == null) {
            return null;
        }
        UserChannelContext result = new UserChannelContext();
        Set<Map.Entry<User, ChannelHandlerContext>> entries = nameServerChannels.entrySet();
        for (Map.Entry<User, ChannelHandlerContext> entry : entries) {
            if (entry.getKey().getSocketAddress().toString().equals(socketAddress.toString())) {
                result.setChannelHandlerContext(entry.getValue());
                result.setUserName(entry.getKey().getUserName());
                result.setSocketAddress(entry.getKey().getSocketAddress());
                return result;
            }
        }
        return null;
    }

    // Look up the full nameServerChannels record by user name
    public static UserChannelContext getNameServerChannelsByUserName(String userName) {
        if (StringUtils.isBlank(userName)) {
            return null;
        }
        UserChannelContext result = new UserChannelContext();
        Set<Map.Entry<User, ChannelHandlerContext>> entries = nameServerChannels.entrySet();
        for (Map.Entry<User, ChannelHandlerContext> entry : entries) {
            if (entry.getKey().getUserName().equals(userName)) {
                result.setChannelHandlerContext(entry.getValue());
                result.setUserName(entry.getKey().getUserName());
                result.setSocketAddress(entry.getKey().getSocketAddress());
                return result;
            }
        }
        return null;
    }

    // Look up the full nameChannels record by ip + port
    public static UserChannel getNameChannelsByAddress(SocketAddress socketAddress) {
        if (socketAddress == null) {
            return null;
        }
        UserChannel result = new UserChannel();
        Set<Map.Entry<User, Channel>> entries = nameChannels.entrySet();
        for (Map.Entry<User, Channel> entry : entries) {
            if (entry.getKey().getSocketAddress().toString().equals(socketAddress.toString())) {
                result.setChannel(entry.getValue());
                result.setUserName(entry.getKey().getUserName());
                result.setSocketAddress(entry.getKey().getSocketAddress());
                return result;
            }
        }
        return null;
    }

    // Look up the full nameChannels record by user name
    public static UserChannel getNameChannelsByUserName(String userName) {
        if (StringUtils.isBlank(userName)) {
            return null;
        }
        UserChannel result = new UserChannel();
        Set<Map.Entry<User, Channel>> entries = nameChannels.entrySet();
        for (Map.Entry<User, Channel> entry : entries) {
            if (entry.getKey().getUserName().equals(userName)) {
                result.setChannel(entry.getValue());
                result.setUserName(entry.getKey().getUserName());
                result.setSocketAddress(entry.getKey().getSocketAddress());
                return result;
            }
        }
        return null;
    }
}
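
The nameChannels map is pruned by the /clientclear endpoint through entrySet().removeIf(...). The sketch below isolates that pattern on a ConcurrentHashMap, whose entry-set view supports removal while other threads use the map. PruneDemo and pruneInactive are hypothetical names, and a Boolean stands in for channel.isActive() so that the example runs without Netty.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class; the Boolean value stands in for Channel.isActive().
final class PruneDemo {
    private PruneDemo() {
    }

    /** Removes every entry whose value is false and returns how many entries were removed. */
    static int pruneInactive(Map<String, Boolean> activeByName) {
        int before = activeByName.size();
        activeByName.entrySet().removeIf(entry -> !entry.getValue());
        return before - activeByName.size();
    }

    public static void main(String[] args) {
        Map<String, Boolean> clients = new ConcurrentHashMap<>();
        clients.put("alice", true);
        clients.put("bob", false);
        System.out.println("removed " + pruneInactive(clients) + ", remaining " + clients.keySet());
    }
}
```

The size difference is only a reliable count when no other thread modifies the map during the call, which holds for this single-threaded demo.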

nettyhandler

GroupChatClientHandler:

package com.chat.nettyhandler;


import com.chat.manager.User;
import com.chat.manager.UserManager;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

import java.net.SocketAddress;

@Slf4j
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {

    private String userName;

    private SocketAddress socketAddress;

    // Called second, after handlerAdded: bind user name + local address to this client channel
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        if (ctx == null || ctx.channel() == null ){
            return;
        }
        super.channelRegistered(ctx);
        User user = new User();
        user.setUserName(userName);
        user.setSocketAddress(socketAddress);
        UserManager.nameChannels.putIfAbsent(user, ctx.channel());
    }

    // Called first, when this handler is added to the pipeline
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        super.handlerAdded(ctx);

    }


    public GroupChatClientHandler(String userName, SocketAddress socketAddress){
        this.userName = userName;
        this.socketAddress = socketAddress;
    }

    // Data received from the server
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        log.info("[client handler] --->  {} received message: {}", this.userName, msg);
    }
}

GroupChatServerHandler:

package com.chat.nettyhandler;



import com.chat.manager.User;
import com.chat.manager.UserManager;
import com.chat.po.UserChannel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;


@Slf4j
public class GroupChatServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        super.channelWritabilityChanged(ctx);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    }

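    // Connection dropped: the place to push "xx has left" to the clients still online (not implemented in this demo)
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

    }

    // The channel is active: report that xx is online
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        if (ctx == null || ctx.channel() == null){
            return;
        }
        UserChannel userChannel = UserManager.getNameChannelsByAddress(ctx.channel().remoteAddress());
        if (userChannel == null) {
            // No client has registered this remote address in UserManager.nameChannels
            log.warn("[server handler] --->  unknown client {} connected", ctx.channel().remoteAddress());
            return;
        }
        log.info("[server handler] --->  {} is online", userChannel.getUserName());
        // Once the client is online, store its ChannelHandlerContext in UserManager
        User user = new User();
        user.setUserName(userChannel.getUserName());
        user.setSocketAddress(userChannel.getSocketAddress());
        UserManager.nameServerChannels.putIfAbsent(user, ctx);
    }


    // The channel is inactive: report that xx is offline
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (ctx == null || ctx.channel() == null){
            return;
        }
        UserChannel userChannel = UserManager.getNameChannelsByAddress(ctx.channel().remoteAddress());
        // The entry may already have been removed (for example by /clientstop), so guard against null
        String name = userChannel == null
                ? String.valueOf(ctx.channel().remoteAddress()) : userChannel.getUserName();
        log.info("[server handler] --->  {} is offline", name);
    }

    // The server received a message from a client
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (ctx == null || ctx.channel() == null){
            return;
        }
        UserChannel userChannel = UserManager.getNameChannelsByAddress(ctx.channel().remoteAddress());
        String name = userChannel == null
                ? String.valueOf(ctx.channel().remoteAddress()) : userChannel.getUserName();
        // Group chat: the server accepts the message and does not forward it to other clients
        log.info("[server handler] --->  {} says: {}", name, msg.toString());
        // After receiving a client message, reply to that client
        ctx.writeAndFlush("This is the server, I have received your message!");
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Log the error and close the connection instead of swallowing the exception silently
        log.error("[server handler] --->  exception on channel {}", ctx.channel().remoteAddress(), cause);
        ctx.close();
    }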
}

po

UserChannel:

package com.chat.po;

import io.netty.channel.Channel;
import lombok.Data;

import java.net.SocketAddress;



@Data
public class UserChannel {
    private String userName;
    private SocketAddress socketAddress;
    private Channel channel;
}

UserChannelContext:

package com.chat.po;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.Data;

import java.net.SocketAddress;


@Data
public class UserChannelContext {
    private String userName;
    private SocketAddress socketAddress;
    private ChannelHandlerContext channelHandlerContext;
}

request

ClientStartRequest:

package com.chat.request;

import lombok.Data;

import java.util.List;


@Data
public class ClientStartRequest {
    private String names;
    private Integer clientPort;
    private String host;
}

ClientStopRequest:

package com.chat.request;

import lombok.Data;


@Data
public class ClientStopRequest {
    private String name;
}

SendMsgRequest:

package com.chat.request;

import lombok.Data;


@Data
public class SendMsgRequest {
    private String userName;
    private String userId;
    private String words;
}

SendToAllRequest:

package com.chat.request;

import lombok.Data;


@Data
public class SendToAllRequest {
    private String words;
}

service

ClientServiceImpl:

package com.chat.service.impl;

import com.chat.nettyhandler.GroupChatClientHandler;
import com.chat.service.ClientService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.stereotype.Service;

import java.net.SocketAddress;


@Service
public class ClientServiceImpl implements ClientService {
    @Override
    public String clientStart(String host, int serverPort, String userName, SocketAddress socketAddress) {
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap()
                    // Bind the client to a fixed local address so the server can recognise it by remote address
                    .localAddress(socketAddress)
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // Get the pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            // Add the String decoder and encoder
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());

                            // Add the custom handler
                            pipeline.addLast(new GroupChatClientHandler(userName, socketAddress));

                        }
                    });
            // Block until the connection attempt completes; a failed connect throws here
            bootstrap.connect(host, serverPort).sync();

            return "Client started..." + userName;

        } catch (Exception e){
            // Release the event loop threads when the connection could not be established
            group.shutdownGracefully();
            return "Startup failed..." + e.getMessage();
        }
    }
}

ServerServiceImpl:

package com.chat.service.impl;

import com.chat.nettyhandler.GroupChatServerHandler;
import com.chat.service.ServerService;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.stereotype.Service;


@Service
public class ServerServiceImpl implements ServerService {
    @Override
    public void serverStart(Integer port) {
        // Create the two event loop groups: boss accepts connections, worker handles I/O
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // defaults to 2 x available processors, e.g. 8 on a 4-core machine

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            // Get the pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            // Add the decoder to the pipeline
                            pipeline.addLast("decoder", new StringDecoder());
                            // Add the encoder to the pipeline
                            pipeline.addLast("encoder", new StringEncoder());
                            // Add our own business handler
                            pipeline.addLast(new GroupChatServerHandler());

                        }
                    });
            // Wait for the bind to finish so that a failure (for example, port already in use) reaches the catch block
            serverBootstrap.bind(port).sync();


        }catch (Exception e){
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            System.out.println("Netty server failed to start  error:"+e.getMessage());
        }
    }
}

ClientService:

package com.chat.service;

import java.net.SocketAddress;


public interface ClientService {
    String clientStart(String host, int port, String userName, SocketAddress clientPort);
}

ServerService:

package com.chat.service;


public interface ServerService {
    void serverStart(Integer port);
}

utils

ThreadPoolUtil:

package com.chat.utils;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

// Thread pool utility
public class ThreadPoolUtil {
    public final static ExecutorService NETTY_POOL = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors() * 5,
            Runtime.getRuntime().availableProcessors() * 20,
            10L,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1024), new ThreadFactoryBuilder()
            .setNameFormat("netty-user-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy());
}
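
ThreadPoolUtil uses Guava's ThreadFactoryBuilder only to give the pool threads readable names. If pulling in Guava for that is not desired, roughly the same effect can be had with the JDK alone. The following is a sketch under that assumption: NamedPool, namedFactory and newPool are hypothetical names, and numbering the threads from 0 is a choice made in this sketch.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only alternative to the Guava-based factory in ThreadPoolUtil.
final class NamedPool {
    private NamedPool() {
    }

    /** Returns a factory whose threads are named prefix + 0, prefix + 1, and so on. */
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> new Thread(runnable, prefix + counter.getAndIncrement());
    }

    /** Builds a pool with the same sizing, queue and rejection policy as ThreadPoolUtil.NETTY_POOL. */
    static ExecutorService newPool(String prefix) {
        int cores = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                cores * 5,
                cores * 20,
                10L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1024),
                namedFactory(prefix),
                new ThreadPoolExecutor.AbortPolicy());
    }
}
```

Calling NamedPool.newPool("netty-user-pool-") would then stand in for the NETTY_POOL constant.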

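Postman API tests

I won't paste these; the parameters are all quite simple.
The correct order is:
Client sends a message for the server to receive: serverstart -> clientstart -> sendmsg
Server pushes a message to all online clients: serverstart -> clientstart -> sendtoall
List all online clients: getchannels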
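
For anyone who prefers code to Postman, the server-push sequence above can be sketched with the JDK's java.net.http API (Java 11+). Several things here are assumptions: the base URL http://localhost:8080 (Spring Boot's default port), the sample values alice, 9001 and hello, and the names ChatApiRequests, get, post and serverPushScenario. The JSON field names come from ClientStartRequest and SendToAllRequest. The sketch only builds the requests and does not send them.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;

// Hypothetical request builder; base URL and sample values are assumptions.
final class ChatApiRequests {
    static final String BASE = "http://localhost:8080";

    private ChatApiRequests() {
    }

    static HttpRequest get(String path) {
        return HttpRequest.newBuilder(URI.create(BASE + path)).GET().build();
    }

    static HttpRequest post(String path, String jsonBody) {
        return HttpRequest.newBuilder(URI.create(BASE + path))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    /** The three calls, in order, for "server pushes a message to all online clients". */
    static List<HttpRequest> serverPushScenario() {
        return List.of(
                get("/netty/server/serverstart"),
                post("/netty/client/clientstart",
                        "{\"names\":\"alice\",\"clientPort\":9001,\"host\":\"127.0.0.1\"}"),
                post("/netty/server/sendtoall", "{\"words\":\"hello\"}"));
    }
}
```

Each request can be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) once the Spring Boot application is running.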

Sharing is welcome; when reposting, please credit the source: 内存溢出

Original URL: http://outofmemory.cn/zaji/5693029.html
