flink网络通信剖析(组件rpc通信)并基于akka实现rpc框架

flink网络通信剖析(组件rpc通信)并基于akka实现rpc框架,第1张

flink网络通信剖析(组件rpc通信)并基于akka实现rpc框架 背景

最近阅读了flink 1.10版本网络通信部分的相关源码,网络部分主要有两块:

  1. 组件之间的rpc调用(akka)
  2. taskManager之间的数据传输(netty实现)

本文主要解析flink底层是如何使用akka封装的rpc,并模仿flink的设计方式,基于akka实现一个简单的rpc通信框架。

组件RPC通信架构

flink底层的rpc调用主要分为RpcEndPoint、RpcService、RpcServer、GateWay三个模块。

  1. RpcEndPoint:每个组件的基本实现,主要包含endpointId用于唯一标记当前的rpc节点(既actorSystem中的actor对象)。RpcEndPoint借助RpcService启动内部的RpcServer,之后通过RpcServer处理本地和远程调用。
  2. RpcService:每个节点启动时,会根据配置文件的IP、Port等信息初始化一个RpcService对象(对应akka的ActorSystem对象)。可以根据endpoint的地址创建Akka Actor实例,并基于Actor创建RpcServer接口的动态代理类。
  3. RpcServer:节点内的各个组件通过本节点的Service启动一个Server(既actor对象,处理具体的rpc调用,通过AkkaInvokeHandler代理类实现)。所有远端和本地的请求都会转发到AkkaInvokeHandler代理类中执行,AkkaInvokeHandler代理类会判断是远端还是本地请求。如果是远端请求,代理类会使用内部的ActorRef对象将方法调用封装后发送给远端执行(通过ActorRef的tell、ask方法)。
  4. GateWay接口是暴露给外界的一个远程调用的接口,要使用rpc调用对方的方法时,需要先获得一个对方GateWay接口的一个代理对象。使用代理对象进行rpc调用时,会将请求根据响应协议进行封装(既封装方法名、返回值、参数类型信息),并使用AkkaInvokeHandler代理类中的actorRef将请求发生给对方的actor,由对方处理完后再接受返回。
基于akka的rpc实现

本节基于flink的rpc设计模式,基于akka实现了一个简单的rpc调用框架。所有的rpc调用都会通过AkkainvokeHanlder代理类生成一个对方RpcgateWay接口的动态类转化为远程rpc调用。

  • AkkainvokeHanlder:代理对象,网管调用会被代理类通过内部的AkkaRpcActor转化为tcp请求。
  • AkkaRpcActor:系统所有Actor的抽象实现,处理远端的rpc请求(解析出方法名、返回类型、参数值,然后反射调用对应endpoint的方法)
  • EndPoint相关类:具体的方法实现(提供远端和本地调用),同时兼具flink中RpcService的角色(作为ActorSystem启动endpoint的actorSystem,并注册对应的actor)。
  • GateWay包:定义了各个endpoint需要暴露的rpc方法
  • Message包:定义了使用的协议类,RpcInvokeMessage类封装所有的rpc调用的方法名、返回值、参数等信息。
AkkainvokeHanlder
@AllArgsConstructor
public class AkkaInvokeHanlder implements InvocationHandler {

