最近阅读了flink 1.10版本网络通信部分的相关源码,网络部分主要有两块:
- 组件之间的rpc调用(akka)
- taskManager之间的数据传输(netty实现)
本文主要解析flink底层是如何使用akka封装的rpc,并模仿flink的设计方式,基于akka实现一个简单的rpc通信框架。
组件RPC通信架构
Flink底层的RPC调用主要分为RpcEndPoint、RpcService、RpcServer、GateWay四个模块。
- RpcEndPoint:每个组件的基本实现,主要包含endpointId,用于唯一标记当前的RPC节点(即ActorSystem中的actor对象)。RpcEndPoint借助RpcService启动内部的RpcServer,之后通过RpcServer处理本地和远程调用。
- RpcService:每个节点启动时,会根据配置文件的IP、Port等信息初始化一个RpcService对象(对应akka的ActorSystem对象)。可以根据endpoint的地址创建Akka Actor实例,并基于Actor创建RpcServer接口的动态代理类。
- RpcServer:节点内的各个组件通过本节点的RpcService启动一个RpcServer(即actor对象,负责处理具体的RPC调用,通过AkkaInvokeHandler代理类实现)。所有远端和本地的请求都会转发到AkkaInvokeHandler代理类中执行,AkkaInvokeHandler代理类会判断是远端还是本地请求。如果是远端请求,代理类会使用内部的ActorRef对象将方法调用封装后发送给远端执行(通过ActorRef的tell、ask方法)。
- GateWay接口是暴露给外界的远程调用接口。要通过RPC调用对方的方法时,需要先获得对方GateWay接口的一个代理对象。使用代理对象进行RPC调用时,会将请求按照相应的协议进行封装(即封装方法名、返回值、参数类型等信息),并使用AkkaInvokeHandler代理类中的actorRef将请求发送给对方的actor,由对方处理完后再接收返回结果。
本节参考Flink的RPC设计模式,基于Akka实现了一个简单的RPC调用框架。所有的RPC调用都会通过AkkaInvokeHanlder代理类生成对方RpcGateWay接口的动态代理对象,从而将本地方法调用转化为远程RPC调用。
- AkkaInvokeHanlder:代理类。对网关接口的调用会被它通过内部持有的对方AkkaRpcActor的ActorRef转化为远程消息(经TCP发送)。
- AkkaRpcActor:系统所有Actor的抽象实现,处理远端的rpc请求(解析出方法名、返回类型、参数值,然后反射调用对应endpoint的方法)
- EndPoint相关类:具体的方法实现(提供远端和本地调用),同时兼具Flink中RpcService的角色(持有ActorSystem,用于启动endpoint并注册对应的actor)。
- GateWay包:定义了各个endpoint需要暴露的rpc方法
- Message包:定义了使用的协议类,其中RpcInvokeMessage类封装了每次RPC调用的方法名、参数值、参数类型等信息。
@AllArgsConstructor public class AkkaInvokeHanlder implements InvocationHandler { private ActorRef actorRef;//对方端点对应的actor引用,用于发生rpc请求(tell/ask) private long timeOut;//rpc调用超时时间 @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Class>[] parameterTypes = method.getParameterTypes(); Object result = invokeRpc(method,args,parameterTypes); return result; } private Object invokeRpc(Method method, Object[] args, Class>[] parameterTypes) { RpcInvokeMessage rpcInvokeMessage = new RpcInvokeMessage(method.getName(), args, parameterTypes); Class> returnType = method.getReturnType(); Object res = null; if (Objects.equals(returnType,Void.TYPE)) { //没有返回值的调用,使用tell actorRef.tell(rpcInvokeMessage,ActorRef.noSender()); }else { //有返回值的调用,使用ask方法,获得调用后的异步对象。 CompletableFutureAkkaRpcActor
/**
 * Actor that backs every endpoint in this RPC demo.
 *
 * <p>It handles only {@link RpcInvokeMessage}s. For each one it looks up the requested public
 * method on the wrapped {@link EndPoint} by name and parameter types, invokes it reflectively,
 * and, when the method has a non-void return type, sends the result back to the sender.
 */
@AllArgsConstructor
public class AkkaRpcActor extends AbstractActor {
    // The endpoint object whose methods are the real targets of incoming RPC calls.
    private EndPoint endPoint;
    // Name this actor was registered under (e.g. "Master" or a TaskManager uid); not read here.
    private String uid;

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                // This actor only handles RPC invocation messages.
                .match(RpcInvokeMessage.class, this::handleRpcInvoke)
                .build();
    }

