flink 1.14 异步 join 基础分析

flink 1.14 异步 join 基础分析,第1张

flink 1.14 异步 join 基础分析

@[TOC] flink 1.14 异步 join 思考

背景

flink 1.14 还没提供异步sql 版本jdbc join ,同时也没提供自定义传入SQL 查询结果集,然后再join的功能,但是他们提供了相关接口,恰好这两个功能对提升join 性能,以及SQL的灵活性上有需求,实现了一个版本。其中遇到一个问题,“异步join 的时候,如何保证顺序”?

你需要知道
 CustomSqlRowDataAsyncLookupFunction extends AsyncTableFunction {
 
 public void eval(CompletableFuture> future, Object... keys) {
 // 异步实现逻辑
 // 通过继承 AsyncTableFunction 实现自己的异步函数
 }
}

其次写过原生datastream 进行异步join的得知道有两个类:

// 这是异步以前用这个实现异步join 的方法,以及参数:AsyncDataStream
 public static  SingleOutputStreamOperator unorderedWait(
            DataStream in, 
            AsyncFunction func, 
            long timeout, 
            TimeUnit timeUnit) {
        return addOperator(, OutputMode.UNORDERED);
    }


  public static  SingleOutputStreamOperator orderedWait(
            DataStream in,
            AsyncFunction func,
            long timeout,
            TimeUnit timeUnit,
            int capacity) {
        return addOperator(, OutputMode.ORDERED);
    }
    
// 核心参数,也就是Operator 通过这个区分了是否有序
OutputMode.UNORDERED
OutputMode.ORDERED
其次

flink 不至于用多套逻辑实现异步,flink-sql 是翻译成算子,也是调用了datastream api 实现这个逻辑,因此我们来看看。
对于async 的 Operator 会有自己的工厂方法:

    
    public AsyncWaitOperatorFactory(
            AsyncFunction asyncFunction,
            long timeout,
            int capacity,
            AsyncDataStream.OutputMode outputMode) {
        // 定义的function,包括自定义的function。
        // 本质上我们的 *** 作都是一个function进行处理的
        this.asyncFunction = asyncFunction;
        // 异步的情况一般是需要队列+异步完成时间实现的
        // 默认timeout:180000 capacity:100 
        this.timeout = timeout;
        this.capacity = capacity;
        // 这就是我们的是否顺序参数
        this.outputMode = outputMode;
        // 默认算子会合并在一起执行
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }
    
    
    @Override
    public > T createStreamOperator(
            StreamOperatorParameters parameters) {
            // outputMode 是 Orderded
        AsyncWaitOperator asyncWaitOperator =
                new AsyncWaitOperator(
                        asyncFunction,
                        timeout,
                        capacity,
                        outputMode,
                        processingTimeService,
                        getMailboxExecutor());
        // 对asyncWaitOperator 内部进行初始化
        // 看看内部做了啥               
        asyncWaitOperator.setup(
                parameters.getContainingTask(),
                parameters.getStreamConfig(),
                parameters.getOutput());
        return (T) asyncWaitOperator;
    }

在这里,如果你用自定义的asyncLookupFunction 启动sql任务,本地进行debug到这里就会发现:
outputMode 参数值是:“ordered” 也就是保证顺序的。也就是说即使我们用asyncFunction,那么
内部还是给我们做了保证顺序。

异步如何保证顺序?

思考一下:
让你们实现一个异步处理数据的程序,但是要保证顺序输出,也就是进来的顺序和输出的顺序相同。
相信大家很容易想到用队列实现就行,为了简单,我简单画图说明一下:

1.数据流 1,2,3,4顺序到达
2.优先进入顺序队列queue ,同时异步线程(比如:CompletableFuture) 进行处理,返回future
3.程序优先从队列 queue 取出来,循环判断 future.isDone

这样必须等队列数据1 处理完成,才会去获取1的结果。但是 2 3 会异步进行处理,相当于提高了并发。当然需要控制队列长度,以及获取的超时时间。

看看flink怎么做

继续 进入AsyncWaitOperator#setup 看实现

 // Queue, into which to store the currently in-flight stream elements
 // 开始就定义了具体的队列存储
 private transient StreamElementQueue queue;
 public void setup() {
        
        switch (outputMode) {
            case ORDERED:
            	// 初始化元素队列(有序)
                queue = new OrderedStreamElementQueue<>(capacity);
                break;
            case UNORDERED:
                queue = new UnorderedStreamElementQueue<>(capacity);
                break;
            default:
                throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
        }
        this.timestampedCollector = new TimestampedCollector<>(super.output);
    }

