NiFi in Practice


Scenario: a batch of files sits in the local file system under /opt/nifiData/raw, and NiFi is used to convert their text format from CSV to JSON.
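In NiFi itself this conversion is typically built with record-oriented processors (for example, a GetFile source feeding ConvertRecord configured with a CSVReader and a JSONRecordSetWriter, then PutFile). For reference, the transformation those components perform amounts to the following standalone Python sketch; it assumes the CSV files have a header row:

```python
import csv
import json
import sys

def csv_to_json(csv_path, json_path):
    # Read rows as dicts keyed by the header line.
    with open(csv_path, newline="", encoding="utf-8") as src:
        rows = list(csv.DictReader(src))
    # Emit the whole file as a single JSON array.
    with open(json_path, "w", encoding="utf-8") as dst:
        json.dump(rows, dst, ensure_ascii=False, indent=2)

if __name__ == "__main__":
    csv_to_json(sys.argv[1], sys.argv[2])  # e.g. data.csv data.json
```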

During the conversion, the data volume was so large that NiFi ran out of memory and stopped working; worse, it could not be restarted!

Because NiFi automatically resumes unfinished work on startup, the out-of-memory problem recurred immediately, and there was no way to reach the UI to stop the task.

The fix: clear the cached data and temporarily rename the data directories, so that NiFi cannot auto-resume the task and exhaust memory again.
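A minimal sketch of that recovery, assuming a default standalone install whose repositories live under NIFI_HOME and the raw-data directory from this scenario (adjust the paths to match your nifi.properties):

```python
import os
import shutil

NIFI_HOME = "/opt/nifi"        # assumed install location
RAW_DIR = "/opt/nifiData/raw"  # input directory from this scenario

# Move the queued state aside rather than deleting it outright; NiFi
# recreates empty repositories on the next startup.
for repo in ("flowfile_repository", "content_repository", "provenance_repository"):
    path = os.path.join(NIFI_HOME, repo)
    if os.path.isdir(path):
        shutil.move(path, path + ".bak")

# Temporarily rename the input directory so the ingest processor cannot
# re-read everything at once when NiFi comes back up.
if os.path.isdir(RAW_DIR):
    shutil.move(RAW_DIR, RAW_DIR + ".paused")
```

Once NiFi starts cleanly, stop or throttle the ingest processor before renaming the input directory back.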

The first tab in the Processor Configuration dialog is the Settings tab:

The name of the Processor. The name of a Processor by default is the same as the Processor type.

Next to the Processor Name is a checkbox, indicating whether the Processor is Enabled. The disabled state is used to indicate that when a group of Processors is started, such as when a DFM starts an entire Process Group, this (disabled) Processor should be excluded.

ID

The Processor's unique identifier is displayed along with the Processor's type and NAR bundle. These values cannot be modified.

During the normal course of processing a piece of data (a FlowFile), an event may occur that indicates that the data cannot be processed at this time but the data may be processable at a later time. When this occurs, the Processor may choose to Penalize the FlowFile. This will prevent the FlowFile from being processed for some period of time. For example, if the Processor is to push the data to a remote service, but the remote service already has a file with the same name as the filename that the Processor is specifying, the Processor may penalize the FlowFile. The 'Penalty Duration' allows the DFM to specify how long the FlowFile should be penalized. The default value is 30 seconds.

Similarly, the Processor may determine that some situation exists such that the Processor can no longer make any progress, regardless of the data that it is processing. For example, if a Processor is to push data to a remote service and that service is not responding, the Processor cannot make any progress. As a result, the Processor should 'yield', which will prevent the Processor from being scheduled to run for some period of time. That period of time is specified by setting the 'Yield Duration'. The default value is 1 second.
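To make the penalize/yield distinction concrete, here is a hedged sketch written as a NiFi ExecuteScript (Jython) body; session, context, REL_SUCCESS, and REL_FAILURE are bound by ExecuteScript, while remote_is_down() and remote_has_file() are hypothetical placeholders for the remote-service checks described above:

```python
# ExecuteScript (Jython) body. `session`, `context`, REL_SUCCESS and
# REL_FAILURE are provided by NiFi; remote_is_down() / remote_has_file()
# are hypothetical placeholders.
flow_file = session.get()
if flow_file is not None:
    if remote_is_down():
        # Nothing can make progress: return the FlowFile to the incoming
        # queue and ask the framework to back off for the Yield Duration.
        session.transfer(flow_file)   # no relationship = back to the queue
        getattr(context, "yield")()   # `yield` is a Python keyword, hence getattr
    elif remote_has_file(flow_file.getAttribute("filename")):
        # Only this FlowFile is blocked: penalize it for the Penalty
        # Duration and route it for retry.
        flow_file = session.penalize(flow_file)
        session.transfer(flow_file, REL_FAILURE)
    else:
        session.transfer(flow_file, REL_SUCCESS)
```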

Whenever the Processor writes to its log, the Processor also will generate a Bulletin. This setting indicates the lowest level of Bulletin that should be shown in the User Interface. By default, the Bulletin level is set to WARN, which means it will display all warning and error-level bulletins.

Each of the Relationships that is defined by the Processor is listed here, along with its description. In order for a Processor to be considered valid and able to run, each Relationship defined by the Processor must be either connected to a downstream component or auto-terminated. If a Relationship is auto-terminated, any FlowFile that is routed to that Relationship will be removed from the flow and its processing considered complete. Any Relationship that is already connected to a downstream component cannot be auto-terminated; the Relationship must first be removed from any Connection that uses it. Additionally, for any Relationship that is selected to be auto-terminated, the auto-termination status will be cleared (turned off) if the Relationship is added to a Connection.

There are three possible options for scheduling components:

Timer driven: This is the default mode. The Processor will be scheduled to run on a regular interval. The interval at which the Processor is run is defined by the 'Run Schedule' option (see below).

Event driven: When this mode is selected, the Processor will be triggered to run by an event, and that event occurs when FlowFiles enter Connections feeding this Processor. This mode is currently considered experimental and is not supported by all Processors. When this mode is selected, the 'Run Schedule' option is not configurable, as the Processor is not triggered to run periodically but as the result of an event. Additionally, this is the only mode for which the 'Concurrent Tasks' option can be set to 0. In this case, the number of threads is limited only by the size of the Event-Driven Thread Pool that the administrator has configured.

CRON driven: When using the CRON driven scheduling mode, the Processor is scheduled to run periodically, similar to the Timer driven scheduling mode. However, the CRON driven mode provides significantly more flexibility at the expense of increasing the complexity of the configuration. The CRON driven scheduling value is a string of six required fields and one optional field, each separated by a space. These fields are, in order: Seconds, Minutes, Hours, Day of Month, Month, Day of Week, and (optionally) Year.

You typically specify values in one of the following ways: as a single number (e.g., 8), as a range (e.g., 8-17), as a comma-separated list (e.g., 0,15,30,45), or as an increment (e.g., 0/15, meaning every 15 units starting at 0).

You should also be aware of several valid special characters: * (all values), ? (no specific value; used in the Day of Month or Day of Week field when the other of the two is specified), L (the last day of the month or week), W (the nearest weekday), and # (the nth weekday of the month).

For example: * * * * * ? fires every second, 0 * * * * ? fires at the start of every minute, and 0 0 13 * * ? fires every day at 1:00 PM.

For additional information and examples, see the Cron Trigger Tutorial in the Quartz documentation.

This controls how many threads the Processor will use. Said another way, this controls how many FlowFiles should be processed by this Processor at the same time. Increasing this value will typically allow the Processor to handle more data in the same amount of time. However, it does this by using system resources that then are not usable by other Processors. This essentially provides a relative weighting of Processors: it controls how much of the system's resources should be allocated to this Processor instead of other Processors. This field is available for most Processors. There are, however, some types of Processors that can only be scheduled with a single Concurrent task.

Dictates how often the Processor should be scheduled to run. The valid values for this field depend on the selected Scheduling Strategy (see above).

If using the Event driven Scheduling Strategy, this field is not available.

When using the Timer driven Scheduling Strategy, this value is a time duration specified by a number followed by a time unit. For example, 1 second or 5 mins. The default value of 0 sec means that the Processor should run as often as possible as long as it has data to process. This is true for any time duration of 0, regardless of the time unit (i.e., 0 sec, 0 mins, 0 days). For an explanation of values that are applicable for the CRON driven Scheduling Strategy, see the description of the CRON driven Scheduling Strategy itself.

When configured for clustering, an Execution setting will be available. This setting is used to determine on which node(s) the Processor will be scheduled to execute. Selecting 'All Nodes' will result in this Processor being scheduled on every node in the cluster. Selecting 'Primary Node' will result in this Processor being scheduled on the Primary Node only. Processors that have been configured for 'Primary Node' execution are identified by a "P" next to the processor icon.

This controls how long the Processor should be scheduled to run each time that it is triggered. On the left-hand side of the slider, it is marked 'Lower latency' while the right-hand side is marked 'Higher throughput'. When a Processor finishes running, it must update the repository in order to transfer the FlowFiles to the next Connection. Updating the repository is expensive, so the more work that can be done at once before updating the repository, the more work the Processor can handle (Higher throughput). However, this means that the next Processor cannot start processing those FlowFiles until the previous Processor updates the repository. As a result, the latency will be longer (the time required to process the FlowFile from beginning to end will be longer). The slider therefore provides a spectrum from which the DFM can choose to favor Lower Latency or Higher Throughput.

1. Offline Data Synchronization

DataX

Alibaba's DataX is a solid product. Its engine is written in Java (jobs are launched via a Python script), it ships reader/writer plugins for many different data stores, and it executes with multiple threads. It is simple to operate; a job usually takes just two steps:

Create the job configuration file (a JSON file specifying the reader and writer; see the sketch after these steps);

Launch DataX with that configuration file.
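A minimal sketch of such a job, using DataX's built-in streamreader/streamwriter demo plugins (the channel count and sample record are illustrative; real jobs substitute plugins such as mysqlreader or hdfswriter). It is written here as a Python script that emits job.json and shows the launch command:

```python
import json

# Minimal DataX job: streamreader generates rows, streamwriter prints them.
job = {
    "job": {
        "setting": {"speed": {"channel": 1}},  # degree of parallelism
        "content": [{
            "reader": {
                "name": "streamreader",
                "parameter": {
                    "column": [{"type": "string", "value": "hello"}],
                    "sliceRecordCount": 10,
                },
            },
            "writer": {
                "name": "streamwriter",
                "parameter": {"print": True},
            },
        }],
    }
}

with open("job.json", "w") as f:
    json.dump(job, f, indent=2)

# Step 2 is then:  python $DATAX_HOME/bin/datax.py job.json
```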

It is a very good fit for offline data; incremental data can be handled with some custom coding.

Drawback: it is effective mainly for inserted rows and is a poor fit for updates, since there is no built-in support for incremental updates. That said, DataX's flexible architecture makes it easy to implement incremental synchronization with shell scripts and similar glue.

References:

GitHub: https://github.com/alibaba/DataX

DataX 3.0 introduction: https://www.jianshu.com/p/65c440f9bce1

A first look at DataX: https://www.imooc.com/article/15640

Documentation: https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md

Sqoop

Sqoop (pronounced "skup") is an open-source tool used mainly to move data between Hadoop (Hive) and traditional databases (MySQL, PostgreSQL, ...). It can import data from a relational database (e.g., MySQL, Oracle, Postgres) into HDFS, and it can export data from HDFS back into a relational database.

Website: http://sqoop.apache.org/

Sqoop import: the import tool imports an individual table from an RDBMS into HDFS. Each row of the table is treated as a record in HDFS. Records are stored either as text data in text files or as binary data in Avro or sequence files.

Sqoop export: the export tool exports a set of files from HDFS back to an RDBMS. The files given to Sqoop as input contain records, which become rows in the target table; they are read and parsed into records using a user-specified delimiter.

Sqoop supports both full imports and incremental imports (the latter in two flavors: Append mode, keyed on a monotonically increasing column, and LastModified mode, keyed on a timestamp column), and it lets you control how many tasks import data concurrently. A sketch of an Append-mode import follows.
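A hedged sketch of an Append-mode incremental import, driven from Python via subprocess (the JDBC URL, credentials, table, and column names are placeholders):

```python
import subprocess

# Pull only rows whose check column exceeds --last-value (Append mode).
cmd = [
    "sqoop", "import",
    "--connect", "jdbc:mysql://db-host:3306/mydb",  # placeholder URL
    "--username", "etl",
    "--password-file", "/user/etl/.sqoop.pwd",      # avoid inline passwords
    "--table", "orders",                            # placeholder table
    "--target-dir", "/warehouse/orders",
    "--incremental", "append",
    "--check-column", "id",       # monotonically increasing column
    "--last-value", "12345",      # high-water mark from the previous run
    "--num-mappers", "4",         # concurrent import tasks
]
subprocess.run(cmd, check=True)
```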

Kettle

Kettle is an open-source ETL tool developed abroad, written in pure Java; it runs on Windows, Linux, and Unix, and its data extraction is efficient and stable.

Kettle's Spoon offers a rich set of Steps that can be assembled into data-integration jobs covering many complex scenarios, making both full and incremental synchronization straightforward. The drawback is that jobs run on a schedule, so real-time freshness is relatively poor.

NiFi

Apache NiFi is an easy-to-use, powerful, and reliable system for pulling, processing, and distributing data, built to automate the flow of data between systems. It supports highly configurable, directed-graph data routing, transformation, and system-mediation logic, and it can dynamically pull data from many kinds of sources.

NiFi is operated through a web UI, with scheduling handled by the server in the background. A user defines data processing as a flow and then runs it; the backend provides components such as the data-processing engine and the task scheduler.

A few core concepts:

NiFi's design philosophy is close to flow-based programming (FBP).

FlowFile: represents each object moving through the system; carries the data along with its basic attributes

FlowFile Processor: does the actual work on the data flow

Connection: links processors together and acts as a bounded buffer for the data

Flow Controller: manages the threads used by the processes and their allocation

Process Group: a specific set of processors and their connections; groups can be composed with other components to create new components

References

An introduction to NiFi and its core concepts

Official site: http://nifi.apache.org/index.html

2. Real-Time Data Synchronization

For real-time synchronization, the most flexible approach is still to use Kafka as an intermediate relay: when data changes, record the change in Kafka, and any program that needs the data simply subscribes to the messages (see the consumer sketch below). This does require development work. Below are two MySQL synchronization components from Alibaba: canal and otter.
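A minimal consumer sketch, assuming the third-party kafka-python package and a hypothetical topic carrying row-change events as JSON:

```python
import json
from kafka import KafkaConsumer  # third-party: pip install kafka-python

# Subscribe to a (hypothetical) topic of change events and handle each one.
consumer = KafkaConsumer(
    "mysql-changes",                        # topic name is an assumption
    bootstrap_servers=["kafka-host:9092"],
    group_id="sync-workers",
    value_deserializer=lambda b: json.loads(b.decode("utf-8")),
)

for message in consumer:
    change = message.value
    # Replace this print with your own upsert/delete logic.
    print(change)
```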

canal

https://github.com/alibaba/canal

Data extraction, simply put, means pulling the data of one table into another. Many ETL tools can extract and transform data for us as one-off or scheduled jobs, but canal, Alibaba's open-source data-extraction project, can extract in real time. The principle: it masquerades as a MySQL replica, reads MySQL's binlog, and generates messages; clients subscribe to these change messages, then process and store them. Next, let's walk through standing up a canal service.
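canal's official client is Java. To make the binlog-subscription idea concrete in this document's Python sketches, here is the same pattern using the third-party python-mysql-replication library, which also connects as a replica and streams row events (the connection settings and server_id are placeholders):

```python
from pymysqlreplication import BinLogStreamReader  # pip install mysql-replication
from pymysqlreplication.row_event import (
    DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent,
)

# Connect to MySQL as if we were a replica and stream row-level binlog events.
stream = BinLogStreamReader(
    connection_settings={
        "host": "db-host", "port": 3306,
        "user": "repl", "passwd": "secret",  # placeholders
    },
    server_id=100,     # must be unique among the server's replicas
    blocking=True,     # keep waiting for new events
    only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
)

for event in stream:
    for row in event.rows:
        # Each row describes an insert/update/delete; forward it on,
        # e.g. publish to Kafka as in the consumer sketch above.
        print(type(event).__name__, event.table, row)
```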

Early on, Alibaba's B2B business was deployed across machine rooms in both Hangzhou and the US, which created a need for cross-datacenter synchronization. The early database-sync work captured incremental changes mainly with triggers; starting in 2010, the Alibaba companies gradually switched to parsing database logs to obtain incremental changes for synchronization. This gave rise to the incremental subscribe-and-consume business and opened a new chapter.

P.S. The internal version currently supports log parsing for MySQL and some Oracle versions; the current open-source canal supports MySQL 5.7 and below (internally: MySQL 5.7.13, 5.6.10, 5.5.18, and 5.1.40/48).

Businesses supported by log-based incremental subscribe & consume:

Database mirroring

Real-time database backup

Multi-level indexes (separate per-side indexes for sellers and buyers)

Search index building

Business cache refreshing

Important business messages such as price changes

otter

https://github.com/alibaba/otter

otter builds on canal and adds a configurable consumer, so with otter you no longer need to write the consumer described above; it also provides a web UI for defining synchronization tasks and table mappings. It is a very good fit for MySQL-to-MySQL synchronization.

Note: otter has a commercial counterpart on Alibaba Cloud, the Data Transmission Service (DTS), which is ready to use upon activation and removes the cost of deploying and maintaining your own setup. DTS is adapted to Alibaba Cloud products such as RDS and DRDS, and it keeps synchronization highly available across scenarios like binlog recycling, primary/standby failover, and VPC network changes, with performance tuned specifically for RDS. For stability, performance, and cost reasons, Alibaba Cloud users are strongly encouraged to use DTS.

