第一章中我们的服务端测试代码中只能注册一个服务,这一章对其进行优化,可以注册多个服务。
服务注册表首先需要一个注册表来存放注册的服务,并且可以返回需要的服务实例。
package com.lany.rpc.registry; public interface RpcRegistry {void register(T service); Object getService(String serviceName); }
对该接口进行默认的实现。
package com.lany.rpc.registry; import com.lany.rpc.enumeration.RpcError; import com.lany.rpc.exception.RpcException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; public class DefaultRpcRegistry implements RpcRegistry { private static final Logger logger = LoggerFactory.getLogger(DefaultRpcRegistry.class); private static MapserviceMap = new ConcurrentHashMap<>(); private static Set registeredService = ConcurrentHashMap.newKeySet(); @Override public void register(T service) { // 获取到实例的名字 String serviceName = service.getClass().getCanonicalName(); // 如果该服务已经注册过了,直接返回,否则继续往下执行 if (registeredService.contains(serviceName)) { return; } registeredService.add(serviceName); // 获取到该服务实现的所有接口 Class>[] interfaces = service.getClass().getInterfaces(); // 处理异常,如果长度为0说明该服务没有实现任何接口 if (interfaces.length == 0) { throw new RpcException(RpcError.SERVICE_NOT_IMPLEMENT_ANY_INTERFACE); } // 将实现的接口名和该服务存入map中,如果一个服务实现了多个接口, // 则这几个接口对应的实现都为该服务。 for (Class> i : interfaces) { serviceMap.put(i.getCanonicalName(), service); } } @Override public Object getService(String serviceName) { Object service = serviceMap.get(serviceName); if (service == null) { throw new RpcException(RpcError.SERVICE_NOT_FOUND); } return service; } }
serviceMap用来存储注册进来的服务信息,registeredService用来存储已经注册过的服务,防止重复注册。因为serviceMap中存储的接口的名字,而不是实现类的名字,所以还需要一个set来记录存储过的实例信息。
同时因为服务端是通过接口调用的,因此这里将一个接口只对应一个实现类。
当通过名字获取实例时,直接将map中的value返回即可。
数据处理上面这些完成之后就需要对RpcServer进行修改,将注册中心与RpcServer进行关联,而不是第一章中的实例与RpcServer关联。
package com.lany.rpc.server; import com.lany.rpc.registry.RpcRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.imageio.spi.ServiceRegistry; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.*; public class RpcServer { private static final Logger logger = LoggerFactory.getLogger(RpcServer.class); private final ExecutorService threadPool; // 核心线程数 private static final int corePoolSize = 5; // 最大线程数 private static final int maximumPoolSize = 50; // 空闲线程的等待时间 private static final long keepAliveTime = 60; BlockingQueueworkingQueue = new ArrayBlockingQueue<>(100); ThreadFactory threadFactory = Executors.defaultThreadFactory(); private final RequestHandler requestHandler = new RequestHandler(); private final RpcRegistry rpcRegistry; public RpcServer(RpcRegistry rpcRegistry) { this.rpcRegistry = rpcRegistry; this.threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, workingQueue, threadFactory); } public void start(int port) { try (ServerSocket serverSocket = new ServerSocket(port)) { logger.info("服务器启动..."); Socket socket; while ((socket = serverSocket.accept()) != null) { logger.info("客户端连接!Ip为:{} port为:{}", socket.getInetAddress(), socket.getPort()); threadPool.execute(new RequestHandlerThread(socket, requestHandler, rpcRegistry)); } threadPool.shutdown(); } catch (IOException e) { logger.error("连接时有错误发生:", e); } } }
创建RequestHandler和RequestHandleThread分别用来执行对应的方法并返回结果和创建客户端的连接传输数据。
对应代码如下:
package com.lany.rpc.server; import com.lany.rpc.entity.RpcRequest; import com.lany.rpc.entity.RpcResponse; import com.lany.rpc.registry.RpcRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.imageio.spi.ServiceRegistry; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.Socket; public class RequestHandlerThread implements Runnable { private static final Logger logger = LoggerFactory.getLogger(RequestHandlerThread.class); private Socket socket; private RequestHandler requestHandler; private RpcRegistry rpcRegistry; public RequestHandlerThread(Socket socket, RequestHandler requestHandler, RpcRegistry rpcRegistry) { this.socket = socket; this.requestHandler = requestHandler; this.rpcRegistry = rpcRegistry; } @Override public void run() { try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream()); ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) { RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject(); Object service = rpcRegistry.getService(rpcRequest.getInterfaceName()); // 这里处理数据交给了处理器去执行 Object result = requestHandler.handle(rpcRequest, service); // 将返回的数据封装进RpcResponse中写入到缓冲区中 objectOutputStream.writeObject(result); // 将缓冲区数据输入都流中 objectOutputStream.flush(); } catch (IOException | ClassNotFoundException e) { logger.error("调用或发送时有错误发生:", e); } } }
package com.lany.rpc.server; import com.lany.rpc.entity.RpcRequest; import com.lany.rpc.entity.RpcResponse; import com.lany.rpc.enumeration.ResponseCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; public class RequestHandler { private static final Logger logger = LoggerFactory.getLogger(RequestHandler.class); public Object handle(RpcRequest rpcRequest, Object service) { Object result = null; try { result = invokeTargetMethod(rpcRequest, service); logger.info("服务:{} 成功调用方法:{}", rpcRequest.getInterfaceName(), rpcRequest.getMethodName()); } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { logger.error("调用或发送时有错误发生:", e); } return result; } private Object invokeTargetMethod(RpcRequest rpcRequest, Object service) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { Method method = null; try { method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes()); } catch (NoSuchMethodException | SecurityException e) { return RpcResponse.fail(ResponseCode.METHOD_NOT_FOUND); } return RpcResponse.success(method.invoke(service, rpcRequest.getParameters())); } }测试运行
客户端的测试代码没有变化,主要就是服务端的测试修改了。
package com.lany.test; import com.lany.rpc.registry.DefaultRpcRegistry; import com.lany.rpc.registry.RpcRegistry; import com.lany.rpc.server.RpcServer; public class TestServer { public static void main(String[] args) { HelloServiceImpl helloService = new HelloServiceImpl(); HelloServiceImpl2 helloService2 = new HelloServiceImpl2(); RpcRegistry rpcRegistry = new DefaultRpcRegistry(); rpcRegistry.register(helloService); // 这里将service2注册后将helloservice的给覆盖了,导致每次都客户端都调用的是helloservice2 rpcRegistry.register(helloService2); RpcServer rpcServer = new RpcServer(rpcRegistry); rpcServer.start(9000); } }
testServer:
[main] INFO com.lany.rpc.server.RpcServer - 服务器启动... [main] INFO com.lany.rpc.server.RpcServer - 客户端连接!Ip为:/127.0.0.1 port为:53812 [pool-1-thread-1] INFO com.lany.test.HelloServiceImpl2 - 接收到2:this is a message [pool-1-thread-1] INFO com.lany.rpc.server.RequestHandler - 服务:com.lany.api.HelloService 成功调用方法:hello
testClient:
这时调用的返回值2,id2=12
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)