Flink Metrics

Flink Metrics,第1张

Flink 提供了 Metric 系统,允许收集 Metric 并暴露给外部系统。

可以通过任何继承了 RichFunction 的函数访问 Metric 系统。调用 getRuntionContext()getMetricGroup() 方法,该方法返回一个 MetricGroup 对象,可以创建并注册 Metric。

Counter 用来计数。当前值可以使用 inc() / inc(long n) 或 dec() / dec(long n) 进行增减。

Gauge 根据需要提供任何类型的值。需要先创建一个实现 orgapacheflinkmetricsGauge 的类,返回值的类形没有限制。

Report 程序在暴露数据给外部系统时,会把对象转换为字符串,这意味着需要一个有意义的 toString() 实现。

Histogram 统计值的分布。

Flink 没有提供 Histogram 的默认实现,可以添加依赖使用 DropwizardHistogramWrapper 实现

Meter 用来统计平均吞吐量。

同样添加 flink-metrics-dropwizard 依赖,可以使用 DropwizardMeterWrapper 实现

每个 Metric 都会分配一个标识符和一组键值对,用来报告 Metric。

标识符基于3个组成部分:注册时的用户定义名称、可选的用户定义 Scope 和系统提供的 Scope。例如,如果 AB 是系统 Scope,CD 是用户 Scope,E 是名称,那么标识符将是 ABCDE。

可以通过在 conf/flink-confyaml 中设置 metricsscopedelimiter 键来配置用于标识符的分隔符(默认值:)。

定义 User Scope 的方法: 调用 MetricGroup#addGroup(String name) , MetricGroup#addGroup(int name) , MetricGroup#addGroup(String key, String value) 。这些方法会影响 MetricGroup#getMetricIdentifier 和 MetricGroup#getScopeComponents 的返回值。

System Scope 包含 Metric 的上下文信息,例如注册在哪个 Task(<task_name>)或属于哪个 Job(<job_name>)。

应该包含哪些上下文信息可以通过 conf/flink-confyaml 配置。

<host> | <job_name> | <tm_id> | <task_name> | <operator_name> | <subtask_index> 可以作为变量使用。变量的数量或顺序没有限制,区分大小写。

例如:Operator Metric 的默认 Scope 格式为 <host>taskmanager<tm_id><job_name><operator_name><subtask_index> ,生成的标识符类似 localhosttaskmanager1234MyJobMyOperator0MyMetric 的形式;如果希望包含 Task 名称,并且忽略 TaskManager 信息,可以设置 metricsscopeoperator: <host><job_name><task_name><operator_name><subtask_index> ,生成的标识符会变成 localhostMyJobMySource_->_MyOperatorMyOperator0MyMetric 。

建议添加带有 ID 的变量(如:<job_id>)保证唯一性,避免出现命名冲突的问题。所有可以使用的变量:

Flink 允许向外部系统报告 Metric。

通过在 conf/flink-confyaml 中配置一个或多个 Reporter,可以将 Metric 暴露给外部系统。这些 Reporter 在启动时实例化。

Reporter 必须至少配置 class 或 factoryclass 属性(使用哪个取决于 Reporter 的实现)。

配置 Reporter 示例

自定义 Reporter:

下面列出了一些支持的 Reporter

orgapacheflinkmetricsjmxJMXReporter

参数:

通过 JMX 公开的 Metric 由一个 domain 和一组 key 属性组成标识。domain 总是以 orgapacheflink 开始,接一个通用 metric 标识(与一般的 metric 标识不同,不受 scope 格式的影响,不包含任何变量),例如:orgapacheflinkjobtasknumBytesOut。

key 属性列表包含与给定 Metric 关联的所有变量的值(不受 scope 格式影响)。例如: host=localhost,job_name=MyJob,task_name=MyTask 。

orgapacheflinkmetricsprometheusPrometheusReporter

参数:

Flink Metric 类型和 Prometheus Metric 类型映射

orgapacheflinkmetricsprometheusPrometheusPushGatewayReporter

参数

PrometheusPushGatewayReporter 将 Metric 推到 Pushgateway

默认情况下,Flink 收集的指标

代替 Network/IO 部分 Metrics

如果启用了 Reactive Mode (113 MVP 特性),这些 Metric(除 numRestarts)不能正常工作。

