iceberg0.11 升级iceberg 0.12:
release-notes 地址(采用对应的fink1.12.4版本)包下载地址
将下面两个包 放到flink的lib目录下 wget wget 下面这个包放到hive 的auxlib目录下 wget重启flink集群
./bin/ ./bin/start-cluster.sh在sql-client-default.yaml中配置hive_catalog
# 需要安装了hive2.3.8版本 - name: hive_catalog type: iceberg catalog-type: hive uri: thrift://cdh2:9083 warehouse: hdfs://cdh2:8020/user/hive/warehouse/hive_catalog 或者进入 手动创建 CREATE CATALOG hive_catalog WITH ( 'type'='iceberg', 'catalog-type'='hive', 'uri'='thrift://cdh2:9083', 'clients'='5', 'property-version'='1', 'warehouse'='hdfs://cdh2:8020/warehouse/hive_catalog' );进去
./bin/ embedded测试 hive_catalog
1. 创建 hive_catalog CREATE CATALOG hive_catalog WITH ( 'type'='iceberg', 'catalog-type'='hive', 'uri'='thrift://cdh2:9083', 'clients'='5', 'property-version'='1', 'warehouse'='hdfs://cdh2:8020/user/hive/warehouse/hive_catalog' ); 2. 建库 create database hive_catalog.iceberg1; 3. dll 建表语句: CREATE TABLE hive_catalog.iceberg1.iceberg_edw2( id BIGINT PRIMARY KEY NOT ENFORCED, data BIGINT ) with ( 'write.distribution-mode'='hash', 'format-version'='2' ); 删除语句: drop table hive_catalog.iceberg1.iceberg_edw2; 4. 插入数据,分两次插入 insert into hive_catalog.iceberg1.iceberg_edw2 select 1,2 union select 2,3; insert into hive_catalog.iceberg1.iceberg_edw2 select 1,3 union select 2,4; 5. 查询数据 SET execution.type = streaming ; SET table.dynamic-table-options.enabled=true; select * from hive_catalog.iceberg1.iceberg_edw2 ; 结果显示:分两次插入,会有重复数据 1 2 2 3 1 3 1 4 ------------------------------------------------------- 5.1 '20210914添加 最新的master添加了'write.upsert.enable'='true'参数,可以将insert数据拆分成 一条delete数据和一条insert数据,解决上面数据重复的问题' ------------------------------------------------------- 6. 删除表重建,插入数据,插入 聚合 数据 insert into hive_catalog.iceberg1.iceberg_edw2 select id,sum(data) as data from ( select 1 as id,2 as data union select 2 as id,3 as data union select 1 as id,3 as data union select 2 as id,4 as data ) a group by id; 7. 查询数据 SET execution.type = streaming ; select * from hive_catalog.iceberg1.iceberg_edw2 ; 报错 v2表还是不支持流式查询,所以只能用批查询 set execution.type=batch; select * from hive_catalog.iceberg1.iceberg_edw2 ; 结果显示:聚合正常 1 5 2 7 8. 删除表重建,插入数据 一次性插入重复数据 insert into hive_catalog.iceberg1.iceberg_edw2 select 5,5 union select 5,6; 9. 查询数据 SET execution.type = streaming ; select * from hive_catalog.iceberg1.iceberg_edw2 ; 报错 v2表还是不支持流式查询,所以只能用批查询 set execution.type=batch; select * from hive_catalog.iceberg1.iceberg_edw2 ; 结果显示:正常去重,更新最新数据 5 6 10. 在hive中查询 hive进入命令行,可以看到iceberg中的iceberg1库和iceberg1.iceberg_edw2表 select * from iceberg1.iceberg_edw2; 数据为空 select count(*) from iceberg1.iceberg_edw2; 报错 11. 在hive中通过hdfs目录创建iceberg外部表 在hive中: create database test; use test; 创建cieberg 外部表 create external table iceberg_edw_1 ( id BIGINT , data BIGINT ) STORED BY '' LOCATION 'hdfs://cdh2:8020/user/hive/warehouse/hive_catalog/iceberg1.db/iceberg_edw2' TBLPROPERTIES ('iceberg.catalog'='location_based_table'); 查询外部表数据 set hive.vectorized.execution.enabled=false; set iceberg.engine.hive.enabled=true; select * from test.iceberg_edw_1; 数据为空 select count(*) from test.iceberg_edw_1; 数据为空测试 hadoop_catalog
1. 创建 hadoop_catalog CREATE CATALOG hadoop_catalog WITH ( 'type'='iceberg', 'catalog-type'='hadoop', 'clients'='5', 'property-version'='1', 'warehouse'='hdfs://cdh2:8020/user/hive/warehouse/hadoop_catalog' ); 2. 建库 create database hadoop_catalog.iceberg2; 3. dll 建表语句: CREATE TABLE hadoop_catalog.iceberg2.iceberg_edw2( id BIGINT PRIMARY KEY NOT ENFORCED, data BIGINT ) with ('write.distribution-mode'='hash', 'format-version'='2'); 删除语句: drop table hadoop_catalog.iceberg2.iceberg_edw2; 4. 插入数据,分两次插入 insert into hadoop_catalog.iceberg2.iceberg_edw2 select 1,2 union select 2,3; insert into hadoop_catalog.iceberg2.iceberg_edw2 select 1,3 union select 2,4; 5. 查询数据 SET execution.type = streaming ; SET table.dynamic-table-options.enabled=true; select * from hadoop_catalog.iceberg2.iceberg_edw2 ; 结果显示:分两次插入,会有重复数据 1 2 2 3 1 3 1 4 6. 删除表重建,插入数据,插入 聚合 数据 insert into hadoop_catalog.iceberg2.iceberg_edw2 select id,sum(data) as data from ( select 1 as id,2 as data union select 2 as id,3 as data union select 1 as id,3 as data union select 2 as id,4 as data ) a group by id; 7. 查询数据 SET execution.type = streaming ; select * from hadoop_catalog.iceberg2.iceberg_edw2 ; 报错 v2表还是不支持流式查询,所以只能用批查询 set execution.type=batch; select * from hadoop_catalog.iceberg2.iceberg_edw2 ; 结果显示:聚合正常 1 5 2 7 8. 删除表重建,插入数据 一次性插入重复数据 insert into hadoop_catalog.iceberg2.iceberg_edw2 select 5,5 union select 5,6; 9. 查询数据 SET execution.type = streaming ; select * from hadoop_catalog.iceberg2.iceberg_edw2 ; 报错 v2表还是不支持流式查询,所以只能用批查询 set execution.type=batch; select * from hadoop_catalog.iceberg2.iceberg_edw2 ; 结果显示:正常去重,更新最新数据 5 6 10. 在hive中查询 hive进入命令行,看不到hadoop_catalog的表 11. 在hive中通过hdfs目录创建iceberg外部表 在hive中: create database test; use test; 创建cieberg 外部表 create external table iceberg_edw_2 ( id BIGINT , data BIGINT ) STORED BY '' LOCATION 'hdfs://cdh2:8020/user/hive/warehouse/hadoop_catalog/iceberg2/iceberg_edw2' TBLPROPERTIES ('iceberg.catalog'='location_based_table'); 查询外部表数据 set hive.vectorized.execution.enabled=false; set iceberg.engine.hive.enabled=true; select * from test.iceberg_edw_2; 数据正常 select count(*) from test.iceberg_edw_2; 数据正常 select * from test.iceberg_edw_2 where id=1; 数据正常
- v2表单个任务内可实现按照key更新去重和聚合运算
- 多个任务插入则会出现重复数据(后续可能会通过把insert 拆分成 delete和insert来解决这个问题)
- iceberg-runtime.jar 和 flink-sql-connect-hive.jar不能同时使用,会冲突
- V2表还不支持流式查询,只支持批查询
- V2表还不支持小文件合并,V1表可以(参考:
- hive_catalog 和 hadoop_catalog 大体上并无差异
- hive_catalog的表在hive中查不到数据,创建外部表也查不到数据
- hadoop_catalog道的表可以在hive中通过创建外部表正常查询
20210901 测试后添加
1. 在 hvie-site.xml中添加参数 iceberg.engine.hive.enabled=true(暂时还没测试通过) 或者在iceberg建表的时候指定参数 'engine.hive.enabled'='true',(这个可以用 ) 2. 在hive中查询 select * from iceberg_edw2; select count(*) from iceberg_edw2; 都能正常查询 select * from iceberg_edw2 where id=1; 查询报错:Vectorization only supported for Hive 3+ 需要设置一下参数: set hive.vectorized.execution.enabled=false; 查询报错: NoSuchMethodError: 需要设置一下参数: set hive.optimize.ppd=false; 再次查询即可正常查询