这里创建了一个 Task 对象并启动,我们来看一下 Task 启动的时候都做了什么
首先向 blobService、netWork 注册 job ,添加监控,将jar 等添加到分布式缓存中,然后就 invoke,这也是 task 真正开始执行的地方,我们以 StreamTask 为例
init 然后对于一些 Rich Function 会先执行其 open方法,然后开始 run,就开始真正的消费数据了。我们以 flatMap 为例
当执行 run 方法时,首先呢 OneInputStreamTask.run
这一块的逻辑可具体参考
一文搞定 Flink 消费消息的全流程 、 一文搞定 Flink Checkpoint Barrier 全流程 以及 一文搞懂 Flink 处理 Barrier 全过程
我们知道当往下游发送数据的时候
继续追踪下去到 StreamFlatMap.processElement
其他的类似,如果是 kafka source task 调用的是 headOperator.run(getCheckpointLock(), getStreamStatusMaintainer()),然后去消费 kafka 中的数据。然后跟前面 一文搞定 Flink Job 提交全流程 、 写给大忙人看的Flink 消费 Kafka 、 一文搞定 Flink 消费消息的全流程 以及 一文搞定 Flink Checkpoint Barrier 全流程 就可以串起来了。而 Flink 整体流程的分析,除了 restore 之外,也差不多可以告一段落了。
yarn-session.sh(开辟资源) + flink run(提交任务)1.在yarn上启动一个Flink会话,node1上执行以下命令
/export/server/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
说明:
申请2个CPU、1600M内存
-n 表示申请2个容器,这里指的就是多少个taskmanager
-tm 表示每个TaskManager的内存大小
-s 表示每个TaskManager的slots数量
-d 表示以后台程序方式运行
注意:
该警告不用管
WARN org.apache.hadoop.hdfs.DFSClient - Caught exception
java.lang.InterruptedException
http://node1:8088/cluster
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
运行完之后可以继续运行其他的小任务
/export/server/flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar
yarn application -kill application_1599402747874_0001
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)