如果启用了 Reactive Mode (113 MVP 特性),Job Scope 的 Metric 不能正常工作。

Flink 允许跟踪在系统中传输的记录的延迟。默认情况下禁用此功能。要启用延迟跟踪,必须在 Flink 配置( conf/flink-confyaml )或 ExecutionConfig 中将 latencyTrackingInterval 设置为正数。

Source 会定期(latencyTrackingInterval)发出一个特殊的记录,称为 LatencyMarker。记录包含一个时间戳,该时间戳从记录在源处发出时算起。LatencyMarker 不能超过(overtake)正常记录,因此如果正常记录在 Operator 前排队,将增加标记跟踪的延迟。

延迟监控的粒度,分为以下3档:

需要注意:

Metrics 可以通过 REST API 查询。下面列出一些可用的 Endpoint 和 JSON 返回格式。

Base URL: >

看了 BucketSink 的相关源码。着重看了它的checkpoint以及故障恢复机制。

把大概的理解梳理如下:

BucketSink 大体的工作流程:

1新建一个文件,不断的写入文件中,后缀命名为 in-progress

2判断文件写入完毕,关闭该文件时,后缀名命名为 pending

3checkpoint触发时,将上次ck到这次ck间的所有 pending 文件变为 finish 状态

BucketSink 实现了 CheckpointedFunction 接口

有两个方法

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;�其中:

initializeState 方法在每次新建 BucketSink 或者故障恢复时 会调用。

snapshotState 在每次触发 ck 时会被调用。

下面简单分析下这两个方法的逻辑:

initializeState 方法主要执行一些初始化 *** 作,其中我认为关键的在于

restoredBucketStates = stateStoregetSerializableListState("bucket-states");

该方法获取一个叫做 bucket-states 的状态对象,从名称也可知,该对象用于重启。正常情况下,该对象无内容下面的for语句不会执行。但是若有故障重启的情况,则会从上次的ck中读取出内容,也就是上次ck的状态信息,然后执行回滚 *** 作保证数据的一致性。这一点最后再做介绍。

snapshotState 方法用于触发 ck *** 作。

这个方法做了如下几件事

1获取当前正在写的 pending 文件的大小,以便若下次 ck 前发生故障,可以获知本次ck时,该文件的大小,以便删除本次ck后到故障发生时写入的数据,或者显示该文件的有效数据大小。

2将所有 pending 状态的文件存储到list中,稍后ck结束后,方便修改其状态为 finish

3将当前状态存入 restoredBucketStates 对象,以便若下次 ck 前发生故障,可以从这个状态处进行恢复。

同时,BucketSink也实现了 CheckpointListener 接口

void notifyCheckpointComplete(long checkpointId) throws Exception;

该方法会在 ck 完成后调用。

该方法,将 pending 文件的状态转为 final 状态

并且移除writer已经处于close状态的bucket。

最后详细说一下故障恢复。

当程序因故障自动恢复时,initializeState 方法的 restoredBucketStates 就会从上次 ck 中获取到上次ck时的状态。进而进行恢复。

首先,将 pending 状态的文件名列表清空即可,因为将 pending 状态转为 finish 状态,可以在 notifyCheckpointComplete 方法中完成。故障恢复时,该方法对 pending 的文件的做法是不做处理,等待故障恢复之后,第一次ck触发时,便会自动的将 pending 的文件变为 finish 状态。

而之所以不处理 pending 状态文件,是因为 pending 状态文件说明该文件已经写入完毕,就差ck成功后修改文件状态(也就是文件名)而已,本质上,该文件已经不再写入数据,没有数据的变化。

接下来 handlePendingInProgressFile 就是处理 in-progress 状态的文件。

我们设想一下,故障重启是指在上次成功的ck之后,下次ck之前,发生了故障,然后应用自动重启,使用的是上次成功的ck的状态信息。

这样的话,上次 ck 时状态为 in-progress 的文件,可能在故障发生时,已经处于 pending 状态,也就是写完的状态,也可能仍然处于 in-progress 状态。

flink的做法是,不管处于什么状态 首先全部标注为 finish 状态。然后根据上次ck时状态中存储的文件的大小进行截断,这样,该文件就能回滚到上次ck成功时的状态。若 Hadoop 版本不支持截断 *** 作,则新建一个后缀为 valid-length 的文件,内容为文件的大小,单位 byte。

