【JAVA】如何基于Netty实现简单的RPC 框架

【JAVA】如何基于Netty实现简单的RPC 框架,第1张

如何基于Netty实现简单的RPC 框架

1. 项目模块与依赖

common 模块依赖



    
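<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>myRPC</artifactId>
        <groupId>com.sgg</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>common</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- netty -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>
        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.80</version>
        </dependency>
        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
    </dependencies>
</project>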

rpc-client模块依赖



    
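<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>myRPC</artifactId>
        <groupId>com.sgg</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rpc-client</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.sgg</groupId>
            <artifactId>common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
</project>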

rpc-server 模块依赖



    
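<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>myRPC</artifactId>
        <groupId>com.sgg</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rpc-server</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.sgg</groupId>
            <artifactId>common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

        <!-- spring -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
    </dependencies>
</project>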


myRPC



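<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sgg</groupId>
    <artifactId>myRPC</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>common</module>
        <module>rpc-client</module>
        <module>rpc-server</module>
    </modules>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.4.RELEASE</version>
    </parent>
</project>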



2. common 通用模块

2.1 RpcRequest
package com.sgg.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Describes a single remote method invocation. The client proxy fills one of
 * these in, serializes it to JSON and sends it to the server, which uses the
 * fields to locate the target bean and method.
 *
 * <p>Lombok generates the accessors and constructors, so the field names and
 * their declaration order are part of this class's API and wire format.
 *
 * @author sz
 * Created 2022/5/6 21:54
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RpcRequest {

    /**
     * Fully qualified name of the interface that declares the invoked method.
     */
    private String className;


    /**
     * Name of the method to invoke.
     */
    private String methodName;

    /**
     * Declared parameter types of the method, used to resolve overloads.
     * Typed as {@code Class<?>[]} rather than the raw {@code Class[]}.
     */
    private Class<?>[] parameterTypes;


    /**
     * Actual arguments of the call. The spelling is intentionally left as is:
     * renaming the field would change the generated accessor names and the
     * JSON property name that both sides already use.
     */
    private Object[] paramters;
}

2.2 RpcResponse
package com.sgg.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Envelope the server sends back for every RPC call, serialized as JSON.
 * Accessors and constructors are generated by Lombok from the fields below.
 *
 * @author sz
 * Created 2022/5/6 21:54
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RpcResponse {

    // Status code: the server handler in this project sets 200 on success and 400 on failure.
    private Integer code;
    // Result of the invocation; the client proxy parses this string as JSON.
    private String result;
    // Error description, set by the server when the invocation failed.
    private String error;

}

2.3 User
package com.sgg.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Simple user data object exchanged between the RPC client and server.
 * Accessors, {@code equals}/{@code hashCode}/{@code toString} and both
 * constructors are generated by Lombok.
 *
 * @author sz
 * Created 2022/5/6 21:55
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {

    // Identifier of the user; used as the lookup key by UserService#getUserById.
    private Integer id;
    // Display name of the user.
    private String name;

}

2.4 UserService
package com.sgg.service;

import com.sgg.pojo.User;

/**
 * Service contract shared by the RPC client and server. The client calls it
 * through a dynamic proxy; the server provides the real implementation.
 */
public interface UserService {
    /**
     * Looks up a user by id.
     *
     * @param id identifier of the user to fetch
     * @return the matching user; what is returned for an unknown id depends on
     *         the implementation (NOTE(review): the in-project implementation
     *         returns {@code null})
     */
    User getUserById(Integer id);
}

3. rpc-server 服务端模块

3.1 MyServiceRpc
package com.sgg.anno;


import java.lang.annotation.*;

/**
 * Marker annotation for classes that should be exposed as RPC services.
 * It is retained at runtime so that annotated beans can be discovered
 * reflectively (the server handler looks them up in the Spring context).
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MyServiceRpc {
}

3.2 MyServerHandler
package com.sgg.handler;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.sgg.anno.MyServiceRpc;
import com.sgg.pojo.RpcRequest;
import com.sgg.pojo.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.springframework.beans.BeansException;
import org.springframework.cglib.reflect.FastClass;
import org.springframework.cglib.reflect.FastMethod;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.lang.reflect.InvocationTargetException;
import java.util.*;

/**
 * Server-side Netty handler: receives a JSON-encoded {@link RpcRequest},
 * invokes the requested method on the matching service bean and writes a
 * JSON-encoded {@link RpcResponse} back.
 *
 * <p>The service registry is {@code static} on purpose: Spring populates it
 * through {@link #setApplicationContext} on the container-managed instance,
 * while the instances placed in channel pipelines are created with
 * {@code new} and therefore never see instance state set by Spring.
 *
 * <p>Limitation: messages are framed only by {@code StringDecoder}, so a
 * request split across several TCP reads is not reassembled here.
 *
 * @author sz
 * Created 2022/5/6 22:16
 */
@Component
public class MyServerHandler extends SimpleChannelInboundHandler<String> implements ApplicationContextAware {

    private static ApplicationContext app;
    /** Maps the fully qualified interface name to the bean implementing it. */
    private static final Map<String, Object> cache = new HashMap<>();

    /**
     * Registers every bean annotated with {@link MyServiceRpc} under the name
     * of each interface it directly implements.
     *
     * @param applicationContext the Spring context supplied by the container
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        app = applicationContext;
        // All beans in the container carrying @MyServiceRpc.
        Map<String, Object> beansWithAnnotation = app.getBeansWithAnnotation(MyServiceRpc.class);
        for (Object bean : beansWithAnnotation.values()) {
            // getInterfaces() never returns null; an empty array simply registers nothing.
            for (Class<?> inter : bean.getClass().getInterfaces()) {
                cache.put(inter.getName(), bean);
            }
        }
    }

    /** Logs the remote address of a newly connected client. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端连接 : "+ctx.channel().remoteAddress().toString().substring(1));
    }

    /**
     * Handles one request. A response is always written, with code 200 and
     * the JSON-serialized return value on success, or code 400 and an error
     * description on any failure, so the client is never left waiting.
     *
     * @param channelHandlerContext context used to write the response
     * @param json                  the JSON text of an {@link RpcRequest}
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String json) throws Exception {
        RpcResponse rpcResponse = new RpcResponse();

        try {
            Object result = invoke(JSONObject.parseObject(json, RpcRequest.class));
            rpcResponse.setCode(200);
            // Serialize instead of casting: the result is an arbitrary object
            // (e.g. a User) and the client parses this field as JSON.
            rpcResponse.setResult(JSON.toJSONString(result));
        } catch (InvocationTargetException e) {
            // The wrapper itself has no message; report what the service method threw.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            cause.printStackTrace();
            rpcResponse.setCode(400);
            rpcResponse.setError(describe(cause));
        } catch (Exception | NoSuchMethodError e) {
            // FastClass.getMethod reports an unknown method as NoSuchMethodError,
            // which would otherwise escape and leave the client without a reply.
            e.printStackTrace();
            rpcResponse.setCode(400);
            rpcResponse.setError(describe(e));
        }

        // Send the outcome back as a JSON string.
        channelHandlerContext.writeAndFlush(JSON.toJSONString(rpcResponse));
    }

    /** Resolves the target bean and method for the request and invokes it via cglib. */
    private static Object invoke(RpcRequest rpcRequest) throws InvocationTargetException {
        if (rpcRequest == null) {
            throw new IllegalArgumentException("empty RPC request");
        }
        String className = rpcRequest.getClassName();
        Object bean = cache.get(className);
        if (bean == null) {
            throw new IllegalArgumentException("no RPC service registered for " + className);
        }
        FastClass fastClass = FastClass.create(bean.getClass());
        FastMethod fastClassMethod =
                fastClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
        return fastClassMethod.invoke(bean, rpcRequest.getParamters());
    }

    /** Returns the throwable's message, or its string form when it has none. */
    private static String describe(Throwable t) {
        return t.getMessage() != null ? t.getMessage() : t.toString();
    }

}

3.3 ServerProvider
package com.sgg.provider;

import com.sgg.handler.MyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.stereotype.Service;

import java.io.Closeable;
import java.io.IOException;
import java.net.ServerSocket;

/**
 * Boots the Netty server that accepts RPC connections and owns its event
 * loop groups.
 *
 * <p>{@link #start} blocks until the server channel is closed, so it is meant
 * to run on its own thread. {@link #close} may be called from another thread
 * (e.g. on container shutdown); the group fields are therefore
 * {@code volatile}.
 *
 * @author sz
 * Created 2022/5/6 22:07
 */
@Service
public class ServerProvider implements Closeable {

    private volatile NioEventLoopGroup boss ;
    private volatile NioEventLoopGroup work ;

    /**
     * Binds the server and blocks until its channel is closed. The event loop
     * groups are released on every exit path: bind failure, interruption or
     * normal channel close.
     *
     * @param ip   local address to bind to
     * @param port local port to bind to
     */
    public void start(String ip,Integer port)  {
        // One acceptor thread; worker group uses Netty's default size (2 x CPUs).
        boss = new NioEventLoopGroup(1);
        work = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // String codec for the JSON payloads.
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            // Business handler: one instance per connection.
                            pipeline.addLast(new MyServerHandler());
                        }
                    });

            ChannelFuture channelFuture = serverBootstrap.bind(ip, port).sync();
            System.out.println(">>>>>>>服务器启动成功<<<<<<<<");
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
        } finally {
            shutdownGroups();
        }
    }


    /**
     * Shuts the server down. Closing the groups also closes the server
     * channel, which unblocks a thread waiting in {@link #start}.
     */
    @Override
    public void close() throws IOException {
        System.out.println("容器关闭我被调用了");
        shutdownGroups();
    }

    /** Gracefully shuts down both event loop groups; safe to call repeatedly. */
    private void shutdownGroups() {
        NioEventLoopGroup bossGroup = boss;
        NioEventLoopGroup workGroup = work;
        if (null != bossGroup) {
            bossGroup.shutdownGracefully();
        }
        // The original tested `boss` here, so `work` was guarded by the wrong field.
        if (null != workGroup) {
            workGroup.shutdownGracefully();
        }
    }
}


3.4 UserServiceImpl
package com.sgg.service.impl;

import com.sgg.anno.MyServiceRpc;
import com.sgg.pojo.User;
import com.sgg.service.UserService;
import org.springframework.stereotype.Service;

import java.util.HashMap;

/**
 * In-memory {@link UserService} exposed over RPC via {@link MyServiceRpc}.
 * The user table is fixed at class-initialization time and only read
 * afterwards.
 *
 * @author sz
 * Created 2022/5/6 22:18
 */
@MyServiceRpc
@Service
public class UserServiceImpl implements UserService {

    /** Demo data keyed by user id; typed so lookups return {@link User} without a cast. */
    private static final HashMap<Integer, User> map = new HashMap<>();

    static {
        map.put(1,new User(1,"张三"));
        map.put(2,new User(2,"李四"));
    }

    /**
     * Returns the user stored under the given id.
     *
     * @param id user id to look up
     * @return the user, or {@code null} when no user has that id
     */
    @Override
    public User getUserById(Integer id) {
        return map.get(id);
    }

}

3.5 ServerApp
package com.sgg;

import com.sgg.provider.ServerProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point of the RPC server. Once the application context is
 * ready, the Netty server is launched on a separate thread because
 * {@code ServerProvider.start} blocks.
 *
 * @author sz
 * Created 2022/5/6 22:04
 */
@SpringBootApplication
public class ServerApp implements CommandLineRunner {

    @Autowired
    private ServerProvider serverProvider;

    /** Boots the Spring application. */
    public static void main(String[] args) {
        SpringApplication.run(ServerApp.class, args);
    }

    /**
     * Starts the RPC server on 127.0.0.1:9999 in a new thread.
     *
     * @param args command line arguments (unused)
     */
    @Override
    public void run(String... args) throws Exception {
        Runnable serverLoop = () -> serverProvider.start("127.0.0.1", 9999);
        Thread serverThread = new Thread(serverLoop);
        serverThread.start();
    }
}

4. rpc-client 客户端模块

4.1 RpcClient
package com.sgg.client;

import com.sgg.handler.MyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Netty client for one RPC server connection. It connects on construction,
 * sends a JSON request string through {@link #sendMess} and blocks until the
 * reply arrives.
 *
 * <p>The Lombok annotations and the field order are kept unchanged because
 * they define the generated constructors and accessors. The single shared
 * {@link MyClientHandler} holds one request at a time, so an instance must
 * not be used for concurrent calls.
 *
 * @author sz
 * Created 2022/5/6 22:54
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RpcClient {

    private String ip;
    private Integer port;

    /**
     * Creates a client and immediately connects to the given server.
     *
     * @param ip   server address
     * @param port server port
     */
    public RpcClient(String ip, Integer port) {
        this.ip = ip;
        this.port = port;
        init();
    }

    private NioEventLoopGroup eventLoopGroup;
    private Channel channel;
    private MyClientHandler myClientHandler = new MyClientHandler();
    private ExecutorService executorService = Executors.newCachedThreadPool();

    /**
     * Sends one request and waits for the server's reply.
     *
     * @param message JSON text of the request
     * @return the raw reply produced by the handler
     * @throws IllegalStateException if the client is not connected (for
     *                               example because {@link #init} failed)
     * @throws ExecutionException    if sending or receiving failed
     * @throws InterruptedException  if the calling thread is interrupted while waiting
     */
    public Object sendMess(String message) throws ExecutionException, InterruptedException {
        // Fail fast with a clear message instead of an NPE deep inside the worker thread.
        if (null == channel || !channel.isActive()) {
            throw new IllegalStateException("RPC channel to " + ip + ":" + port + " is not connected");
        }
        myClientHandler.setRequestMsg(message);
        Future<?> submit = executorService.submit(myClientHandler);
        return submit.get();
    }

    /**
     * Creates the event loop group and connects to {@code ip:port}. A failed
     * connect is logged and the Netty resources are released; the client is
     * then unusable and {@link #sendMess} reports that.
     */
   public void init() {
        eventLoopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            // Business handler shared with sendMess.
                            pipeline.addLast(myClientHandler);
                        }
                    });

            channel = bootstrap.connect(ip, port).sync().channel();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for the caller.
                Thread.currentThread().interrupt();
            }
            e.printStackTrace();
            if (null != channel) {
                channel.close();
            }
            if (null != eventLoopGroup) {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }

    /**
     * Releases the channel, the event loop group and the worker executor.
     * Without the executor shutdown its threads would outlive the client.
     */
    public void close() {
        if (null != channel) {
            channel.close();
        }
        if (null != eventLoopGroup) {
            eventLoopGroup.shutdownGracefully();
        }
        if (null != executorService) {
            executorService.shutdown();
        }
    }
}

4.2 MyClientHandler
package com.sgg.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.concurrent.Callable;

/**
 * Client-side Netty handler that doubles as a {@link Callable}: {@link #call}
 * writes the pending request and blocks until {@link #channelRead0} delivers
 * the reply.
 *
 * <p>All hand-over between the calling thread and the event loop goes through
 * this object's monitor. Only one request can be in flight per instance.
 *
 * <p>Limitation: the reply is whatever single string the decoder delivers; a
 * reply split across several TCP reads is not reassembled here.
 *
 * @author sz
 * Created 2022/5/6 23:04
 */
public class MyClientHandler extends SimpleChannelInboundHandler<String> implements Callable<Object> {

    private String requestMsg;
    private String responseMsg;

    /** Written on the event loop, read by the calling thread. */
    private volatile ChannelHandlerContext context;

    /**
     * Stores the request to send on the next {@link #call}.
     *
     * @param str JSON text of the request
     */
    public void setRequestMsg(String str) {
        this.requestMsg = str;
    }

    /**
     * Captures the context as soon as the handler joins the pipeline. Relying
     * on {@code channelActive} alone is racy because Netty completes the
     * connect future before it fires that event.
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        this.context = ctx;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.context = ctx;
    }

    /** Wakes a waiting caller so it does not block forever on a dropped connection. */
    @Override
    public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
        notifyAll();
        super.channelInactive(ctx);
    }

    /** Stores the reply and wakes the thread blocked in {@link #call}. */
    @Override
    protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, String str) throws Exception {
        this.responseMsg = str;
        notifyAll();
    }

    /**
     * Sends the pending request and waits for its reply.
     *
     * @return the reply text
     * @throws IllegalStateException if the channel is not connected, or is
     *                               closed before a reply arrives
     * @throws InterruptedException  if interrupted while waiting
     */
    @Override
    public synchronized Object call() throws Exception {
        ChannelHandlerContext ctx = this.context;
        if (ctx == null || !ctx.channel().isActive()) {
            throw new IllegalStateException("channel is not active, cannot send request");
        }
        // Drop any reply left over from a previous call.
        this.responseMsg = null;
        ctx.writeAndFlush(requestMsg);
        // Loop on the condition: wait() may return spuriously.
        while (responseMsg == null && ctx.channel().isActive()) {
            wait();
        }
        if (responseMsg == null) {
            throw new IllegalStateException("connection closed before a response was received");
        }
        return responseMsg;
    }
}

4.3 RpcProxy
package com.sgg.proxy;

import com.alibaba.fastjson.JSON;
import com.sgg.client.RpcClient;
import com.sgg.pojo.RpcRequest;
import com.sgg.pojo.RpcResponse;
import com.sgg.pojo.User;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
 * Factory for client-side RPC stubs: a JDK dynamic proxy that turns each
 * interface method call into an {@link RpcRequest}, sends it to the server
 * and converts the reply back into the method's return type.
 *
 * @author sz
 * Created 2022/5/6 22:46
 */
public class RpcProxy {

    /**
     * Creates a stub for the given service interface.
     *
     * <p>Each call opens a connection to 127.0.0.1:9999 and closes it again
     * when the call finishes. A failed call (non-200 reply) is logged to
     * {@code System.err} and yields {@code null}, as before.
     *
     * @param target the service interface to proxy
     * @return a proxy implementing {@code target}
     */
    public static Object createProxy(Class<?> target) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{target},
                (Object proxy, Method method, Object[] args) -> {

                    // toString/hashCode/equals describe the stub itself and must not go over the wire.
                    if (Object.class == method.getDeclaringClass()) {
                        return invokeObjectMethod(proxy, method, args, target);
                    }

                    RpcRequest rpcRequest = new RpcRequest();
                    rpcRequest.setClassName(method.getDeclaringClass().getName());
                    rpcRequest.setMethodName(method.getName());
                    rpcRequest.setParameterTypes(method.getParameterTypes());
                    rpcRequest.setParamters(args);

                    // Send the request; always release the client's threads afterwards,
                    // otherwise its non-daemon threads accumulate per call.
                    RpcClient rpcClient = new RpcClient("127.0.0.1", 9999);
                    String mess;
                    try {
                        mess = (String) rpcClient.sendMess(JSON.toJSONString(rpcRequest));
                    } finally {
                        rpcClient.close();
                    }

                    RpcResponse rpcResponse = JSON.parseObject(mess, RpcResponse.class);
                    // Null-safe comparison: the reply or its code may be missing.
                    if (null != rpcResponse && Integer.valueOf(200).equals(rpcResponse.getCode())) {
                        if (void.class == method.getReturnType()) {
                            return null;
                        }
                        // Decode into the declared return type instead of a hard-coded User.
                        return JSON.parseObject(rpcResponse.getResult(), method.getGenericReturnType());
                    }

                    if (null != rpcResponse) {
                        System.err.println("RPC call " + method.getName() + " failed: " + rpcResponse.getError());
                    }
                    return null;
                }
        );
    }

    /** Answers the {@code java.lang.Object} methods locally for a proxy instance. */
    private static Object invokeObjectMethod(Object proxy, Method method, Object[] args, Class<?> target) {
        switch (method.getName()) {
            case "equals":
                return proxy == args[0];
            case "hashCode":
                return System.identityHashCode(proxy);
            case "toString":
                return "RpcProxy(" + target.getName() + ")";
            default:
                throw new UnsupportedOperationException(method.getName());
        }
    }

}

4.4 ClientApp
package com.sgg;

import com.sgg.pojo.User;
import com.sgg.proxy.RpcProxy;
import com.sgg.service.UserService;

/**
 * Demo entry point of the RPC client: obtains a {@link UserService} stub and
 * prints the user fetched from the server.
 *
 * @author sz
 * Created 2022/5/6 22:44
 */

public class ClientApp {

    /**
     * Looks up the user with id 2 through the RPC proxy and prints it.
     *
     * @param args command line arguments (unused)
     */
    public static void main(String[] args) {
        Object stub = RpcProxy.createProxy(UserService.class);
        UserService userService = (UserService) stub;

        User found = userService.getUserById(2);
        System.out.println("userById = " + found);
    }

}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/874017.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-13
下一篇 2022-05-13

发表评论

登录后才能评论

评论列表(0条)

保存