@[TOC] flink 1.14 异步 join 思考
背景flink 1.14 还没提供异步sql 版本jdbc join ,同时也没提供自定义传入SQL 查询结果集,然后再join的功能,但是他们提供了相关接口,恰好这两个功能对提升join 性能,以及SQL的灵活性上有需求,实现了一个版本。其中遇到一个问题,“异步join 的时候,如何保证顺序”?
你需要知道CustomSqlRowDataAsyncLookupFunction extends AsyncTableFunction{ public void eval(CompletableFuture > future, Object... keys) { // 异步实现逻辑 // 通过继承 AsyncTableFunction 实现自己的异步函数 } }
其次写过原生datastream 进行异步join的得知道有两个类:
// 这是异步以前用这个实现异步join 的方法,以及参数:AsyncDataStream public static其次SingleOutputStreamOperator unorderedWait( DataStream in, AsyncFunction func, long timeout, TimeUnit timeUnit) { return addOperator(, OutputMode.UNORDERED); } public static SingleOutputStreamOperator orderedWait( DataStream in, AsyncFunction func, long timeout, TimeUnit timeUnit, int capacity) { return addOperator(, OutputMode.ORDERED); } // 核心参数,也就是Operator 通过这个区分了是否有序 OutputMode.UNORDERED OutputMode.ORDERED
flink 不至于用多套逻辑实现异步,flink-sql 是翻译成算子,也是调用了datastream api 实现这个逻辑,因此我们来看看。
对于async 的 Operator 会有自己的工厂方法:
public AsyncWaitOperatorFactory( AsyncFunctionasyncFunction, long timeout, int capacity, AsyncDataStream.OutputMode outputMode) { // 定义的function,包括自定义的function。 // 本质上我们的 *** 作都是一个function进行处理的 this.asyncFunction = asyncFunction; // 异步的情况一般是需要队列+异步完成时间实现的 // 默认timeout:180000 capacity:100 this.timeout = timeout; this.capacity = capacity; // 这就是我们的是否顺序参数 this.outputMode = outputMode; // 默认算子会合并在一起执行 this.chainingStrategy = ChainingStrategy.ALWAYS; } @Override public > T createStreamOperator( StreamOperatorParameters parameters) { // outputMode 是 Orderded AsyncWaitOperator asyncWaitOperator = new AsyncWaitOperator( asyncFunction, timeout, capacity, outputMode, processingTimeService, getMailboxExecutor()); // 对asyncWaitOperator 内部进行初始化 // 看看内部做了啥 asyncWaitOperator.setup( parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput()); return (T) asyncWaitOperator; }
在这里,如果你用自定义的asyncLookupFunction 启动sql任务,本地进行debug到这里就会发现:
outputMode 参数值是:“ordered” 也就是保证顺序的。也就是说即使我们用asyncFunction,那么
内部还是给我们做了保证顺序。
思考一下:
让你们实现一个异步处理数据的程序,但是要保证顺序输出,也就是进来的顺序和输出的顺序相同。
相信大家很容易想到用队列实现就行,为了简单,我简单画图说明一下:
1.数据流 1,2,3,4顺序到达
2.优先进入顺序队列queue ,同时异步线程(比如:CompletableFuture) 进行处理,返回future
3.程序优先从队列 queue 取出来,循环判断 future.isDone
这样必须等队列数据1 处理完成,才会去获取1的结果。但是 2 3 会异步进行处理,相当于提高了并发。当然需要控制队列长度,以及获取的超时时间。
看看flink怎么做继续 进入AsyncWaitOperator#setup 看实现
// Queue, into which to store the currently in-flight stream elements // 开始就定义了具体的队列存储 private transient StreamElementQueuequeue; public void setup() { switch (outputMode) { case ORDERED: // 初始化元素队列(有序) queue = new OrderedStreamElementQueue<>(capacity); break; case UNORDERED: queue = new UnorderedStreamElementQueue<>(capacity); break; default: throw new IllegalStateException("Unknown async mode: " + outputMode + '.'); } this.timestampedCollector = new TimestampedCollector<>(super.output); }
在operator 内部有个核心处理数据的方法:
// 当数据真正进入算子内部 public void processElement(StreamRecordrecord) throws Exception { StreamRecord element; // 先加入队列 final ResultFuture entry = addToWorkQueue(element); // ResultHandler 实现了 ResultFuture,内部有回调结果的 *** 作 final ResultHandler resultHandler = new ResultHandler(element, entry); if (timeout > 0L) { // 注册一个超时的东西 resultHandler.registerTimeout(getProcessingTimeService(), timeout); } // 调用我们的函数进行处理 userFunction.asyncInvoke(element.getValue(), resultHandler); }
而在addToWorkQueue 内部,简单看看如何 *** 作
private ResultFutureaddToWorkQueue(StreamElement streamElement) throws InterruptedException { Optional > queueEntry; // 调用tryPut进行存放元素 while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) { mailboxExecutor.yield(); } return queueEntry.get(); }
然后看看 OrderedStreamElementQueue 的结构和 tryPut 方法
// 内部就是一个ArrayDeque,元素是StreamElementQueueEntry private final Queue> queue; public OrderedStreamElementQueue(int capacity) { this.capacity = capacity; this.queue = new ArrayDeque<>(capacity); } // 创建元素 private StreamElementQueueEntry createEntry(StreamElement streamElement) {} // 存储元素 public Optional > tryPut(StreamElement streamElement) { //略 StreamElementQueueEntry queueEntry = createEntry(streamElement); // Queue > queue 存放值得地方 queue.add(queueEntry); }
这里看到,创建元素之后会返回一个ResultFuture 的异步对象,因为 StreamElementQueueEntry 是继承了 ResultFuture 的接口
interface StreamElementQueueEntry数据进入队列后,我们是join完成?extends ResultFuture { boolean isDone(); void emitResult(TimestampedCollector output); StreamElement getInputElement(); default void completeExceptionally(Throwable error) {} }
当我们异步处理完成数据之后,肯定会调用:
public void eval(CompletableFuture> future, Object... keys) throws InterruptedException { // 返回数据 future.complete(rowData); }
这里实际上是会回调的 ResultFuture 的 实现下到 AsyncWaitOperator#complete
public void complete(Collectionresults) { if (!completed.compareAndSet(false, true)) { return; } processInMailbox(results); } // processInMailbox -> 直到这里 private void processResults(Collection results) { // 取消定时器超时控制 if (timeoutTimer != null) { // canceling in mailbox thread avoids // https://issues.apache.org/jira/browse/Flink-13635 timeoutTimer.cancel(true); } // update the queue entry with the result resultFuture.complete(results); // 输出已经异步函数里面返回的元素 outputCompletedElement(); }
然后进入刚才的队列 OrderedStreamElementQueue
private void outputCompletedElement() { if (queue.hasCompletedElements()) { // emit only one element to not block the mailbox thread unnecessarily queue.emitCompletedElement(timestampedCollector); } }
public void emitCompletedElement(TimestampedCollectoroutput) { if (hasCompletedElements()) { // 是先取头节点 final StreamElementQueueEntry head = queue.poll(); head.emitResult(output); } } // 这里控制顺序,每次判断是否完成,都是取头部元素进行处理 public boolean hasCompletedElements() { return !queue.isEmpty() && queue.peek().isDone(); } public void emitResult(TimestampedCollector output) { output.setTimestamp(inputRecord); for (OUT r : completedElements) { // 发送数据 output.collect(r); } }
这里就数据发送完成了。
小结- 我们用tableApi 做 async sql 函数的时候,实际内部用了异步,保顺。注意:这里是partition 保证顺序。
- 基本原理是把元素加入有序队列,每次complate取判断头部元素是否完成,再往下游发送做到保持顺序
- 内部有对接受数据是watermark 以及超时等处理,没仔细分析。有兴趣可以再看看
4.有问题可以留言沟通、指正
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)