RPC框架实战之手写RPC框架 第二章

RPC框架实战之手写RPC框架 第二章,第1张

RPC框架实战之手写RPC框架 第二章 第二章

第一章中我们的服务端测试代码中只能注册一个服务,这一章对其进行优化,可以注册多个服务。

服务注册表

首先需要一个注册表来存放注册的服务,并且可以返回需要的服务实例。

package com.lany.rpc.registry;

public interface RpcRegistry {

     void register(T service);

    Object getService(String serviceName);
}

对该接口进行默认的实现。

package com.lany.rpc.registry;

import com.lany.rpc.enumeration.RpcError;
import com.lany.rpc.exception.RpcException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;


public class DefaultRpcRegistry implements RpcRegistry {
    private static final Logger logger = LoggerFactory.getLogger(DefaultRpcRegistry.class);

    
    private static Map serviceMap = new ConcurrentHashMap<>();
    
    private static Set registeredService = ConcurrentHashMap.newKeySet();

    
    @Override
    public  void register(T service) {
        // 获取到实例的名字
        String serviceName = service.getClass().getCanonicalName();
        // 如果该服务已经注册过了,直接返回,否则继续往下执行
        if (registeredService.contains(serviceName)) {
            return;
        }
        registeredService.add(serviceName);
        // 获取到该服务实现的所有接口
        Class[] interfaces = service.getClass().getInterfaces();
        // 处理异常,如果长度为0说明该服务没有实现任何接口
        if (interfaces.length == 0) {
            throw new RpcException(RpcError.SERVICE_NOT_IMPLEMENT_ANY_INTERFACE);
        }
        // 将实现的接口名和该服务存入map中,如果一个服务实现了多个接口,
        // 则这几个接口对应的实现都为该服务。
        for (Class i : interfaces) {
            serviceMap.put(i.getCanonicalName(), service);
        }

    }

    @Override
    public Object getService(String serviceName) {
        Object service = serviceMap.get(serviceName);
        if (service == null) {
            throw new RpcException(RpcError.SERVICE_NOT_FOUND);
        }
        return service;
    }
}

serviceMap用来存储注册进来的服务信息,registeredService用来存储已经注册过的服务,防止重复注册。因为serviceMap中存储的接口的名字,而不是实现类的名字,所以还需要一个set来记录存储过的实例信息。

同时因为服务端是通过接口调用的,因此这里将一个接口只对应一个实现类。

当通过名字获取实例时,直接将map中的value返回即可。

数据处理

上面这些完成之后就需要对RpcServer进行修改,将注册中心与RpcServer进行关联,而不是第一章中的实例与RpcServer关联。

package com.lany.rpc.server;

import com.lany.rpc.registry.RpcRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.imageio.spi.ServiceRegistry;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.*;


public class RpcServer {

    private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);

    private final ExecutorService threadPool;
    // 核心线程数
    private static final int corePoolSize = 5;
    // 最大线程数
    private static final int maximumPoolSize = 50;
    // 空闲线程的等待时间
    private static final long keepAliveTime = 60;
    
    BlockingQueue workingQueue = new ArrayBlockingQueue<>(100);
    
    ThreadFactory threadFactory = Executors.defaultThreadFactory();

    private final RequestHandler requestHandler = new RequestHandler();
    private final RpcRegistry rpcRegistry;

    
    public RpcServer(RpcRegistry rpcRegistry) {
        this.rpcRegistry = rpcRegistry;
        this.threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
                TimeUnit.SECONDS, workingQueue, threadFactory);
    }

    
    public void start(int port) {
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            logger.info("服务器启动...");
            Socket socket;
            while ((socket = serverSocket.accept()) != null) {
                logger.info("客户端连接!Ip为:{} port为:{}", socket.getInetAddress(), socket.getPort());
                threadPool.execute(new RequestHandlerThread(socket, requestHandler, rpcRegistry));
            }
            threadPool.shutdown();
        } catch (IOException e) {
            logger.error("连接时有错误发生:", e);
        }
    }
}