然后flink就可以从上次ck处重新拉取数据源,继续处理,写入sink。

最后,调用 handlePendingFilesForPreviousCheckpoints 将上次ck成功后,若故障发生的很快,没来得及调用 CheckpointListener 的 notifyCheckpointComplete 方法,则此处将文件状态置为 finish 。

BucketSink 是一个控制类,具体的写入 *** 作可以自己实现 orgapacheflinkstreamingconnectorsfsWriter 接口。

其中 snappy 等压缩文件的追加,可以使用

Fsappend 的方式追加内容到同一文件中

备注:为了防止发生权限错误

Flink默认包含两种配置方式:log4j以及logback

不配置的情况下运行flink集群或者运行flink job会提示建议移除其中一种。

直接移除或者重命名都可行。

例如:mv logbackxml logbackxml_bak

浏览器中访问node1:8081

[1] 第一种方式:yarn-sessionsh(开辟资源)+flink run(提交任务)

启动一个一直运行的flink集群

请注意:

<p style="color:red">

请注意:client必须要设置YARN_CONF_DIR或者HADOOP_CONF_DIR环境变量,通过这个环境变量来读取YARN和HDFS的配置信息,否则启动会失败。

经实验发现,其实如果配置的有HADOOP_HOME环境变量的话也是可以的(只是会出现警告)。HADOOP_HOME ,YARN_CONF_DIR,HADOOP_CONF_DIR 只要配置的有任何一个即可。

</p>

运行结果如图:

浏览器中访问 >

1流式计算分为无状态和有状态两种情况。 无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收水位数据,并在水位超过指定高度时发出警告。有状态的计算则会基于多个事件输出结果。以下是一些例子。

(1)所有类型的窗口。例如,计算过去一小时的平均水位,就是有状态的计算。

(2)所有用于复杂事件处理的状态机。例如,若在一分钟内收到两个相差20cm以上的水位差读数,则发出警告,这是有状态的计算。

(3)流与流之间的所有关联 *** 作,以及流与静态表或动态表之间的关联 *** 作,都是有状态的计算。

2下图展示了无状态流处理和有状态流处理的主要区别。 无状态流处理分别接收每条数据记录(图中的黑条),然后根据最新输入的数据生成输出数据(白条)。有状态流处理会维护状态(根据每条输入记录进行更新),并基于最新输入的记录和当前的状态值生成输出记录(灰条)。

上图中输入数据由黑条表示。无状态流处理每次只转换一条输入记录,并且仅根据最新的输入记录输出结果(白条)。有状态 流处理维护所有已处理记录的状态值,并根据每条新输入的记录更新状态,因此输出记录(灰条)反映的是综合考虑多个事件之后的结果。

3有状态的算子和应用程序

Flink内置的很多算子,数据源source,数据存储sink都是有状态的,流中的数据都是buffer records,会保存一定的元素或者元数据。例如: ProcessWindowFunction会缓存输入流的数据,ProcessFunction会保存设置的定时器信息等等。

在Flink中,状态始终与特定算子相关联。总的来说,有两种类型的状态:

算子状态(operator state)

键控状态(keyed state)

4算子状态(operator state)

算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。

Flink为算子状态提供三种基本数据结构:

列表状态(List state):将状态表示为一组数据的列表。

联合列表状态(Union list state):也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。

广播状态(Broadcast state):如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态

5键控状态(keyed state)

键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。Keyed State很类似于一个分布式的key-value map数据结构,只能用于KeyedStream(keyBy算子处理之后)。

6状态后端(state backend)

每传入一条数据,有状态的算子任务都会读取和更新状态。由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务都会在本地维护其状态,以确保快速的状态访问。状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(state backend)

状态后端主要负责两件事:

1)本地的状态管理

2)将检查点(checkpoint)状态写入远程存储

状态后端分类:

(1)MemoryStateBackend

内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上;而将checkpoint存储在JobManager的内存中。

(2)FsStateBackend

将checkpoint存到远程的持久化文件系统(FileSystem)上。而对于本地状态,跟MemoryStateBackend一样,也会存在TaskManager的JVM堆上。

