hudi与flink的集成(一)

hudi与flink的集成(一),第1张

hudi与flink的集成(一)

hudi与flink的集成

一、组件下载

1.1 flink1.12.2编译包下载1.2 hudi编译 二、Batch模式具体实施步骤

2.1 启动flink-sql客户端2.2 根据主键更新数据 三、支持stream读模式

3.1 创建表3.2 从批模式写入一条数据


一、组件下载 1.1 flink1.12.2编译包下载

https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.11.tgz

1.2 hudi编译
git clone https://github.com/apache/hudi.git && cd hudi
mvn clean package -DskipTests
##注意:默认是用scala-2.11编译的
##如果我们用的是flink1.12.2-2.12版本,可以自己编译成scala-2.12版本的
mvn clean package -DskipTests -Dscala-2.12
##包的路径在packaging/hudi-flink-bundle/target/hudi-flink-bundle_2.12-*.*.*-SNAPSHOT.jar

建议用flink1.12.2+hudi0.9.0(master),亲测可以。

二、Batch模式具体实施步骤

导包 hudi-flink到flink lib目录下

2.1 启动flink-sql客户端

可以提前把hudi-flink-bundle_2.12-0.9.0-SNAPSHOT.jar拷贝到 $Flink_HOME/lib目录下(我用的flink是scala2.12版本)

#HADOOP_HOME是解压二进制包后的hadoop根目录。
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
#启动flink单机集群
./bin/sql-client.sh embedded
CREATE TABLE t1(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIonED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://192.168.10.81:8020/hudi/t1',
  'table.type' = 'MERGE_ON_READ'
);

INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4'); 
  
 #查询表数据,设置一下查询模式为tableau
set execution.result-mode=tableau;

2.2 根据主键更新数据

INSERT INTO t1 VALUES (‘id1’,‘Danny’,24,TIMESTAMP ‘1970-01-01 00:00:01’,‘par1’);
id1的数据age由23变为了24

三、支持stream读模式 3.1 创建表
CREATE TABLE t2(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIonED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://192.168.10.81:8020/hudi/t1',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',  
  'read.streaming.start-commit' = '20210401134557' ,
  'read.streaming.check-interval' = '4'
);
 
#这里将 table option read.streaming.enabled 设置为 true,表明通过 streaming 的方式读取表数据;
#opiton read.streaming.check-interval 指定了 source 监控新的 commits 的间隔为 4s;
#option table.type 设置表类型为 MERGE_ON_READ,目前只有 MERGE_ON_READ 表支持 streaming 读

3.2 从批模式写入一条数据

insert into t1 values (‘id9’,‘test’,27,TIMESTAMP ‘1970-01-01 00:00:01’,‘par5’);
隔几秒后在流模式可以读取到一条新增的数据

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5709209.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存