Flink 1.12.3 Integration with Iceberg 0.12 (6)

Upgrading from Iceberg 0.11 to Iceberg 0.12:

Release notes (using the matching Flink 1.12.4 build):
https://iceberg.apache.org/releases/#0120-release-notes
Jar download links
Put the following two jars into Flink's lib directory:
wget https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-flink-runtime/0.12.0/iceberg-flink-runtime-0.12.0.jar

wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.6_2.11/1.12.4/flink-sql-connector-hive-2.3.6_2.11-1.12.4.jar

Put the following jar into Hive's auxlib directory:
wget https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-hive-runtime/0.12.0/iceberg-hive-runtime-0.12.0.jar

Restart the Flink cluster:
./bin/stop-cluster.sh
./bin/start-cluster.sh
Configure the hive_catalog in sql-client-defaults.yaml:
catalogs:
  # requires a Hive 2.3.8 installation
  - name: hive_catalog
    type: iceberg
    catalog-type: hive
    uri: thrift://cdh2:9083
    warehouse: hdfs://cdh2:8020/user/hive/warehouse/hive_catalog
    
Alternatively, create it manually from sql-client.sh:
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://cdh2:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://cdh2:8020/user/hive/warehouse/hive_catalog'
);
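Once the catalog exists, you can switch into it and list its databases as a quick sanity check (standard Flink SQL):
USE CATALOG hive_catalog;
SHOW DATABASES;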
Start sql-client.sh:
./bin/sql-client.sh embedded
Testing hive_catalog
1. Create the hive_catalog
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://cdh2:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://cdh2:8020/user/hive/warehouse/hive_catalog'
);

2. Create a database
create database hive_catalog.iceberg1;
3. DDL
Create table statement:
CREATE TABLE hive_catalog.iceberg1.iceberg_edw2 (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data BIGINT
) WITH (
  'write.distribution-mode'='hash',
  'format-version'='2'
);
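The same primary key can also be declared as a table-level constraint, which scales better to composite keys (equivalent Flink SQL):
CREATE TABLE hive_catalog.iceberg1.iceberg_edw2 (
    id BIGINT,
    data BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'write.distribution-mode'='hash',
  'format-version'='2'
);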
Drop statement:
drop table hive_catalog.iceberg1.iceberg_edw2;

4. Insert the data in two separate statements:
insert into hive_catalog.iceberg1.iceberg_edw2 select 1,2 union select 2,3;
insert into hive_catalog.iceberg1.iceberg_edw2 select 1,3 union select 2,4;
5. Query the data
SET execution.type = streaming ;
SET table.dynamic-table-options.enabled=true;
select * from hive_catalog.iceberg1.iceberg_edw2  ;

Result: inserting in two batches leaves duplicate rows for each key:
1 2
2 3
1 3
2 4
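Incidentally, with table.dynamic-table-options.enabled set as above, an incremental streaming read can also be requested through a SQL hint (the documented Iceberg Flink syntax; a sketch for v1 tables, since step 7 below shows v2 tables reject streaming reads):
select * from hive_catalog.iceberg1.iceberg_edw2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */ ;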


-------------------------------------------------------
5.1 Added 2021-09-14
The latest master adds a 'write.upsert.enabled'='true' table property, which splits each insert into a delete plus an insert, solving the duplicate-data problem above.
-------------------------------------------------------
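A sketch of how the table definition would look with that property, assuming a build that already ships it:
CREATE TABLE hive_catalog.iceberg1.iceberg_edw2 (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data BIGINT
) WITH (
  'format-version'='2',
  'write.upsert.enabled'='true'
);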


6. Drop and recreate the table, then insert aggregated data:
insert into hive_catalog.iceberg1.iceberg_edw2
select id,sum(data) as data
from (
select 1 as id,2 as data union
select 2 as id,3 as data union 
select 1 as id,3 as data union 
select 2 as id,4 as data
) a group by id;

7. Query the data
SET execution.type = streaming ;
select * from hive_catalog.iceberg1.iceberg_edw2 ;
This fails with an error.

v2 tables still do not support streaming reads, so fall back to a batch query:
set execution.type=batch;
select * from hive_catalog.iceberg1.iceberg_edw2 ;

Result: the aggregation is correct (key 1: 2+3=5, key 2: 3+4=7):
1 5
2 7

8. Drop and recreate the table, then insert duplicate keys in a single statement:
insert into hive_catalog.iceberg1.iceberg_edw2 select 5,5 union select 5,6;

9. Query the data
SET execution.type = streaming ;
select * from hive_catalog.iceberg1.iceberg_edw2 ;
This fails with an error.

v2 tables still do not support streaming reads, so fall back to a batch query:
set execution.type=batch;
select * from hive_catalog.iceberg1.iceberg_edw2 ;
Result: duplicates are collapsed and the latest value wins:
5 6 

10. Query from Hive
From the Hive CLI you can see the iceberg1 database and the iceberg1.iceberg_edw2 table, but:
select * from iceberg1.iceberg_edw2;        -- returns no rows
select count(*) from iceberg1.iceberg_edw2; -- fails with an error
(The fix is covered at the end of this post.)

11. Create an Iceberg external table in Hive over the HDFS path
In Hive:
create database test;
use test;
Create the Iceberg external table:
create external table iceberg_edw_1 (
    id BIGINT ,
    data BIGINT
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://cdh2:8020/user/hive/warehouse/hive_catalog/iceberg1.db/iceberg_edw2'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');

Query the external table:
set hive.vectorized.execution.enabled=false;
set iceberg.engine.hive.enabled=true;
select * from test.iceberg_edw_1;        -- returns no rows
select count(*) from test.iceberg_edw_1; -- returns no rows
Testing hadoop_catalog
1. Create the hadoop_catalog
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://cdh2:8020/user/hive/warehouse/hadoop_catalog'
);

2. Create a database
create database hadoop_catalog.iceberg2;
3. DDL
Create table statement:
CREATE TABLE hadoop_catalog.iceberg2.iceberg_edw2 (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data BIGINT
) WITH (
  'write.distribution-mode'='hash',
  'format-version'='2'
);
Drop statement:
drop table hadoop_catalog.iceberg2.iceberg_edw2;

4. Insert the data in two separate statements:
insert into hadoop_catalog.iceberg2.iceberg_edw2 select 1,2 union select 2,3;
insert into hadoop_catalog.iceberg2.iceberg_edw2 select 1,3 union select 2,4;
5. Query the data
SET execution.type = streaming ;
SET table.dynamic-table-options.enabled=true;
select * from hadoop_catalog.iceberg2.iceberg_edw2  ;

Result: inserting in two batches leaves duplicate rows for each key:
1 2
2 3
1 3
2 4

6. Drop and recreate the table, then insert aggregated data:
insert into hadoop_catalog.iceberg2.iceberg_edw2
select id,sum(data) as data
from (
select 1 as id,2 as data union
select 2 as id,3 as data union 
select 1 as id,3 as data union 
select 2 as id,4 as data
) a group by id;

7. Query the data
SET execution.type = streaming ;
select * from hadoop_catalog.iceberg2.iceberg_edw2 ;
This fails with an error.

v2 tables still do not support streaming reads, so fall back to a batch query:
set execution.type=batch;
select * from hadoop_catalog.iceberg2.iceberg_edw2 ;

Result: the aggregation is correct:
1 5
2 7

8. Drop and recreate the table, then insert duplicate keys in a single statement:
insert into hadoop_catalog.iceberg2.iceberg_edw2 select 5,5 union select 5,6;

9. Query the data
SET execution.type = streaming ;
select * from hadoop_catalog.iceberg2.iceberg_edw2 ;
This fails with an error.

v2 tables still do not support streaming reads, so fall back to a batch query:
set execution.type=batch;
select * from hadoop_catalog.iceberg2.iceberg_edw2 ;
Result: duplicates are collapsed and the latest value wins:
5 6 

10. Query from Hive
The Hive CLI does not show hadoop_catalog tables at all.

11. Create an Iceberg external table in Hive over the HDFS path
In Hive:
create database test;
use test;
Create the Iceberg external table:
create external table iceberg_edw_2 (
    id BIGINT ,
    data BIGINT
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://cdh2:8020/user/hive/warehouse/hadoop_catalog/iceberg2/iceberg_edw2'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');

Query the external table:
set hive.vectorized.execution.enabled=false;
set iceberg.engine.hive.enabled=true;
select * from test.iceberg_edw_2;             -- data returned correctly
select count(*) from test.iceberg_edw_2;      -- correct count
select * from test.iceberg_edw_2 where id=1;  -- filter works

Conclusions:

  1. Within a single job, v2 tables support keyed updates, deduplication, and aggregation.
  2. Inserts from multiple jobs still produce duplicate rows (this may later be solved by splitting each insert into a delete plus an insert).
  3. The iceberg-flink-runtime jar and the flink-sql-connector-hive jar cannot be loaded at the same time; they conflict.
  4. v2 tables do not support streaming reads yet, only batch queries.
  5. v2 tables do not support small-file compaction yet; v1 tables do (see https://github.com/apache/iceberg/pull/2303).
  6. hive_catalog and hadoop_catalog behave largely the same.
  7. hive_catalog tables return no data when queried from Hive, even through an external table (see the fix below).
  8. hadoop_catalog tables can be queried normally from Hive through an external table.
Fixing hive_catalog tables that cannot be queried from Hive

Added after testing on 2021-09-01

Solution

1. Add iceberg.engine.hive.enabled=true to hive-site.xml (not verified yet),
   or set the table property 'engine.hive.enabled'='true' when creating the Iceberg table (this works), as shown in the sketch below.
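A sketch of the working option, reusing the earlier table definition plus the extra property (assumes the same catalog setup as above):
CREATE TABLE hive_catalog.iceberg1.iceberg_edw2 (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    data BIGINT
) WITH (
  'format-version'='2',
  'engine.hive.enabled'='true'
);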
2. Query from Hive:
    select * from iceberg_edw2;
    select count(*) from iceberg_edw2;
    Both now return results correctly.

    select * from iceberg_edw2 where id=1;
    fails with: Vectorization only supported for Hive 3+

    Fix by setting:
        set hive.vectorized.execution.enabled=false;

    The next error is:
    NoSuchMethodError: org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg.create

    Fix by setting:
    set hive.optimize.ppd=false;

    After that the query runs normally.
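Putting the two session settings together before a filtered query (both taken from the fixes above):
set hive.vectorized.execution.enabled=false;  -- avoids "Vectorization only supported for Hive 3+"
set hive.optimize.ppd=false;                  -- avoids the ConvertAstToSearchArg NoSuchMethodError
select * from iceberg_edw2 where id=1;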