    private ActorRef actorRef;//对方端点对应的actor引用,用于发生rpc请求(tell/ask)
    private long timeOut;//rpc调用超时时间

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Class[] parameterTypes = method.getParameterTypes();
        Object result = invokeRpc(method,args,parameterTypes);
        return result;
    }

    private Object invokeRpc(Method method, Object[] args, Class[] parameterTypes) {
        RpcInvokeMessage rpcInvokeMessage = new RpcInvokeMessage(method.getName(), args, parameterTypes);
        Class returnType = method.getReturnType();
        Object res = null;
        if (Objects.equals(returnType,Void.TYPE)) {
        	//没有返回值的调用,使用tell
            actorRef.tell(rpcInvokeMessage,ActorRef.noSender());
        }else {
        	//有返回值的调用,使用ask方法,获得调用后的异步对象。
            CompletableFuture completableFuture = Patterns.ask(actorRef, rpcInvokeMessage, Duration.ofSeconds(timeOut)).toCompletableFuture();
            if (Objects.equals(returnType,CompletableFuture.class)) {
                //异步方法调用,返回异步future
                res = completableFuture;
            }else {
                try {
                    //同步方法调用,阻塞获取结果
                    res = completableFuture.get(timeOut, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
        }
        return res;
    }


}
 
AkkaRpcActor 
@AllArgsConstructor
public class AkkaRpcActor extends AbstractActor {
    //actor rpc调用的时机对象
    private EndPoint endPoint;
    private String uid;

    @Override
    public Receive createReceive() {
        return receiveBuilder()
        		//actor只对rpc调用进行处理
                .match(RpcInvokeMessage.class,this::handleRpcInvoke)
                .build();
    }

    public void handleRpcInvoke(RpcInvokeMessage invokeMessage) {
        String method = invokeMessage.getMethod();
        Object[] args = invokeMessage.getArgs();
        Class[] parameterTypes = invokeMessage.getParameterTypes();
        try {
            Method rpcMethod = endPoint.getClass().getMethod(method, parameterTypes);
            Class returnType = rpcMethod.getReturnType();
            if (rpcMethod != null) {
                rpcMethod.setAccessible(true);
                if (Objects.equals(returnType,Void.TYPE)) {
                    //无返回的rpc调用
                    rpcMethod.invoke(endPoint,args);
                }else {
                    //有返回rpc调用
                    Object result = rpcMethod.invoke(endPoint, args);
                    sender().tell(result,getSelf());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
EndPoint

封装既获取远端的连接等各个端点的一些通用方法

public class EndPoint {
    public  RpcGateWay connect(String path, Class clazz, ActorSystem actorSystem) {
        ActorRef actorRef = actorSystem.actorFor(path);
        //代理类
        AkkaInvokeHanlder akkaInvokeHanlder = new AkkaInvokeHanlder(actorRef,2);
        ClassLoader classLoader = clazz.getClassLoader();
        //被代理接口(gateway网管接口)
        Class[] interfaces = {clazz};
        RpcGateWay rpcGateWay = (RpcGateWay) Proxy.newProxyInstance(classLoader, interfaces, akkaInvokeHanlder);
        return rpcGateWay;
    }
}
JobManager
@RequiredArgsConstructor
@Data
public class JobManager extends EndPoint implements JobManagerGateWay {
    @NonNull
    private Integer executors;
    @NonNull
    private String hostName;
    @NonNull
    private String port;
    private HashMap uidToTaskManager = new HashMap<>();
    private HashMap uidToTaskManagerGateWay = new HashMap<>();
    private static ActorSystem masterSystem;

    @Override
    public Integer getNumberOfExecutors() {
        return this.executors;
    }

    @Override
    public String getHostName() {
        return this.hostName;
    }

    @Override
    public String getPort() {
        return this.port;
    }

    @Override
    public void registerTaskManager(RegisterMassage registerMassage) {
        if (!uidToTaskManager.containsKey(registerMassage.getUid())) {
            //保存taskManager的注册信息
            uidToTaskManager.put(registerMassage.getUid(),registerMassage);
            this.executors++;
            //获取taskManager的网管对象
            String workerPath = "akka.tcp://WorkerSystem@" + registerMassage.getHostName() + ":" + registerMassage.getPort() + "/user/" + registerMassage.getUid();
            TaskManagerGateWay taskManagerGateWay = (TaskManagerGateWay) connect(workerPath,TaskManagerGateWay.class,masterSystem);
            uidToTaskManagerGateWay.put(registerMassage.getUid(),taskManagerGateWay);
            System.out.println(registerMassage.getHostName() + " 建立了连接, 当前executors数:" + this.executors);
            //向taskManager发生task
            TaskInfo taskInfo = taskManagerGateWay.sendTask(new Task(UUID.randomUUID().toString()));
            System.out.println("向executor["+ taskInfo.getExecutorID() +"]部署task  taskInfo["+ taskInfo.toString() +"]");
        }else {
            System.out.println(registerMassage.getHostName() + " 重复建立连接");
        }
    }

    public static void main(String[] args) {
        Integer executor = 0;
        String hostName = "192.168.1.10";
        String port = "8080";
        HashMap map = new HashMap<>();
        map.put("akka.actor.provider","akka.remote.RemoteActorRefProvider");
        map.put("akka.remote.netty.tcp.hostname",hostName);
        map.put("akka.remote.netty.tcp.port",port);
        Config config = ConfigFactory.parseMap(map);
        masterSystem = ActorSystem.create("MasterSystem", config);

        JobManager jobManager = new JobManager(executor, hostName, port);
        masterSystem.actorOf(Props.create(AkkaRpcActor.class, ()-> new AkkaRpcActor(jobManager,"Master")),"Master");
    }
}
JobManager
@Data
public class TaskManager extends EndPoint implements TaskManagerGateWay {
    private String uid;
    private Integer slots;
    private Integer memories;
    private String hostName;
    private String port;
    private String masterPath;
    private JobManagerGateWay jobManagerGateWay;
    private Integer runningTask;
    private static ActorSystem workerSystem;



    @Override
    public Integer getNumberOfSlots() {
        return this.slots;
    }

    @Override
    public Integer getMemories() {
        return this.memories;
    }

    @Override
    public String getHostName() {
        return this.hostName;
    }

    @Override
    public String getPort() {
        return this.port;
    }

    @Override
    public TaskInfo sendTask(Task task) {
        System.out.println("成功接收task " + task.getTaskID());
        TaskInfo taskInfo = new TaskInfo(task.getTaskID(), true, uid, slots, memories);
        return taskInfo;
    }

    public TaskManager(String uid, Integer slots, Integer memories, String hostName, String port, String masterPath) {
        this.uid = uid;
        this.slots = slots;
        this.memories = memories;
        this.hostName = hostName;
        this.port = port;
        this.masterPath = masterPath;

        //TaskManager启动时向JobManager注册
        //获得JobManagerGateWay的代理对象
        this.jobManagerGateWay = (JobManagerGateWay) connect(this.masterPath,JobManagerGateWay.class,workerSystem);
        //进行注册
        this.jobManagerGateWay.registerTaskManager(new RegisterMassage(uid,slots,memories,hostName,port));
    }

    public static void main(String[] args) {
        String uid = UUID.randomUUID().toString();
        Integer slots = 3;
        Integer memories = 5;
        String hostName = "192.168.119.1";
        String port = "8080";
        String masterPath = "akka.tcp://[email protected]:8080/user/Master";
        HashMap map = new HashMap<>();
        map.put("akka.actor.provider","akka.remote.RemoteActorRefProvider");
        map.put("akka.remote.netty.tcp.hostname",hostName);
        map.put("akka.remote.netty.tcp.port",port);
        Config config = ConfigFactory.parseMap(map);
        workerSystem = ActorSystem.create("WorkerSystem", config);
        TaskManager taskManager = new TaskManager(uid, slots, memories, hostName, port, masterPath);

        workerSystem.actorOf(Props.create(AkkaRpcActor.class, ()-> new AkkaRpcActor(taskManager,uid)),uid);


    }
}
RpcInvokeMessage

通信协议类

@Data
@AllArgsConstructor
public class RpcInvokeMessage implements Serializable {
    private String method;
    private Object[] args;
    private Class[] parameterTypes;
}

源码git地址

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5679180.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)