(3)RocksDBStateBackend

将所有状态序列化后,存入本地的RocksDB中存储。

7状态一致性

当在分布式系统中引入状态时,自然也引入了一致性问题。一致性实际上是"正确性级别"的另一种说法,也就是说在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结果相比,前者到底有多正确?举例来说,假设要对最近一小时登录的用户计数。在系统经历故障之后,计数结果是多少?如果有偏差,是有漏掉的计数还是重复计数?

1)一致性级别

在流处理中,一致性可以分为3个级别:

(1) at-most-once : 这其实是没有正确性保障的委婉说法——故障发生之后,计数结果可能丢失。同样的还有udp。

(2) at-least-once : 这表示计数结果可能大于正确值,但绝不会小于正确值。也就是说,计数程序在发生故障后可能多算,但是绝不会少算。

(3) exactly-once : 这指的是系统保证在发生故障后得到的计数结果与正确值一致。

曾经,at-least-once非常流行。第一代流处理器(如Storm和Samza)刚问世时只保证at-least-once,原因有二。

(1)保证exactly-once的系统实现起来更复杂。这在基础架构层(决定什么代表正确,以及exactly-once的范围是什么)和实现层都很有挑战性。

(2)流处理系统的早期用户愿意接受框架的局限性,并在应用层想办法弥补(例如使应用程序具有幂等性,或者用批量计算层再做一遍计算)。

最先保证exactly-once的系统(Storm Trident和Spark Streaming)在性能和表现力这两个方面付出了很大的代价。为了保证exactly-once,这些系统无法单独地对每条记录运用应用逻辑,而是同时处理多条(一批)记录,保证对每一批的处理要么全部成功,要么全部失败。这就导致在得到结果前,必须等待一批记录处理结束。因此,用户经常不得不使用两个流处理框架(一个用来保证exactly-once,另一个用来对每个元素做低延迟处理),结果使基础设施更加复杂。曾经,用户不得不在保证exactly-once与获得低延迟和效率之间权衡利弊。Flink避免了这种权衡。

Flink的一个重大价值在于, 它既保证了 exactly-once ,也具有低延迟和高吞吐的处理能力 。

从根本上说,Flink通过使自身满足所有需求来避免权衡,它是业界的一次意义重大的技术飞跃。尽管这在外行看来很神奇,但是一旦了解,就会恍然大悟。

2)端到端(end-to-end)状态一致性

目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在 Flink 流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如 Kafka)和输出到持久化系统。

端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性,整个端到端的一致性级别取决于所有组件中一致性最弱的组件。具体可以划分如下:

1)source端 —— 需要外部源可重设数据的读取位置

2)link内部 —— 依赖checkpoint

3)sink端 —— 需要保证从故障恢复时,数据不会重复写入外部系统

而对于sink端,又有两种具体的实现方式:幂等(Idempotent)写入和事务性(Transactional)写入。

4)幂等写入

所谓幂等 *** 作,是说一个 *** 作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。

5)事务写入

需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中。

对于事务性写入,具体又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)。

8检查点(checkpoint)

Flink具体如何保证exactly-once呢 它使用一种被称为"检查点"(checkpoint)的特性,在出现故障时将系统重置回正确状态。下面通过简单的类比来解释检查点的作用。

9Flink+Kafka如何实现端到端的exactly-once语义

我们知道,端到端的状态一致性的实现,需要每一个组件都实现,对于Flink + Kafka的数据管道系统(Kafka进、Kafka出)而言,各组件怎样保证exactly-once语义呢?

1)内部 —— 利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性

2)source —— kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性

3)sink —— kafka producer作为sink,采用两阶段提交 sink,需要实现一个TwoPhaseCommitSinkFunction

内部的checkpoint机制我们已经有了了解,那source和sink具体又是怎样运行的呢?接下来我们逐步做一个分析。

我们知道Flink由JobManager协调各个TaskManager进行checkpoint存储,checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存。

当checkpoint 启动时,JobManager 会将检查点分界线(barrier)注入数据流;barrier会在算子间传递下去。

每个算子会对当前的状态做个快照,保存到状态后端。对于source任务而言,就会把当前的offset作为状态保存起来。下次从checkpoint恢复时,source任务可以重新提交偏移量,从上次保存的位置开始重新消费数据。

