NiFi in Practice


A batch of files under /opt/nifiData/raw on the local file system:

NiFi text format conversion: CSV -> JSON

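During the conversion, the data volume was too large and caused an out-of-memory error. NiFi stopped working properly and could not be restarted!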


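Clear the cached data and temporarily rename the data path, so that NiFi does not automatically run the task again and overflow memory.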
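The CSV-to-JSON conversion described above ran out of memory on a large input. As a point of comparison outside NiFi, the following is a minimal Python sketch of a conversion that handles one row at a time, so memory use does not grow with file size. It is an illustration only, not the NiFi flow used here; the function name `csv_to_json_lines` and the sample data are made up for the example, and the output format (one JSON object per line) is an assumption.

```python
import csv
import io
import json


def csv_to_json_lines(src, dst):
    """Convert CSV text from `src` into one JSON object per line on `dst`.

    Rows are read and written one at a time, so memory use stays
    roughly constant regardless of input size.
    """
    reader = csv.DictReader(src)  # the first row is treated as the header
    count = 0
    for row in reader:
        dst.write(json.dumps(row, ensure_ascii=False) + "\n")
        count += 1
    return count


# Usage with in-memory text; file objects returned by open() work the same way.
sample = io.StringIO("id,name\n1,alice\n2,bob\n")
out = io.StringIO()
converted = csv_to_json_lines(sample, out)
```

Every value is emitted as a string because the `csv` module does not infer types. A real flow would need a schema to produce numbers.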

The first tab in the Processor Configuration dialog is the Settings tab:

<u>The name of the Processor</u>. The name of a Processor by default is the same as the Processor type.

Next to the Processor Name is a checkbox, indicating whether the Processor is Enabled. The disabled state is used to indicate that when a group of Processors is started, such as when a DFM starts an entire Process Group, <u>this (disabled) Processor should be excluded</u>.


The Processor’s unique identifier is displayed along with the Processor’s type and NAR bundle. These values cannot be modified.

During the normal course of processing a piece of data (a FlowFile), an event may occur that indicates that <u>the data cannot be processed at this time but the data may be processable at a later time</u>. When this occurs, the Processor may choose to Penalize the FlowFile. <u>This will prevent the FlowFile from being processed for some period of time.</u> For example, if the Processor is to push the data to a remote service, but the remote service already has a file with the same name as the filename that the Processor is specifying, the Processor may penalize the FlowFile. The 'Penalty Duration' allows the DFM to specify how long the FlowFile should be penalized. The default value is 30 seconds.

Similarly, the Processor may determine that some situation exists such that <u>the Processor can no longer make any progress</u>, regardless of the data that it is processing. For example, if a Processor is to push data to a remote service and that service is not responding, the Processor cannot make any progress. As a result, the Processor should 'yield', which will prevent the Processor from being scheduled to run for some period of time. That period of time is specified by setting the 'Yield Duration'. The default value is 1 second.
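The difference between penalizing and yielding is easier to see in a toy model: a penalty holds back one FlowFile, while a yield holds back the whole Processor. The sketch below is not NiFi's implementation. The classes `ToyFlowFile` and `ToyProcessor` are hypothetical, and time is passed in explicitly as `now` (in seconds) so the behaviour is deterministic.

```python
from dataclasses import dataclass

PENALTY_DURATION = 30.0  # seconds, the default quoted above
YIELD_DURATION = 1.0     # seconds, the default quoted above


@dataclass
class ToyFlowFile:
    name: str
    penalized_until: float = 0.0


@dataclass
class ToyProcessor:
    yielded_until: float = 0.0

    def penalize(self, flowfile, now):
        # Holds back only this FlowFile.
        flowfile.penalized_until = now + PENALTY_DURATION

    def yield_processor(self, now):
        # Holds back the Processor itself, whatever is queued.
        self.yielded_until = now + YIELD_DURATION

    def eligible(self, queue, now):
        """Names of the FlowFiles that could be processed at time `now`."""
        if now < self.yielded_until:
            return []
        return [f.name for f in queue if now >= f.penalized_until]


queue = [ToyFlowFile("a.csv"), ToyFlowFile("b.csv")]
proc = ToyProcessor()
proc.penalize(queue[0], now=0.0)  # a.csv is held back until t = 30
```

At `now=10.0` only `b.csv` is eligible. After `proc.yield_processor(100.0)`, nothing is eligible until `now=101.0`.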

Whenever the Processor writes to its <u>log</u>, the Processor also will generate a Bulletin. This setting indicates the lowest level of Bulletin that should be <u>shown in the User Interface.</u> By default, the Bulletin level is set to WARN, which means it will display all warning and error-level bulletins.

Each of the Relationships that is defined by the Processor is listed here, along with its description. In order for a Processor to be considered valid and able to run, each <u>Relationship defined by the Processor must be either connected to a downstream component or auto-terminated.</u> If a Relationship is <u>auto-terminated</u>, any FlowFile that is routed to that Relationship will <u>be removed from the flow</u> and its processing considered complete. <u>Any Relationship that is already connected to a downstream component cannot be auto-terminated.</u> The Relationship must first be removed from any Connection that uses it. Additionally, for any Relationship that is selected to be auto-terminated, the auto-termination status will be cleared (turned off) if the Relationship is added to a Connection.
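The validity rule above amounts to a set difference: any Relationship that is neither connected nor auto-terminated makes the Processor invalid. A minimal sketch follows, assuming Relationships are represented as plain strings. The function names and the `success`/`failure` Relationship names are illustrative and not part of NiFi's API.

```python
def unhandled_relationships(defined, connected, auto_terminated):
    """Relationships that are neither connected nor auto-terminated."""
    return sorted(set(defined) - set(connected) - set(auto_terminated))


def is_valid(defined, connected, auto_terminated):
    """A Processor is valid only if every defined Relationship is handled."""
    return not unhandled_relationships(defined, connected, auto_terminated)


defined = {"success", "failure"}

# Only "success" is wired to a downstream component: "failure" is unhandled.
missing = unhandled_relationships(defined, connected={"success"}, auto_terminated=set())

# Auto-terminating "failure" makes the configuration valid.
ok = is_valid(defined, connected={"success"}, auto_terminated={"failure"})
```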

There are three possible options for scheduling components:

Timer driven: This is the default mode. The Processor will be scheduled to run on a regular interval. The interval at which the Processor is run is defined by the 'Run Schedule' option (see below).

Event driven: When this mode is selected, the Processor will be triggered to run by an event, and that event occurs when FlowFiles enter Connections feeding this Processor. This mode is currently considered experimental and is not supported by all Processors. When this mode is selected, the 'Run Schedule' option is not configurable, as the Processor is not triggered to run periodically but as the result of an event. Additionally, this is the only mode for which the 'Concurrent Tasks' option can be set to 0. In this case, the number of threads is limited only by the size of the Event-Driven Thread Pool that the administrator has configured.

CRON driven: When using the CRON driven scheduling mode, the Processor is scheduled to run periodically, similar to the Timer driven scheduling mode. However, the CRON driven mode provides significantly more flexibility at the expense of increasing the complexity of the configuration. The CRON driven scheduling value is a string of <u>six required fields and one optional field, each separated by a space.</u> These fields are:

You typically specify values one of the following ways:

You should also be aware of several valid special characters:

For example:

For additional information and examples, see the CronTrigger Tutorial in the Quartz documentation.
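To make the "six required fields and one optional field" concrete, the sketch below splits a CRON string into named fields. It assumes the Quartz field order (seconds, minutes, hours, day of month, month, day of week, then an optional year). The helper `split_cron` is hypothetical and only counts fields; it does not validate values or special characters.

```python
QUARTZ_FIELDS = [
    "seconds", "minutes", "hours",
    "day_of_month", "month", "day_of_week",
    "year",  # optional seventh field
]


def split_cron(expression):
    """Map each space-separated part of a CRON string to its field name."""
    parts = expression.split()
    if len(parts) not in (6, 7):
        raise ValueError(f"expected 6 or 7 fields, got {len(parts)}")
    # zip stops at the shorter sequence, so "year" is absent for 6 fields.
    return dict(zip(QUARTZ_FIELDS, parts))


# Illustrative expression: second 0, minute 0, hour 13, every day.
fields = split_cron("0 0 13 * * ?")
```

Here `fields["hours"]` is `"13"`, and `fields` has no `"year"` key because the optional field was omitted.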

This controls <u>how many threads the Processor will use</u>. Said a different way, this controls <u>how many FlowFiles should be processed by this Processor at the same time</u>. Increasing this value will typically allow the Processor to handle more data in the same amount of time. However, it does this by using system resources that then are not usable by other Processors. <u>This essentially provides a relative weighting of Processors</u>: it controls how much of the system’s resources should be allocated to this Processor instead of other Processors. This field is available for most Processors. There are, however, some types of Processors that can only be scheduled with a single Concurrent task.

Dictates how often the Processor should be scheduled to run. The valid values for this field depend on the selected Scheduling Strategy (see above).

If using the Event driven Scheduling Strategy, this field is not available.

When using the Timer driven Scheduling Strategy, this value is a time duration specified by a number followed by a time unit. For example, 1 second or 5 mins. <u>The default value of 0 sec means that the Processor should run as often as possible as long as it has data to process.</u> This is true for any time duration of 0, regardless of the time unit (i.e., 0 sec, 0 mins, 0 days). For an explanation of values that are applicable for the CRON driven Scheduling Strategy, see the description of the CRON driven Scheduling Strategy itself.
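The "number followed by a time unit" format can be sketched as a small parser. The unit table below is an assumed subset chosen for illustration and is not NiFi's exact list of accepted units; `parse_duration` and `runs_continuously` are hypothetical helper names.

```python
import re

# Assumed subset of unit spellings, mapped to seconds.
_UNIT_SECONDS = {
    "ms": 0.001, "millis": 0.001,
    "s": 1, "sec": 1, "secs": 1, "second": 1, "seconds": 1,
    "m": 60, "min": 60, "mins": 60, "minute": 60, "minutes": 60,
    "h": 3600, "hr": 3600, "hrs": 3600, "hour": 3600, "hours": 3600,
    "d": 86400, "day": 86400, "days": 86400,
}


def parse_duration(text):
    """Convert a string such as '5 mins' into a number of seconds."""
    match = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*([A-Za-z]+)\s*", text)
    if match is None:
        raise ValueError(f"not a duration: {text!r}")
    value, unit = match.groups()
    unit = unit.lower()
    if unit not in _UNIT_SECONDS:
        raise ValueError(f"unknown time unit: {unit!r}")
    return float(value) * _UNIT_SECONDS[unit]


def runs_continuously(text):
    """A zero duration in any unit means 'run as often as possible'."""
    return parse_duration(text) == 0
```

With this table, `parse_duration("5 mins")` gives `300.0`, and `runs_continuously` is true for `"0 sec"`, `"0 mins"` and `"0 days"` alike.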

When configured for clustering, an Execution setting will be available. This setting is used to determine which node(s) the Processor will be scheduled to execute. Selecting 'All Nodes' will result in this Processor being scheduled on every node in the cluster. <u>Selecting 'Primary Node' will result in this Processor being scheduled on the Primary Node only</u>. Processors that have been configured for 'Primary Node' execution are identified by a "P" next to the processor icon.

This controls how long the Processor should be scheduled to run each time that it is triggered. On the left-hand side of the slider, it is marked '<u>Lower latency</u>' while the right-hand side is marked '<u>Higher throughput</u>'. <u>When a Processor finishes running, it must update the repository in order to transfer the FlowFiles to the next Connection. Updating the repository is expensive, so the more work that can be done at once before updating the repository, the more work the Processor can handle (Higher throughput).</u> However, this means that the next Processor cannot start processing those FlowFiles until the previous Processor updates this repository. As a result, the latency will be longer (the time required to process the FlowFile from beginning to end will be longer). The slider therefore provides a spectrum from which the DFM can choose to favor Lower Latency or Higher Throughput.
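The trade-off can be put into rough numbers with a simple cost model: each FlowFile costs a fixed amount of work, and each repository update costs a fixed amount on top. This is only an arithmetic illustration. The figures (1 ms of work per FlowFile, 10 ms per repository update) are invented, and batching by FlowFile count stands in for NiFi's time-based Run Duration.

```python
import math


def total_time_ms(n_flowfiles, work_ms, commit_ms, batch_size):
    """Time to process everything when the repository is updated once per batch."""
    commits = math.ceil(n_flowfiles / batch_size)
    return n_flowfiles * work_ms + commits * commit_ms


def first_file_latency_ms(work_ms, commit_ms, batch_size):
    """The first FlowFile of a batch moves on only after the whole batch commits."""
    return batch_size * work_ms + commit_ms


# Hypothetical figures: 1000 FlowFiles, 1 ms of work each, 10 ms per update.
small = total_time_ms(1000, 1, 10, batch_size=1)    # update after every FlowFile
large = total_time_ms(1000, 1, 10, batch_size=100)  # update after every 100
```

Under these assumptions the larger batch finishes all 1000 FlowFiles in 1100 ms instead of 11000 ms, while the wait for the first FlowFile of a batch grows from 11 ms to 110 ms.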



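Alibaba's DataX is a fairly good product. It is launched through a Python script (its core is written in Java), provides read/write plugins for a wide range of data stores, runs multi-threaded, and is simple to use: a job usually takes only two steps.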











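Sqoop (pronounced "skup") is an open-source tool used mainly to transfer data between Hadoop (Hive) and traditional databases (MySQL, PostgreSQL, ...). It can import data from a relational database (for example MySQL, Oracle, or Postgres) into Hadoop's HDFS, and it can also export data from HDFS into a relational database.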









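Apache NiFi is an easy-to-use, powerful, and reliable system for pulling, processing, and distributing data, used to automate the management of data flows between systems. It supports highly configurable directed graphs of data routing, transformation, and system mediation logic, and it can pull data dynamically from many kinds of data sources.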

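NiFi is operated through a web interface, with scheduling done in the background on the server. Users define a data processing task as a flow and then run it. The backend includes components such as a data processing engine and task scheduling.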


NiFi's design philosophy is close to Flow-Based Programming.


FlowFile Processor: does the actual work on the data flow


Flow Controller: manages the threads used by processes and how they are allocated

Process Group: a specific set of processes and their connections, which allows new components to be created by combining other components









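P.S. The internal version already supports log parsing for some versions of MySQL and Oracle. The current open-source version of canal supports versions 5.7 and below (inside Alibaba: MySQL 5.7.13, 5.6.10, 5.5.18 and 5.1.40/48).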




Multi-level indexes (separate sharded indexes for sellers and buyers)

search build





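In addition, otter is now available on Alibaba Cloud as a commercial offering, Data Transmission Service (DTS), which is ready to use once activated and removes the high cost of deployment and maintenance. DTS has been adapted for Alibaba Cloud products such as RDS and DRDS, and it solves high-availability problems for synchronization in scenarios such as binlog reclamation, primary/standby switchover, and VPC network switching. It also includes performance optimizations targeted at RDS. For reasons of stability, performance, and cost, Alibaba Cloud users are strongly recommended to use DTS.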






