一文搞定 Flink Task 提交执行全流程

一文搞定 Flink Task 提交执行全流程,第1张

上一篇 一文搞定 Flink Job 提交全流程 ,我们知道每一个 operator chain 作为一个整体,提交 task 。

这里创建了一个 Task 对象并启动,我们来看一下 Task 启动的时候都做了什么

首先向 blobService、netWork 注册 job ,添加监控,将jar 等添加到分布式缓存中,然后就 invoke,这也是 task 真正开始执行的地方,我们以 StreamTask 为例

init 然后对于一些 Rich Function 会先执行其 open方法,然后开始 run,就开始真正的消费数据了。我们以 flatMap 为例

当执行 run 方法时,首先呢 OneInputStreamTask.run

这一块的逻辑可具体参考

一文搞定 Flink 消费消息的全流程 、 一文搞定 Flink Checkpoint Barrier 全流程 以及 一文搞懂 Flink 处理 Barrier 全过程

我们知道当往下游发送数据的时候

继续追踪下去到 StreamFlatMap.processElement

其他的类似,如果是 kafka source task 调用的是 headOperator.run(getCheckpointLock(), getStreamStatusMaintainer()),然后去消费 kafka 中的数据。然后跟前面 一文搞定 Flink Job 提交全流程 、 写给大忙人看的Flink 消费 Kafka 、 一文搞定 Flink 消费消息的全流程 以及 一文搞定 Flink Checkpoint Barrier 全流程 就可以串起来了。而 Flink 整体流程的分析,除了 restore 之外,也差不多可以告一段落了。

yarn-session.sh(开辟资源) + flink run(提交任务)

1.在yarn上启动一个Flink会话,node1上执行以下命令

/export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d

说明:

申请2个CPU、1600M内存

-n 表示申请2个容器,这里指的就是多少个taskmanager

-tm 表示每个TaskManager的内存大小

-s 表示每个TaskManager的slots数量

-d 表示以后台程序方式运行

注意:

该警告不用管

WARN org.apache.hadoop.hdfs.DFSClient - Caught exception

java.lang.InterruptedException

http://node1:8088/cluster

/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar

运行完之后可以继续运行其他的小任务

/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar

yarn application -kill application_1599402747874_0001


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/yw/11822606.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-18
下一篇 2023-05-18

发表评论

登录后才能评论

评论列表(0条)

保存