每个内部的transform 任务遇到 barrier 时,都会把状态存到 checkpoint 里。

sink 任务首先把数据写入外部 kafka,这些数据都属于预提交的事务(还不能被消费);当遇到 barrier 时,把状态保存到状态后端,并开启新的预提交事务。

当所有算子任务的快照完成,也就是这次的 checkpoint 完成时,JobManager 会向所有任务发通知,确认这次 checkpoint 完成。

当sink 任务收到确认通知,就会正式提交之前的事务,kafka 中未确认的数据就改为“已确认”,数据就真正可以被消费了。

所以我们看到,执行过程实际上是一个两段式提交,每个算子执行完成,会进行“预提交”,直到执行完sink *** 作,会发起“确认提交”,如果执行失败,预提交会放弃掉。

具体的两阶段提交步骤总结如下:

1)第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka 分区日志但标记为未提交,这就是“预提交”

2)触发 checkpoint *** 作,barrier从 source 开始向下传递,遇到 barrier 的算子将状态存入状态后端,并通知jobmanager

3)sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据

4)jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成

5)sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据

6)外部kafka关闭事务,提交的数据可以正常消费了。

        DT时代,人们对于数据的要求越来越严格,从开始的 大数据 到现在慢慢转变成 快数据 ,我们很多公司都是基于Hadoop生态搭建自己的数据仓库,将不同源的数据按照一定的 周期 (时/天等)通过 ETL (提取,转换,加载)放到我们的数仓以供分析师使用,但是随着业务发展,我们不得不面对一个事实,我们上述做的都属于批处理,我们的分析师或者我们的业务需要实时的数据,那么在批处理转到流计算的时候,我们会面临很多很多问题,例如低延时、高吞吐、exactly-once、无序问题等等。Storm 实现了低延迟,还做不到高吞吐,也不能在故障发生时准确地处理计算状态;Spark Streaming通过采用微批处理方法实现了高吞吐和容错性,但是牺牲了低延迟和实时处理能力,也不能使窗口与自然时间相匹配,并且表现力欠佳。而flink就是目前为止的最佳答案。

        我们在选择一个新的技术框架的时候,首先考虑的是他的应用场景,再牛逼的框架没有应用场景也是一无是处,当然牛逼的框架大多都是基于某一个或者某一类应用场景而产生,而flink主要应用于以下三个场景:

        1事件驱动型应用

        2数据分析型应用

        3数据管道 ETL  

什么是事情驱动型应用?

定义:事件驱动型应用是一类具有状态的应用,该应用会根据事件流中的 事件 触发 计算 、 更新状态 或 进行外部系统 *** 作 。

关键词: 事件 ---->状态---->外部系统

每条数据(事件)触发变化

例如:金融反欺诈,实施推荐,实施规则报警

在说这个之前,先说一下什么是分析,我们从事数据分析相关行业,有时候经常忘记分析本身到底是什么,下边是维基百科对于分析的定义

看到定义之后,再看我们工作中经常对各种数据按照不同维度拆分来分析数据代表的现象,来更好的理解数据,这是我们做数据分析的本质。

那么定义首先:

数据分析型应用是从原始 数据 中 提取 有价值的信息和指标,关键词:原始数据(集)、提取(过滤分析)

它的主要应用在于对数据集进行 *** 作,重在分析

典行的数据分析型应用比如今年的疫情,我们会统计每天每地上传的信息,然后展示在包括支付宝等平台。

那事件驱动型应用和数据分析型应用有何本质区别?

简单总结一下:

                数据触发计算会派发新的动作(状态/消息)

                数据只是分析不派生新的动作(只是输出结果)

        看到过很多大咖分享自己对于数据仓库ETL的看法,自己也做了一些数据仓库的工作,但是从来没有认真总结过,会在下一篇文章总结一下我对ETL的认知,也会谈一下最近新兴起的数据湖的看法。

以上

以上就是关于Flink Metrics全部的内容,包括:Flink Metrics、2020-10-31-Flink-7(流处理基础)、Flink从BucketSink看checkpoint与故障恢复等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/9821688.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-02
下一篇 2023-05-02

发表评论

登录后才能评论

评论列表(0条)

保存