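We use Apache NiFi to distribute enterprise master data to downstream business systems in real time. The downstream systems include MySQL, PostgreSQL, and Oracle. NiFi does not directly support upsert semantics for Oracle, so when master data such as products and materials is updated in bulk, distributing it to Oracle by inserting, catching the error, and then updating performs poorly. The plan is therefore to write a custom Processor that implements upsert using Oracle's built-in MERGE INTO syntax.
This article first walks through setting up the development environment for custom Apache NiFi Processors. Follow-up articles will show how to add support for Oracle's built-in MERGE INTO by rewriting either ConvertJsonToSQL or PutDatabaseRecord.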
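To make the upsert idea concrete, here is a minimal sketch of how a MERGE INTO statement with bind placeholders could be generated. The class `OracleMergeSketch` and its `buildMerge` method are hypothetical helpers written for illustration; they are not part of NiFi and not necessarily how the later articles implement it. The sketch also assumes table and column names come from a trusted source, since it does not quote or validate identifiers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper (not a NiFi class): builds an Oracle MERGE INTO statement. */
public class OracleMergeSketch {

    /**
     * @param table        target table name (assumed trusted, not validated here)
     * @param keyColumns   columns used to match existing rows
     * @param valueColumns non-key columns to update or insert
     */
    public static String buildMerge(String table, List<String> keyColumns, List<String> valueColumns) {
        if (keyColumns.isEmpty()) {
            throw new IllegalArgumentException("at least one key column is required");
        }
        List<String> all = new ArrayList<>(keyColumns);
        all.addAll(valueColumns);

        // One-row source built from bind parameters: SELECT ? AS ID, ? AS NAME FROM dual
        String source = all.stream().map(c -> "? AS " + c).collect(Collectors.joining(", "));
        // Match condition on the key columns only
        String on = keyColumns.stream().map(c -> "t." + c + " = s." + c).collect(Collectors.joining(" AND "));
        String insertCols = String.join(", ", all);
        String insertVals = all.stream().map(c -> "s." + c).collect(Collectors.joining(", "));

        StringBuilder sql = new StringBuilder();
        sql.append("MERGE INTO ").append(table).append(" t ")
           .append("USING (SELECT ").append(source).append(" FROM dual) s ")
           .append("ON (").append(on).append(") ");
        // Columns referenced in the ON clause cannot be updated, so SET covers value columns only
        if (!valueColumns.isEmpty()) {
            String set = valueColumns.stream().map(c -> "t." + c + " = s." + c).collect(Collectors.joining(", "));
            sql.append("WHEN MATCHED THEN UPDATE SET ").append(set).append(" ");
        }
        sql.append("WHEN NOT MATCHED THEN INSERT (").append(insertCols)
           .append(") VALUES (").append(insertVals).append(")");
        return sql.toString();
    }

    public static void main(String[] args) {
        // Example table and columns are made up for this demo
        System.out.println(buildMerge("PRODUCT", Arrays.asList("ID"), Arrays.asList("NAME", "PRICE")));
    }
}
```

With a statement like this, a single round trip either updates the matching row or inserts a new one, instead of attempting an insert and falling back to an update on error. The placeholders would be bound in the order key columns first, then value columns.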
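Custom Processor walkthrough
This article demonstrates the development workflow on a Mac laptop. If you use a Windows machine, look up the equivalent setup steps and prepare your environment accordingly.
Setting up a NiFi development environment requires the following: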
- A Mac
- Java
(base) dszhao@IT00039346 / % java -version
java version "1.8.0_251"
Java(TM) SE Runtime Environment (build 1.8.0_251-b08)
Java HotSpot(TM) 64-Bit Server VM (build 25.251-b08, mixed mode)
(base) dszhao@IT00039346 / %
- Maven
(base) dszhao@IT00039346 / % mvn -version
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /Users/dszhao/software/apache-maven-3.6.3
Java version: 1.8.0_251, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_251.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.15.7", arch: "x86_64", family: "mac"
- Git
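- IntelliJ IDEA
- NiFi
This article installs NiFi 1.15.3, the latest version at the time of writing.
# https://www.apache.org/dyn/closer.lua?path=/nifi/1.15.3/nifi-1.15.3-bin.tar.gz
# Download NiFi
# Install it locally and confirm that it runs; detailed steps omitted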
Initialize the NiFi Maven project
# Create the development directory
(base) dszhao@IT00039346 source % mkdir nifi-processor
(base) dszhao@IT00039346 source % cd nifi-processor
# Generate the project skeleton from the NiFi archetype
(base) dszhao@IT00039346 nifi-processor % mvn archetype:generate
Choose a number or apply filter (format: [groupId:]artifactId, case sensitive contains): : nifi
Choose archetype:
1: remote -> org.apache.nifi:nifi-processor-bundle-archetype (-)
2: remote -> org.apache.nifi:nifi-service-bundle-archetype (-)
Choose a number or apply filter (format: [groupId:]artifactId, case sensitive contains): :
Choose a number or apply filter (format: [groupId:]artifactId, case sensitive contains): : 1
Choose org.apache.nifi:nifi-processor-bundle-archetype version:
1: 0.0.2-incubating
2: 0.1.0-incubating
3: 0.2.0-incubating
4: 0.2.1
5: 0.3.0
6: 0.4.0
7: 0.4.1
8: 0.5.0
9: 0.5.1
10: 0.6.0
11: 0.6.1
12: 0.7.0
13: 0.7.1
14: 0.7.2
15: 0.7.3
16: 0.7.4
17: 1.0.0-BETA
18: 1.0.0
19: 1.0.1
20: 1.1.0
21: 1.1.1
22: 1.1.2
23: 1.2.0
24: 1.3.0
25: 1.4.0
Choose a number: 25:
Downloading from cvte: http://mvn.gz.cvte.cn/nexus/content/groups/public/org/apache/nifi/nifi-processor-bundle-archetype/1.4.0/nifi-processor-bundle-archetype-1.4.0.pom
Downloaded from cvte: http://mvn.gz.cvte.cn/nexus/content/groups/public/org/apache/nifi/nifi-processor-bundle-archetype/1.4.0/nifi-processor-bundle-archetype-1.4.0.pom (3.0 kB at 447 B/s)
Downloading from cvte: http://mvn.gz.cvte.cn/nexus/content/groups/public/org/apache/nifi/nifi-maven-archetypes/1.4.0/nifi-maven-archetypes-1.4.0.pom
Downloaded from cvte: http://mvn.gz.cvte.cn/nexus/content/groups/public/org/apache/nifi/nifi-maven-archetypes/1.4.0/nifi-maven-archetypes-1.4.0.pom (1.5 kB at 445 B/s)
Downloading from cvte: http://mvn.gz.cvte.cn/nexus/content/groups/public/org/apache/nifi/nifi/1.4.0/nifi-1.4.0.pom
[WARNING] Could not validate integrity of download from http://mvn.gz.cvte.cn/nexus/content/groups/public/org/apache/nifi/nifi/1.4.0/nifi-1.4.0.pom: Checksum validation failed, no checksums available
[WARNING] Checksum validation failed, no checksums available from cvte for http://mvn.gz.cvte.cn/nexus/content/groups/public/org/apache/nifi/nifi/1.4.0/nifi-1.4.0.pom
Downloaded from cvte: http://mvn.gz.cvte.cn/nexus/content/groups/public/org/apache/nifi/nifi/1.4.0/nifi-1.4.0.pom (98 kB at 19 kB/s)
Downloading from cvte: http://mvn.gz.cvte.cn/nexus/content/groups/public/org/apache/nifi/nifi-processor-bundle-archetype/1.4.0/nifi-processor-bundle-archetype-1.4.0.jar
[WARNING] Could not validate integrity of download from http://mvn.gz.cvte.cn/nexus/content/groups/public/org/apache/nifi/nifi-processor-bundle-archetype/1.4.0/nifi-processor-bundle-archetype-1.4.0.jar: Checksum validation failed, no checksums available
[WARNING] Checksum validation failed, no checksums available from cvte for http://mvn.gz.cvte.cn/nexus/content/groups/public/org/apache/nifi/nifi-processor-bundle-archetype/1.4.0/nifi-processor-bundle-archetype-1.4.0.jar
Downloaded from cvte: http://mvn.gz.cvte.cn/nexus/content/groups/public/org/apache/nifi/nifi-processor-bundle-archetype/1.4.0/nifi-processor-bundle-archetype-1.4.0.jar (12 kB at 1.9 kB/s)
Define value for property 'artifactBaseName': cvte
[INFO] Using property: nifiVersion = 1.4.0
Define value for property 'groupId': com
Define value for property 'artifactId': cvte
Define value for property 'version' 1.0-SNAPSHOT: : 1.0
Define value for property 'package' com.processors.cvte: : nifi
Confirm properties configuration:
artifactBaseName: cvte
nifiVersion: 1.4.0
groupId: com
artifactId: cvte
version: 1.0
package: nifi
Y: : y
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: nifi-processor-bundle-archetype:1.4.0
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: com
[INFO] Parameter: artifactId, Value: cvte
[INFO] Parameter: version, Value: 1.0
[INFO] Parameter: package, Value: nifi
[INFO] Parameter: packageInPathFormat, Value: nifi
[INFO] Parameter: package, Value: nifi
[INFO] Parameter: version, Value: 1.0
[INFO] Parameter: artifactBaseName, Value: cvte
[INFO] Parameter: groupId, Value: com
[INFO] Parameter: artifactId, Value: cvte
[INFO] Parameter: nifiVersion, Value: 1.4.0
[INFO] Project created from Archetype in dir: /Users/dszhao/source/nifi-processor/cvte
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 10:26 min
[INFO] Finished at: 2022-03-08T19:09:51+08:00
[INFO] ------------------------------------------------------------------------
(base) dszhao@IT00039346 nifi-processor %
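**⚠️ Note:** The NiFi project skeleton generated this way targets version 1.4.0, the newest archetype version offered in the list above.
The latest NiFi release at the time of writing is 1.15.3. You can edit the pom file to reference it, as described later in this article.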
Next, edit the onTrigger method of the generated MyProcessor class:
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        // TODO implement
        // Added for this demo: print a message, then route the FlowFile to MY_RELATIONSHIP
        System.out.println("This is a custom processor that will receive flow file");
        session.transfer(flowFile, MY_RELATIONSHIP);
    }
In the project's pom.xml, change the nifi-nar-bundles version to 1.15.3.
Package the project as a NAR archive
(base) dszhao@IT00039346 nifi-processor % ll
total 0
drwxr-xr-x 6 dszhao staff 192 Mar 8 19:15 cvte
(base) dszhao@IT00039346 nifi-processor % cd cvte
(base) dszhao@IT00039346 cvte % ll
total 16
-rw-r--r-- 1 dszhao staff 1320 Mar 8 19:15 cvte.iml
drwxr-xr-x 4 dszhao staff 128 Mar 8 19:15 nifi-cvte-nar
drwxr-xr-x 5 dszhao staff 160 Mar 8 19:15 nifi-cvte-processors
-rw-r--r-- 1 dszhao staff 1464 Mar 8 19:09 pom.xml
(base) dszhao@IT00039346 cvte % pwd
/Users/dszhao/source/nifi-processor/cvte
(base) dszhao@IT00039346 cvte % mvn install
# Download output omitted
[INFO] --- maven-install-plugin:2.5.2:install (default-install) @ nifi-cvte-nar ---
[INFO] Installing /Users/dszhao/source/nifi-processor/cvte/nifi-cvte-nar/target/nifi-cvte-nar-1.0.nar to /Users/dszhao/.m2/repository/com/nifi-cvte-nar/1.0/nifi-cvte-nar-1.0.nar
[INFO] Installing /Users/dszhao/source/nifi-processor/cvte/nifi-cvte-nar/pom.xml to /Users/dszhao/.m2/repository/com/nifi-cvte-nar/1.0/nifi-cvte-nar-1.0.pom
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for cvte 1.0:
[INFO]
[INFO] cvte ............................................... SUCCESS [02:55 min]
[INFO] nifi-cvte-processors ............................... SUCCESS [01:08 min]
[INFO] nifi-cvte-nar ...................................... SUCCESS [ 0.162 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 04:05 min
[INFO] Finished at: 2022-03-08T19:26:49+08:00
[INFO] ------------------------------------------------------------------------
If everything works, the build produces target directories like these:
(base) dszhao@IT00039346 cvte % ll nifi-cvte-nar/target
total 288
drwxr-xr-x 3 dszhao staff 96 Mar 8 19:26 classes
drwxr-xr-x 3 dszhao staff 96 Mar 8 19:26 maven-archiver
drwxr-xr-x 3 dszhao staff 96 Mar 8 19:26 maven-shared-archive-resources
-rw-r--r-- 1 dszhao staff 146149 Mar 8 19:26 nifi-cvte-nar-1.0.nar
drwxr-xr-x 3 dszhao staff 96 Mar 8 19:26 test-classes
(base) dszhao@IT00039346 cvte % ll nifi-cvte-processors/target/
total 24
drwxr-xr-x 4 dszhao staff 128 Mar 8 19:26 classes
drwxr-xr-x 3 dszhao staff 96 Mar 8 19:26 generated-sources
drwxr-xr-x 3 dszhao staff 96 Mar 8 19:26 generated-test-sources
drwxr-xr-x 3 dszhao staff 96 Mar 8 19:26 maven-archiver
drwxr-xr-x 3 dszhao staff 96 Mar 8 19:26 maven-shared-archive-resources
drwxr-xr-x 3 dszhao staff 96 Mar 8 19:26 maven-status
-rw-r--r-- 1 dszhao staff 9588 Mar 8 19:26 nifi-cvte-processors-1.0.jar
drwxr-xr-x 5 dszhao staff 160 Mar 8 19:26 surefire-reports
drwxr-xr-x 4 dszhao staff 128 Mar 8 19:26 test-classes
(base) dszhao@IT00039346 cvte %
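Before deploying, it can help to confirm that the processors JAR actually ended up inside the NAR. A NAR is a zip-format archive, and the NiFi NAR Maven plugin places bundled JARs under META-INF/bundled-dependencies/. The sketch below lists those entries using only the JDK; the class name `NarInspector` is hypothetical, and the default path is simply the build output shown above.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

/** Hypothetical helper: lists the JARs bundled inside a NAR archive. */
public class NarInspector {

    private static final String BUNDLED_DIR = "META-INF/bundled-dependencies/";

    /** Returns the sorted names of .jar entries under META-INF/bundled-dependencies/. */
    public static List<String> bundledJars(Path nar) throws IOException {
        // A NAR can be opened like any other zip archive
        try (ZipFile zip = new ZipFile(nar.toFile())) {
            return zip.stream()
                    .map(ZipEntry::getName)
                    .filter(name -> name.startsWith(BUNDLED_DIR) && name.endsWith(".jar"))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Default path assumes you run this from the cvte project root
        Path nar = Paths.get(args.length > 0 ? args[0] : "nifi-cvte-nar/target/nifi-cvte-nar-1.0.nar");
        bundledJars(nar).forEach(System.out::println);
    }
}
```

If the processors JAR is missing from the listing, the NAR module's pom most likely does not declare the processors module as a dependency.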
Copy the generated NAR into the local NiFi installation
Copy the generated NAR file into the lib directory of the NiFi installation:
(base) dszhao@IT00039346 bin % cp /Users/dszhao/source/nifi-processor/cvte/nifi-cvte-nar/target/nifi-cvte-nar-1.0.nar /Users/dszhao/source/nifi-1.15.3/lib/
(base) dszhao@IT00039346 bin % ll
total 368
-rwxr-x---@ 1 dszhao staff 1872 Jan 22 2020 dump-nifi.bat
-rw-r--r-- 1 dszhao staff 146149 Mar 8 19:34 nifi-cvte-nar-1.0.nar
-rwxr-x---@ 1 dszhao staff 1120 Jan 22 2020 nifi-env.bat
-rwxr-x---@ 1 dszhao staff 2719 Jan 22 2020 nifi-env.sh
-rwxr-x---@ 1 dszhao staff 16662 Jan 22 2020 nifi.sh
-rwxr-x---@ 1 dszhao staff 1871 Jan 22 2020 run-nifi.bat
-rwxr-x---@ 1 dszhao staff 1832 Jan 22 2020 status-nifi.bat
(base) dszhao@IT00039346 bin % sh nifi.sh restart
Java home: /Library/Java/JavaVirtualMachines/jdk1.8.0_251.jdk/Contents/Home
NiFi home: /Users/dszhao/source/nifi-1.15.3
Bootstrap Config File: /Users/dszhao/source/nifi-1.15.3/conf/bootstrap.conf
2022-03-08 19:34:42,556 INFO [main] org.apache.nifi.bootstrap.Command Apache NiFi is not currently running
Java home: /Library/Java/JavaVirtualMachines/jdk1.8.0_251.jdk/Contents/Home
NiFi home: /Users/dszhao/source/nifi-1.15.3
Bootstrap Config File: /Users/dszhao/source/nifi-1.15.3/conf/bootstrap.conf
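After NiFi restarts, log in at https://127.0.0.1:8443/nifi/ to check.
You should see that our custom MyProcessor has loaded successfully.
This demo simply routes each FlowFile through MY_RELATIONSHIP to the downstream Processor.
This article gave a brief walkthrough of custom Processor development in NiFi: setting up the Maven skeleton for a NiFi processor, targeting the latest NiFi version, and testing the Processor in a running NiFi instance. Follow-up articles will cover rewriting ConvertJsonToSQL and PutDatabaseRecord to implement Oracle MERGE, using these two hands-on cases to show the details of custom Processor development.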