在operator 内部有个核心处理数据的方法:

    // 当数据真正进入算子内部
    public void processElement(StreamRecord record) throws Exception {
        StreamRecord element;
        
        // 先加入队列
        final ResultFuture entry = addToWorkQueue(element);
        // ResultHandler 实现了 ResultFuture,内部有回调结果的 *** 作
        final ResultHandler resultHandler = new ResultHandler(element, entry);
		if (timeout > 0L) {
		    // 注册一个超时的东西
            resultHandler.registerTimeout(getProcessingTimeService(), timeout);
        }
        // 调用我们的函数进行处理
        userFunction.asyncInvoke(element.getValue(), resultHandler);
  }

而在addToWorkQueue 内部,简单看看如何 *** 作

 private ResultFuture addToWorkQueue(StreamElement streamElement)
            throws InterruptedException {
        Optional> queueEntry;
        // 调用tryPut进行存放元素
        while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) {
            mailboxExecutor.yield();
        }
        return queueEntry.get();
    }

然后看看 OrderedStreamElementQueue 的结构和 tryPut 方法

  // 内部就是一个ArrayDeque,元素是StreamElementQueueEntry
  private final Queue> queue;
  public OrderedStreamElementQueue(int capacity) {
        this.capacity = capacity;
        this.queue = new ArrayDeque<>(capacity);
    }
 // 创建元素   
 private StreamElementQueueEntry createEntry(StreamElement streamElement) {}
 // 存储元素
 public Optional> tryPut(StreamElement streamElement) {
   //略
   StreamElementQueueEntry queueEntry = createEntry(streamElement);
   // Queue> queue 存放值得地方
   queue.add(queueEntry);
 }

这里看到,创建元素之后会返回一个ResultFuture 的异步对象,因为 StreamElementQueueEntry 是继承了 ResultFuture 的接口

interface StreamElementQueueEntry extends ResultFuture {
   boolean isDone();
   void emitResult(TimestampedCollector output);
   StreamElement getInputElement();
   default void completeExceptionally(Throwable error) {}
}
数据进入队列后,我们是join完成?

当我们异步处理完成数据之后,肯定会调用:

public void eval(CompletableFuture> future, Object... keys) throws InterruptedException {
   // 返回数据
   future.complete(rowData);
}

这里实际上是会回调的 ResultFuture 的 实现下到 AsyncWaitOperator#complete

        public void complete(Collection results) {
            if (!completed.compareAndSet(false, true)) {
                return;
            }
            processInMailbox(results);
        }
       //  processInMailbox -> 直到这里
       private void processResults(Collection results) {
            // 取消定时器超时控制
            if (timeoutTimer != null) {
                // canceling in mailbox thread avoids
                // https://issues.apache.org/jira/browse/Flink-13635
                timeoutTimer.cancel(true);
            }
            // update the queue entry with the result
            resultFuture.complete(results);
            // 输出已经异步函数里面返回的元素
            outputCompletedElement();
        }

然后进入刚才的队列 OrderedStreamElementQueue

private void outputCompletedElement() {
        if (queue.hasCompletedElements()) {
            // emit only one element to not block the mailbox thread unnecessarily
            queue.emitCompletedElement(timestampedCollector);
             
         }   
    }
    public void emitCompletedElement(TimestampedCollector output) {
        if (hasCompletedElements()) {
            // 是先取头节点
            final StreamElementQueueEntry head = queue.poll();
            head.emitResult(output);
        }
    }
     // 这里控制顺序,每次判断是否完成,都是取头部元素进行处理
    public boolean hasCompletedElements() {
        return !queue.isEmpty() && queue.peek().isDone();
    }
    

    public void emitResult(TimestampedCollector output) {
        output.setTimestamp(inputRecord);
        for (OUT r : completedElements) {
            // 发送数据
            output.collect(r);
        }
    }

这里就数据发送完成了。

小结
  1. 我们用tableApi 做 async sql 函数的时候,实际内部用了异步,保顺。注意:这里是partition 保证顺序。
  2. 基本原理是把元素加入有序队列,每次complate取判断头部元素是否完成,再往下游发送做到保持顺序
  3. 内部有对接受数据是watermark 以及超时等处理,没仔细分析。有兴趣可以再看看
    4.有问题可以留言沟通、指正

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5676657.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存