    /**
     * Executes one RPC request against {@link #endPoint}.
     *
     * <p>Void methods are invoked without a reply. For non-void methods the return value is sent
     * to {@code sender()}. If the method cannot be resolved or invoked, or the method throws, an
     * {@code akka.actor.Status.Failure} carrying the cause is sent to {@code sender()} instead.
     * A caller waiting on {@code ask} therefore fails fast instead of only timing out. For
     * fire-and-forget calls sent with {@code ActorRef.noSender()}, that failure reply ends up in
     * dead letters.
     *
     * @param invokeMessage method name, argument values and parameter types of the call
     */
    public void handleRpcInvoke(RpcInvokeMessage invokeMessage) {
        try {
            // getMethod never returns null: a missing method raises NoSuchMethodException.
            Method rpcMethod = endPoint.getClass()
                    .getMethod(invokeMessage.getMethod(), invokeMessage.getParameterTypes());
            // Kept from the original: allows invoking public methods declared on a non-public class.
            rpcMethod.setAccessible(true);
            Object result = rpcMethod.invoke(endPoint, invokeMessage.getArgs());
            if (!Objects.equals(rpcMethod.getReturnType(), Void.TYPE)) {
                // Only calls with a return value expect a reply (they were sent with ask).
                sender().tell(result, getSelf());
            }
        } catch (java.lang.reflect.InvocationTargetException e) {
            // Report what the endpoint method itself threw, not the reflective wrapper.
            replyFailure(e.getCause() != null ? e.getCause() : e);
        } catch (ReflectiveOperationException | RuntimeException e) {
            replyFailure(e);
        }
    }

