第一部分首先实现简单的RPC远程通信,流程如下:
客户端调用接口的方法,通过代理将要调用的方法信息传输给服务端服务端通过socket监听,当接收到数据后,就创建一个线程去执行通过客户端传输过来的数据反射找到对应的方法,并执行获取到对应的数据将数据封装进response中返回给客户端客户端收到数据后打印。
因为是简单的实现,因此直接指定了服务端的地址,后续会进行优化完善。
让我们开始吧!
项目的整体模块如下:
myrpc
rpc-api:接口相关的类rpc-common:通用模块,例如服务端和消费端传输的RpcRequestrpc-core:项目的核心模块test-client:客户端相关的类test-server:服务端相关的类 定义接口
首先定义接口,也就客户端调用的接口
package com.lany.api; public interface HelloService { String hello(HelloObject helloObject); }
参数我们用的是对象,因此还需要创建HelloObject,同时需要实现Serializable接口,因为后面传输时需要接口类型,因此需要序列化,没有序列化会报错。
package com.lany.api; import lombok.AllArgsConstructor; import lombok.Data; import java.io.Serializable; @Data @AllArgsConstructor public class HelloObject implements Serializable { private Integer id; private String message; }传输对象
接着需要定义传输对象
定义该对象来让服务端唯一确定一个方法,因此需要的参数有接口的名字、方法的名字、参数列表以及参数的类型。
package com.lany.rpc.entity; import lombok.Builder; import lombok.Data; import java.io.Serializable; @Data @Builder public class RpcRequest implements Serializable { private String interfaceName; private String methodName; private Object[] parameters; private Class>[] paramTypes; }
服务端通过这个对象找到对应的方法后执行将结果封装进RpcResponse中返回给消费因此还需要一个RpcResponse来接收方法执行的结果,返回时成功还是失败。
package com.lany.rpc.entity; import com.lany.rpc.enumeration.ResponseCode; import lombok.Data; import java.io.Serializable; @Data public class RpcResponse动态代理implements Serializable { private Integer statusCode; private String message; private T data; public static RpcResponse success(T data) { RpcResponse response = new RpcResponse<>(); response.setStatusCode(ResponseCode.SUCCESS.getCode()); response.setData(data); return response; } public static RpcResponse fail(ResponseCode code) { RpcResponse response = new RpcResponse<>(); response.setStatusCode(code.getCode()); response.setMessage(code.getMessage()); return response; } }
消费端通过动态代理来将想要调用的数据传输给服务端,让消费端只管调用,而不用管具体的怎么实现,就像调用本地方法一样。这里使用jdk动态代理来实现。
package com.lany.rpc.client; import com.lany.rpc.entity.RpcRequest; import com.lany.rpc.entity.RpcResponse; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; public class RpcClientProxy implements InvocationHandler { private String host; private int port; public RpcClientProxy(String host, int port) { this.host = host; this.port = port; } @SuppressWarnings("unchecked") publicT getProxy(Class clazz) { return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this); } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RpcRequest rpcRequest = RpcRequest.builder() .interfaceName(method.getDeclaringClass().getName()) .methodName(method.getName()) .parameters(args) .paramTypes(method.getParameterTypes()) .build(); RpcClient rpcClient = new RpcClient(); return ((RpcResponse) rpcClient.sendRequest(rpcRequest, host, port)).getData(); } }
封装了一个getProxy()方法,来返回代理对象。在代理对象调用具体的方法的时候调用了invoke()方法来把消费端想要调用的方法数据传输给服务端并接受到服务端传回来的数据。
具体与服务端通信的逻辑在RpcClient类中的sendRequest方法中实现。
package com.lany.rpc.client; import com.lany.rpc.entity.RpcRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.Socket; public class RpcClient { private static final Logger logger = LoggerFactory.getLogger(RpcClient.class); public Object sendRequest(RpcRequest rpcRequest, String host, int port) { try (Socket socket = new Socket(host, port)) { ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream()); objectOutputStream.writeObject(rpcRequest); objectOutputStream.flush(); return objectInputStream.readObject(); } catch (IOException | ClassNotFoundException e) { logger.error("调用时有错误发生:" + e); return null; } } }反射调用
服务端通过反射进行调用对应的方法
服务端一直监听9000端口,当有请求连接时通过线程池创建线程让其执行通信的逻辑
package com.lany.rpc.server; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.*; public class RpcServer { private final ExecutorService threadPool; private static final Logger logger = LoggerFactory.getLogger(RpcServer.class); public RpcServer() { // 核心线程数 int corePoolSize = 5; // 最大线程数 int maximumPoolSize = 50; // 空闲线程的等待时间 long keepAliveTime = 60; // 阻塞队列 BlockingQueueworkingQueue = new ArrayBlockingQueue<>(100); // 线程工厂 ThreadFactory threadFactory = Executors.defaultThreadFactory(); threadPool = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workingQueue, threadFactory); } public void register(Object service, int port) { try (ServerSocket serverSocket = new ServerSocket(port)) { logger.info("服务器正在启动..."); Socket socket; while ((socket = serverSocket.accept()) != null) { logger.info("客户端连接!Ip为:" + socket.getInetAddress()); threadPool.execute(new WorkerThread(socket, service)); } } catch (IOException e) { logger.error("连接时有错误发生:", e); } } }
这里RpcServer咱叔只能注册一个接口,只能对外提供一个接口的调用方法,下一章会进行优化。
测试运行消费端测试代码:
package com.lany.test; import com.lany.api.HelloObject; import com.lany.api.HelloService; import com.lany.rpc.client.RpcClientProxy; public class TestClient { public static void main(String[] args) { RpcClientProxy rpcClientProxy = new RpcClientProxy("127.0.0.1", 9000); HelloService helloService = rpcClientProxy.getProxy(HelloService.class); HelloObject helloObject = new HelloObject(12, "this is a message"); String hello = helloService.hello(helloObject); System.out.println(hello); } }
服务端测试代码:
package com.lany.test; import com.lany.rpc.server.RpcServer; public class TestServer { public static void main(String[] args) { HelloServiceImpl helloService = new HelloServiceImpl(); RpcServer rpcServer = new RpcServer(); rpcServer.register(helloService, 9000); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)