创建RequestHandler和RequestHandleThread分别用来执行对应的方法并返回结果和创建客户端的连接传输数据。

对应代码如下:

package com.lany.rpc.server;

import com.lany.rpc.entity.RpcRequest;
import com.lany.rpc.entity.RpcResponse;
import com.lany.rpc.registry.RpcRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.imageio.spi.ServiceRegistry;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;


public class RequestHandlerThread implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(RequestHandlerThread.class);

    private Socket socket;
    private RequestHandler requestHandler;
    private RpcRegistry rpcRegistry;

    public RequestHandlerThread(Socket socket, RequestHandler requestHandler, RpcRegistry rpcRegistry) {
        this.socket = socket;
        this.requestHandler = requestHandler;
        this.rpcRegistry = rpcRegistry;
    }

    @Override
    public void run() {
        try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
             ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
            RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
            Object service = rpcRegistry.getService(rpcRequest.getInterfaceName());
            // 这里处理数据交给了处理器去执行
            Object result = requestHandler.handle(rpcRequest, service);
            // 将返回的数据封装进RpcResponse中写入到缓冲区中
            objectOutputStream.writeObject(result);
            // 将缓冲区数据输入都流中
            objectOutputStream.flush();
        } catch (IOException | ClassNotFoundException e) {
            logger.error("调用或发送时有错误发生:", e);
        }


    }
}
package com.lany.rpc.server;

import com.lany.rpc.entity.RpcRequest;
import com.lany.rpc.entity.RpcResponse;
import com.lany.rpc.enumeration.ResponseCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;


public class RequestHandler {
    private static final Logger logger = LoggerFactory.getLogger(RequestHandler.class);

    public Object handle(RpcRequest rpcRequest, Object service) {
        Object result = null;
        try {
            result = invokeTargetMethod(rpcRequest, service);
            logger.info("服务:{} 成功调用方法:{}", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
        } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
            logger.error("调用或发送时有错误发生:", e);
        }
        return result;
    }

    
    private Object invokeTargetMethod(RpcRequest rpcRequest, Object service) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        Method method = null;
        try {
            method = service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamTypes());
        } catch (NoSuchMethodException | SecurityException e) {
            return RpcResponse.fail(ResponseCode.METHOD_NOT_FOUND);
        }
        return RpcResponse.success(method.invoke(service, rpcRequest.getParameters()));
    }
}
测试运行

客户端的测试代码没有变化,主要就是服务端的测试修改了。

package com.lany.test;

import com.lany.rpc.registry.DefaultRpcRegistry;
import com.lany.rpc.registry.RpcRegistry;
import com.lany.rpc.server.RpcServer;


public class TestServer {

    public static void main(String[] args) {
        HelloServiceImpl helloService = new HelloServiceImpl();
        HelloServiceImpl2 helloService2 = new HelloServiceImpl2();
        RpcRegistry rpcRegistry = new DefaultRpcRegistry();
        rpcRegistry.register(helloService);
        // 这里将service2注册后将helloservice的给覆盖了,导致每次都客户端都调用的是helloservice2
        rpcRegistry.register(helloService2);
        RpcServer rpcServer = new RpcServer(rpcRegistry);
        rpcServer.start(9000);

    }
}

testServer:

[main] INFO com.lany.rpc.server.RpcServer - 服务器启动...
[main] INFO com.lany.rpc.server.RpcServer - 客户端连接!Ip为:/127.0.0.1 port为:53812
[pool-1-thread-1] INFO com.lany.test.HelloServiceImpl2 - 接收到2:this is a message
[pool-1-thread-1] INFO com.lany.rpc.server.RequestHandler - 服务:com.lany.api.HelloService 成功调用方法:hello

testClient:

这时调用的返回值2,id2=12

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5705433.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存