    /** Prints {@code cause} and sends it to the sender wrapped in an Akka Status.Failure. */
    private void replyFailure(Throwable cause) {
        cause.printStackTrace();
        sender().tell(new akka.actor.Status.Failure(cause), getSelf());
    }
}
EndPoint
封装各个端点的一些通用方法,例如获取远端连接。
public class EndPoint { publicJobManagerRpcGateWay connect(String path, Class clazz, ActorSystem actorSystem) { ActorRef actorRef = actorSystem.actorFor(path); //代理类 AkkaInvokeHanlder akkaInvokeHanlder = new AkkaInvokeHanlder(actorRef,2); ClassLoader classLoader = clazz.getClassLoader(); //被代理接口(gateway网管接口) Class[] interfaces = {clazz}; RpcGateWay rpcGateWay = (RpcGateWay) Proxy.newProxyInstance(classLoader, interfaces, akkaInvokeHanlder); return rpcGateWay; } }
@RequiredArgsConstructor @Data public class JobManager extends EndPoint implements JobManagerGateWay { @NonNull private Integer executors; @NonNull private String hostName; @NonNull private String port; private HashMapJobManageruidToTaskManager = new HashMap<>(); private HashMap uidToTaskManagerGateWay = new HashMap<>(); private static ActorSystem masterSystem; @Override public Integer getNumberOfExecutors() { return this.executors; } @Override public String getHostName() { return this.hostName; } @Override public String getPort() { return this.port; } @Override public void registerTaskManager(RegisterMassage registerMassage) { if (!uidToTaskManager.containsKey(registerMassage.getUid())) { //保存taskManager的注册信息 uidToTaskManager.put(registerMassage.getUid(),registerMassage); this.executors++; //获取taskManager的网管对象 String workerPath = "akka.tcp://WorkerSystem@" + registerMassage.getHostName() + ":" + registerMassage.getPort() + "/user/" + registerMassage.getUid(); TaskManagerGateWay taskManagerGateWay = (TaskManagerGateWay) connect(workerPath,TaskManagerGateWay.class,masterSystem); uidToTaskManagerGateWay.put(registerMassage.getUid(),taskManagerGateWay); System.out.println(registerMassage.getHostName() + " 建立了连接, 当前executors数:" + this.executors); //向taskManager发生task TaskInfo taskInfo = taskManagerGateWay.sendTask(new Task(UUID.randomUUID().toString())); System.out.println("向executor["+ taskInfo.getExecutorID() +"]部署task taskInfo["+ taskInfo.toString() +"]"); }else { System.out.println(registerMassage.getHostName() + " 重复建立连接"); } } public static void main(String[] args) { Integer executor = 0; String hostName = "192.168.1.10"; String port = "8080"; HashMap map = new HashMap<>(); map.put("akka.actor.provider","akka.remote.RemoteActorRefProvider"); map.put("akka.remote.netty.tcp.hostname",hostName); map.put("akka.remote.netty.tcp.port",port); Config config = ConfigFactory.parseMap(map); masterSystem = ActorSystem.create("MasterSystem", config); JobManager jobManager = 
new JobManager(executor, hostName, port); masterSystem.actorOf(Props.create(AkkaRpcActor.class, ()-> new AkkaRpcActor(jobManager,"Master")),"Master"); } }
@Data public class TaskManager extends EndPoint implements TaskManagerGateWay { private String uid; private Integer slots; private Integer memories; private String hostName; private String port; private String masterPath; private JobManagerGateWay jobManagerGateWay; private Integer runningTask; private static ActorSystem workerSystem; @Override public Integer getNumberOfSlots() { return this.slots; } @Override public Integer getMemories() { return this.memories; } @Override public String getHostName() { return this.hostName; } @Override public String getPort() { return this.port; } @Override public TaskInfo sendTask(Task task) { System.out.println("成功接收task " + task.getTaskID()); TaskInfo taskInfo = new TaskInfo(task.getTaskID(), true, uid, slots, memories); return taskInfo; } public TaskManager(String uid, Integer slots, Integer memories, String hostName, String port, String masterPath) { this.uid = uid; this.slots = slots; this.memories = memories; this.hostName = hostName; this.port = port; this.masterPath = masterPath; //TaskManager启动时向JobManager注册 //获得JobManagerGateWay的代理对象 this.jobManagerGateWay = (JobManagerGateWay) connect(this.masterPath,JobManagerGateWay.class,workerSystem); //进行注册 this.jobManagerGateWay.registerTaskManager(new RegisterMassage(uid,slots,memories,hostName,port)); } public static void main(String[] args) { String uid = UUID.randomUUID().toString(); Integer slots = 3; Integer memories = 5; String hostName = "192.168.119.1"; String port = "8080"; String masterPath = "akka.tcp://MasterSystem@192.168.1.10:8080/user/Master"; HashMapRpcInvokeMessagemap = new HashMap<>(); map.put("akka.actor.provider","akka.remote.RemoteActorRefProvider"); map.put("akka.remote.netty.tcp.hostname",hostName); map.put("akka.remote.netty.tcp.port",port); Config config = ConfigFactory.parseMap(map); workerSystem = ActorSystem.create("WorkerSystem", config); TaskManager taskManager = new TaskManager(uid, slots, memories, hostName, port, masterPath); 
workerSystem.actorOf(Props.create(AkkaRpcActor.class, ()-> new AkkaRpcActor(taskManager,uid)),uid); } }
通信协议类
@Data @AllArgsConstructor public class RpcInvokeMessage implements Serializable { private String method; private Object[] args; private Class>[] parameterTypes; }
源码git地址
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)