目录
第一部分 Flink 概述
第 1 节 什么是 Flink
第 2 节 Flink 特点
第 3 节 Flink 应用场景
3.1 事务型处理
3.2 分析型处理
第 4 节 Flink 核心组成及生态发展
4.1 Flink核心组成
4.2 Flink生态发展
第 5 节 Flink 处理模型:流处理与批处理
第 6 节 流处理引擎的技术选型
第二部分 Flink快速应用
第 1 节 单词统计案例(批数据)
1.1 需求
1.2 代码实现
第 2 节 单词统计案例(流数据)
2.1 需求
2.2 代码实现
第三部分 Flink体系结构
第 1 节 Flink的重要角色
第 2 节 Flink运行架构
2.1 Flink程序结构
2.2 Task和SubTask
2.3 Operator chain( *** 作器链)
2.4 Flink中的数据传输
2.5 任务槽和槽共享
第四部分 Flink安装和部署
1.1 基础环境
1.2 安装包下载
1.3 集群规划
1.4 StandAlone模式部署
1.5 Yarn模式部署
(1)启动一个YARN session(Start a long-running Flink cluster on YARN);
(2)直接在YARN上提交运行Flink作业(Run a Flink job on YARN)
第一部分 Flink 概述 第 1 节 什么是 Flink
Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。
Flink被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。
第 2 节 Flink 特点Flink 是一个开源的流处理框架,它具有以下特点
- 批流一体:统一批处理、流处理
- 分布式:Flink程序可以运行在多台机器上
- 高性能:处理性能比较高
- 高可用:Flink支持高可用性(HA)
- 准确:Flink可以保证数据处理的准确性
Flink主要应用于流式数据分析场景 数据无处不在,绝大多数的企业所采取的处理数据的架构都会划分成两类:事务型处理、分析型处理
3.1 事务型处理OLTP On-Line Transaction Processing :联机事务处理过程。
流程审批、数据录入、填报等
特点:线下工作线上化,数据保存在各自的系统中,互不相通(数据孤岛)
OLTP:联机事务处理系统是一种以事务元作为数据处理的单位、人机交互的计算机应用系统。 它能对数据进行即时更新或其他 *** 作,系统内的数据总是保持在最新状态。
用户可将一组保持数据一致性的 *** 作序列指定为一个事务元,通过终端、个人计算机或其他设备输入事 务元,经系统处理后返回结果,
OLTP主要用来记录某类业务事件的发生,如购买行为,当行为产生后,系统会记录是谁在何时何地做 了何事,这样的一行(或多行)数据会以增删改的方式在数据库中进行数据的更新处理 *** 作,要求实时 性高、稳定性强、确保数据及时更新成功。
应用于飞机订票、银行出纳、股票交易、超市销售、饭店前后管理等实时系统
比如公司常见的业务系统如ERP,CRM,OA等系统都属于OLTP
ERP: Enterprise Resource Planning 企业资源计划
CRM:Customer Relationship Management 客户关系管理
OA:Office Automation 办公自动化
期间每处理一条事件,应用都会通过执行远程数据库系统的事务来读取或更新状态。很多时候,多个应 用会共享同一个数据库系统,有时候还会访问相同的数据库或表。
该设计在应用需要更新或数据库扩缩容或更改表模式的时候容易导致问题。
3.2 分析型处理当数据积累到一定的程度,我们需要对过去发生的事情做一个总结分析时,就需要把过去一段时间内产 生的数据拿出来进行统计分析,从中获取我们想要的信息,为公司做决策提供支持,这时候就是在做 OLAP了。
因为OLTP所产生的业务数据分散在不同的业务系统中,而OLAP往往需要将不同的业务数据集中到一起 进行统一综合的分析,这时候就需要根据业务分析需求做对应的数据清洗后存储在数据仓库中,然后由 数据仓库来统一提供OLAP分析
OLAP On-Line Analytical Processing :联机分析系统
分析报表,分析决策等
根据业务分析需求做对应的数据清洗后存储在数据仓库中称为ETL
ETL : Extract-Transform-Load: 从事务型数据库中提取数据,将其转化成通用的表示形式(可能包含数 据验证,数据归一化,编码、去重、表模式转化等工作),最终加载到分析型数据库中。
OLAP的实现方案一:(数仓)
如上图所示,数据实时写入 Hbase,实时的数据更新也在 Hbase 完成,为了应对 OLAP 需求,我们定时(通常是 T+1 或者 T+H)将 Hbase 数据写成静态的文件(如:Parquet)导入到 OLAP 引擎(如: HDFS,比较常见的是Impala *** 作Hive)。这一架构能满足既需要随机读写,又可以支持 OLAP 分析的 场景,
但有如下缺点:
- 架构复杂。从架构上看,数据在 Hbase、消息队列、HDFS 间流转,涉及环节太多,运维成本很 高。并且每个环节需要保证高可用,都需要维护多个副本,存储空间也有一定的浪费。最后数据在 多个系统上,对数据安全策略、监控等都提出了挑战。
- 时效性低。数据从 Hbase 导出成静态文件是周期性的,一般这个周期是一天(或一小时),在时 效性上不是很高。
- 难以应对后续的更新。真实场景中,总会有数据是「延迟」到达的。如果这些数据之前已经从 Hbase 导出到 HDFS,新到的变更数据就难以处理了,一个方案是把原有数据应用上新的变更后 重写一遍,但这代价又很高。
通常数据仓库中的查询可以分为两类:
1、普通查询:是定制的
2、即席查询:是用户自定义查询条件的
实时ETL
集成流计算现有的诸多数据通道和SQL灵活的加工能力,对流式数据进行实时清洗、归并和结构化处理;同时,对离线数仓进行有效的补充和优化,并为数据实时传输提供可计算通道。
实时报表
实时化采集、加工流式数据存储;实时监控和展现业务、客户各类指标,让数据化运营实时化。 如通过分析订单处理系统中的数据获知销售增长率; 通过分析分析运输延迟原因或预测销售量调整库存;
监控预警
对系统和用户行为进行实时监测和分析,以便及时发现危险行为,如计算机网络入侵、诈骗预警等
在线系统
实时计算各类数据指标,并利用实时结果及时调整在线系统的相关策略,在各类内容投放、智能推 送领域有大量的应用,如在客户浏览商品的同时推荐相关商品等
第 4 节 Flink 核心组成及生态发展 4.1 Flink核心组成Deploy层:
- 可以启动单个JVM,让Flink以Local模式运行
- Flink也可以以Standalone 集群模式运行,同时也支持Flink ON YARN,Flink应用直接提交到 YARN上面运行
- Flink还可以运行在GCE(谷歌云服务)和EC2(亚马逊云服务)
Core层(Runtime): 在Runtime之上提供了两套核心的API,
- DataStream API(流处理)
- DataSet API(批处理)
APIs & Libraries层: 核心API之上又扩展了一些高阶的库和API
- CEP流处理
- Table API和SQL
- Flink ML机器学习库
- Gelly图计算
中间部分主要内容在上面Flink核心组成中已经提到
输入Connectors(左侧部分)
- 流处理方式:包含Kafka(消息队列)、AWS kinesis(实时数据流服务)、RabbitMQ(消息队 列)、NIFI(数据管道)、Twitter(API)
- 批处理方式:包含HDFS(分布式文件系统)、Hbase(分布式列式数据库)、Amazon S3(文件 系统)、MapR FS(文件系统)、ALLuxio(基于内存分布式文件系统)
输出Connectors(右侧部分)
- 流处理方式:包含Kafka(消息队列)、AWS kinesis(实时数据流服务)、RabbitMQ(消息队 列)、NIFI(数据管道)、Cassandra(NOSQL数据库)、ElasticSearch(全文检索)、HDFS rolling file(滚动文件)
- 批处理方式:包含Hbase(分布式列式数据库)、HDFS(分布式文件系统)
Flink 专注于无限流处理,有限流处理是无限流处理的一种特殊情况
无限流处理:
- 输入的数据没有尽头,像水流一样源源不断
- 数据处理从当前或者过去的某一个时间 点开始,持续不停地进行
有限流处理:
- 从某一个时间点开始处理数据,然后在另一个时间点结束
- 输入数据可能本身是有限的(即输入数据集并不会随着时间增长),也可能出于分析的目的被人为 地设定为有限集(即只分析某一个时间段内的事件)
- Flink封装了DataStream API进行流处理,封装了DataSet API进行批处理。 同时,Flink也是一个批流一体的处理引擎,提供了Table API / SQL统一了批处理和流处理
有状态的流处理应用:
第 6 节 流处理引擎的技术选型市面上的流处理引擎不止Flink一种,其他的比如Storm、SparkStreaming、Trident等,实际应用 时如何进行选型,给一些建议参考
- 流数据要进行状态管理,选择使用Trident、Spark Streaming或者Flink
- 消息投递需要保证At-least-once(至少一次)或者Exactly-once(仅一次)不能选择Storm
- 对于小型独立项目,有低延迟要求,吞吐量不大, 可以选择使用Storm,更简单
- 如果项目已经引入了大框架Spark,实时处理需求可以满足的话,建议直接使用Spark中的Spark Streaming
- 消息投递要满足Exactly-once(仅一次),数据量大、有高吞吐、低延迟要求,要进行状态管理或 窗口统计,建议使用Flink
通过一个单词统计的案例,快速上手应用Flink,进行流处理(Streaming)和批处理(Batch)
第 1 节 单词统计案例(批数据) 1.1 需求统计一个文件中各个单词出现的次数,把统计结果输出到文件
步骤:
1、读取数据源
2、处理数据源
- a、将读到的数据源文件中的每一行根据空格切分
- b、将切分好的每个单词拼接1
- c、根据单词聚合(将相同的单词放在一起)
- d、累加相同的单词(单词后面的1进行累加)
3、保存处理结果
1.2 代码实现引入依赖
4.0.0 com.ch FirstFlink1.0-SNAPSHOT org.apache.flink flink-java1.11.1 org.apache.flink flink-streaming-java_2.121.11.1 provided org.apache.flink flink-clients_2.121.11.1 org.apache.flink flink-scala_2.121.11.1 org.apache.flink flink-streaming-scala_2.121.11.1 provided
Java程序
package com.ch; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.FlatMapOperator; import org.apache.flink.api.java.operators.UnsortedGrouping; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; public class WordCountJavaBatch { public static void main(String[] args) throws Exception { String input = "data/input/data.txt"; String output = "data/output"; //1 获取 flink 的批处理运行环境 ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment(); //2 读取数据源为 多行数据集 DataSetlines = executionEnvironment.readTextFile(input); //3 处理每一行数据 // 注意下面的 Tuple2 需要 java 的, 而不是 scala FlatMapOperator > wordWithOne = lines.flatMap(new SplitClz()); //4 每个元组根据 第一个字段 聚合分组 UnsortedGrouping > wordWithones = wordWithOne.groupBy(0); //5 聚合分组后的元组, 按第二个字段 进行累加 AggregateOperator > result = wordWithOnes.sum(1); //6 保存处理结果 result.writeAsCsv(output, "n", " "); //7 执行 executionEnvironment.setParallelism(1); executionEnvironment.execute(); } static class SplitClz implements FlatMapFunction > { // s 为每一行, collect 为其中的单词及其计数 public void flatMap(String s, Collector > collector) { //3.1 将读到数据源文件中的每一行根据空格切分 String[] s1 = s.split(" "); //3.2 切分好的 每个单词拼接 1 for (String word : s1) { collector.collect(new Tuple2 (word, 1)); } } } }
scala
import org.apache.flink.api.scala._ object WordCountScalaBatch { def main(args: Array[String]): Unit = { // 路径 val in = "data/input/data.txt" val out = "data/output2" // 获取运行环境 val environment : ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val lines = environment.readTextFile(in) val result : AggregateDataSet[(String, Int)] = lines.flatMap(_.split(" ")) .map((_, 1)) .groupBy(0) .sum(1) result.writeAsCsv(out, "n", " ").setParallelism(1) environment.execute("scala batch process") } }第 2 节 单词统计案例(流数据)
nc
netcat:
2.1 需求Socket模拟实时发送单词,使用Flink实时接收数据,对指定时间窗口内(如5s)的数据进行聚合统计,每隔1s汇总计算一次,并且把时间窗口内计算结果打印出来。
2.2 代码实现scala 版本
import org.apache.flink.streaming.api.scala._ object WordCountScalaStream { def main(args: Array[String]): Unit = { // 处理流式 val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val streamdata: DataStream[String] = environment.socketTextStream("192.168.110.128", 7777) val result: DataStream[(String, Int)] = streamData.flatMap((_: String) .split(" ")) .map(((_: String), 1)) .keyBy(0) .sum(1) result.print() environment.execute() } }
Java版
package com.ch; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class WordCountJavaStream { public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSourcestreamData = environment.socketTextStream("192.168.110.128", 7777); SingleOutputStreamOperator > result = streamData.flatMap(new FlatMapFunction >() { public void flatMap(String s, Collector > collector) throws Exception { for (String word : s.split(" ")) { collector.collect(new Tuple2 (word, 1)); } } }).keyBy(0).sum(1); result.print(); environment.execute(); } }
Flink程序开发的流程总结如下:
1)获得一个执行环境
2)加载/创建初始化数据
3)指定数据 *** 作的算子
4)指定结果数据存放位置
5)调用execute()触发执行程序
注意:Flink程序是延迟计算的,只有最后调用execute()方法的时候才会真正触发执行程序
第三部分 Flink体系结构 第 1 节 Flink的重要角色Flink是非常经典的Master/Slave结构实现,JobManager是Master,TaskManager是Slave。
JobManager处理器(Master)
- 协调分布式执行,它们用来调度task,协调检查点(CheckPoint),协调失败时恢复等
- Flink运行时至少存在一个master处理器,如果配置高可用模式则会存在多个master处理器,它们其中有一个是leader,而其他的都是standby。
JobManager接收的应用包括jar和JobGraph
TaskManager处理器(Slave)也称之为Worker
- 主要职责是从JobManager处接收任务, 并部署和启动任务, 接收上游的数据并处理
- Task Manager 是在 JVM 中的一个或多个线程中执行任务的工作节点
- TaskManager在启动的时候会向ResourceManager注册自己的资源信息(Slot的数量等)
ResourceManager
- 针对不同的环境和资源提供者,如(YARN,Me搜索,Kubernetes或独立部署),Flink提供了不同的ResourceManager
- 作用:负责管理Flink的处理资源单元---Slot
Dispatcher:
- 作用:提供一个REST接口来让我们提交需要执行的应用。
- 一旦一个应用提交执行,Dispatcher会启动一个JobManager,并将应用转交给他。
- Dispatcher还会启动一个webUI来提供有关作业执行信息
- 注意:某些应用的提交执行的方式,有可能用不到Dispatcher
各个组件之间的关系:
第 2 节 Flink运行架构 2.1 Flink程序结构Flink程序的基本构建块是流和转换(请注意,Flink的DataSet API中使用的DataSet也是内部流)。从概念上讲,流是(可能永无止境的)数据记录流,而转换是将一个或多个流输入,并产生一个或多个输出流。
上图表述了Flink的应用程序结构,有Source(源头)、Transformation(转换)、Sink(接收 器)三个重要组成部分
Source
数据源,定义Flink从哪里加载数据,Flink 在流处理和批处理上的 source 大概有 4 类:基于 本地集合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source。自定义的source 常见的有 Apache kafka、RabbitMQ 等。
Transformation
数据转换的各种 *** 作,也称之为算子,有 Map / FlatMap / Filter / KeyBy / Reduce / Window等,可以将数据转换计算成你想要的数据。
Sink
接收器,Flink 将转换计算后的数据发送的地点 ,定义了结果数据的输出方向,Flink 常见的 Sink 大概有如下几类:写入文件、打印出来、写入 socket 、自定义的 sink 。自定义的 sink 常见的有Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、HDFS等。
2.2 Task和SubTaskTask
- 是一个阶段多个功能相同 SubTask 的集合,类似于 Spark 中的 TaskSet。
SubTask(子任务)
- SubTask 是 Flink 中任务最小执行单元,是一个 Java 类的实例,这个 Java 类中有属性和方 法,完成具体的计算逻辑
- 比如一个执行 *** 作map,分布式的场景下会在多个线程中同时执行,每个线程中执行的都叫 做一个SubTask(在2.3节的图中也能够体现)
- 子任务数量受并行度影响
Flink的所有 *** 作都称之为Operator,客户端在提交任务的时候会对Operator进行优化 *** 作,能进 行合并的Operator会被合并为一个Operator,合并后的Operator称为Operator chain,实际上就是一 个执行链,每个执行链会在TaskManager上一个独立的线程中执行。
一个 *** 作器链就是一个 task
shuffle
2.4 Flink中的数据传输在运行过程中,应用中的任务会持续进行数据交换。
为了有效利用网络资源和提高吞吐量,Flink在处理任务间的数据传输过程中,采用了缓冲区机制。
2.5 任务槽和槽共享任务槽也叫做task-slot、槽共享也叫做slot sharing
- 每个TaskManager是一个JVM的进程, 可以在不同的线程中执行一个或多个子任务。
- 为了控制一个worker能接收多少个task。worker通过task slot来进行控制(一个worker至少有一个task slot)
任务槽
每个task slot表示TaskManager拥有资源的一个固定大小的子集。 一般来说:我们分配槽的个 数都是和CPU的核数相等,比如6核,那么就分配6个槽.
Flink将进程的内存进行了划分到多个Slot中。假设一个TaskManager机器有3个slot,那么每 个slot占有1/3的内存(平分)。
内存被划分到不同的slot之后可以获得如下好处:
- TaskManager最多能同时并发执行的任务是可以控制的,那就是3个,因为不能超过slot的数 量
- slot有独占的内存空间,这样在一个TaskManager中可以运行多个不同的作业,作业之间不 受影响
槽共享
默认情况下,Flink允许子任务subtast(map[1] map[2] keyby[1] keyby[2] 共享插槽,即使 它们是不同任务的子任务,只要它们来自同一个作业。结果是一个槽可以保存作业的整个管道。
第四部分 Flink安装和部署Flink支持多种安装模式
local(本地):单机模式,一般本地开发调试使用
StandAlone 独立模式:Flink自带集群,自己管理资源调度,生产环境也会有所应用
Yarn模式:计算资源统一由Hadoop YARN管理,生产环境应用较多
1.1 基础环境jdk1.8及以上【配置JAVA_HOME环境变量】
ssh免密码登录【集群内节点之间免密登录】
1.2 安装包下载配套资料文件夹中提供,使用Flink1.11.1版本
1.3 集群规划 1.4 StandAlone模式部署Step1、Flink安装包上传到hdp-1对应目录并解压
Step2、修改 flink/conf/flink-conf.yaml 文件
jobmanager.rpc.address: hdp-1 # master 的地址 taskmanager.numberOfTaskSlots: 2 # 任务槽数量 默认为 1, 一般 = CPU core size
Step3、修改/conf/masters 和 /conf/slave 以及 zoo.cfg
# masters hdp-1:8081
#slave hdp-1 hdp-2 hdp-3
# zoo.cfg 高可用配置 利用 zookeeper server.1=hdp-1:2888:3888 server.2=hdp-2:2888:3888 server.3=hdp-3:2888:3888
Step4、整个文件夹发送到其他两台机器后, standalone模式启动集群
scp -r flink-1.11.1 hdp-2:$PWD
bin目录下执行
./start-cluster.sh
Step5、jps进程查看核实
3857 TaskManagerRunner
3411 StandaloneSessionClusterEntrypoint
3914 Jps
Step6、查看Flink的web页面 ip:8081/#/overview
Step7、集群模式下运行example测试
./flink run -c WordCountScalaStream -p 2 ../examples/streaming/FirstFlink-1.0-SNAPSHOT.jar
./flink cancel job-id #停止
Step8、查看结果文件
注意:集群搭建完毕后,Flink程序就可以达成Jar,在集群环境下类似于Step7中一样提交执行计算任务
打jar包插件:
1.5 Yarn模式部署 (1)启动一个YARN session(Start a long-running Flink cluster on YARN);org.apache.maven.plugins maven-shade-plugin2.4.3 package shade *:* meta-INF/*.SF meta-INF/*.DSA meta-INF/*.RSA
启动 yarn-session.sh 后提交任务
需要修改/etc/profile
export HADOOP_CONF_DIR=$HADOOP_HOME export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop export HADOOP_CLASSPATH=`hadoop classpath
配置文件 yarn-site.xml
yarn.nodemanager.pmem-check-enabled false yarn.nodemanager.vmem-check-enabled false yarn.resourcemanager.address hdp-2:8032 yarn.resourcemanager.scheduler.address hdp-2:8030 yarn.resourcemanager.resource-tracker.address hdp-2:8031
注意:yarn-site的修改需要在集群的每一台机器上执行
启动hadoop (hdfs-yarn)
yarn-session.sh -h
/export/servers/flink/bin/yarn-session.sh -s 1 -jm 1024 -tm 1024m
# -s 表示每个TaskManager的slots数量
# -jm 表示每个 JobManager 的内存大小
# -tm 表示每个TaskManager的内存大小
点击 applicationmaster 后
通过命令在 yarn 上启动一个任务
./flink run -c WordCountScalaStream -yid application_xxxx_xxxx .../myjars/FirstFlink-1.0-SNAPSHOT.jar
# yid 为 yarn 的 application-id
解释
上面的命令的意思是,同时向Yarn申请1个container
(即便只申请了1个,因为ApplicationMaster和Job Manager有一个额外的容器。一旦将Flink部署到YARN群集中,它就会显示Job Manager的连接详细信息)
1 个 Container 启动 TaskManager -n 1,每个 TaskManager 拥有1个 Task Slot -s 1,并且向每个TaskManager 的 Container 申请 1024m 的内存,以及一个ApplicationMaster--Job Manager。
如果不想让Flink YARN客户端始终运行,那么也可以启动分离的 YARN会话。该参数被称为-d或--detached。在这种情况下,Flink YARN客户端只会将Flink提交给集群,然后关闭它自己
yarn-session.sh(开辟资源) + flink run(提交任务)
- 使用Flink中的yarn-session(yarn客户端),会启动两个必要服务JobManager和TaskManager
- 客户端通过flink run提交作业
- yarn-session会一直启动,不停地接收客户端提交的作业
- 这种方式创建的Flink集群会独占资源。
- 如果有大量的小作业/任务比较小,或者工作时间短,适合使用这种方式,减少资源创建的时间.
(2)直接在YARN上提交运行Flink作业(Run a Flink job on YARN)bin/flink run -m yarn-cluster -ynm flink-job -c WordCountScalaStream ..../../../examples/batch/FirstFlink-1.0-SNAPSHOT.jar
# -m jobmanager的地址
# -ynm job name
# -c 启动类
停止yarn-cluster:
yarn application -kill application_xxxx_xxxx
rm -rf /tmp/.yarn-properties-root
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)