flink1.12接受Kafka数据写入hive中,hive客户端不能查询数据

flink1.12接受Kafka数据写入hive中,hive客户端不能查询数据,第1张

flink1.12接受Kafka数据写入hive中,hive客户端不能查询数据

1、代码如下 

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.getConfig().registerKryoType(BusEventKafka.class);
        //env.enableCheckpointing(1000 * 60 * 1);
        Properties props  = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        props.setProperty("auto.offset.reset","earliest");
        props.setProperty("flink.partition-discovery.interval-millis","5000");//会开启一个后台线程每隔5s检测一下Kafka的分区情况
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");
        //kafkaSource就是KafkaConsumer
        FlinkKafkaConsumer kafkaSource = new FlinkKafkaConsumer<>("example", new SimpleStringSchema(), props);
        kafkaSource.setStartFromGroupOffsets();//设置从记录的offset开始消费,如果没有记录从auto.offset.reset配置开始消费






EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
        String name = "myhive";      // Catalog名称,定义一个唯一的名称表示
        String defaultDatabase = "yqfk";  // 默认数据库名称
        String hiveConfDir = "/opt/hive/conf";  // hive-site.xml路径
        //String hiveConfDir = "/Users/jinhuan/eclipse-workspace/flinkLearn";
        String version = "3.1.2";       // Hive版本号



        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
        

        tEnv.registerCatalog(name, hive);
        tEnv.useCatalog(name);
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.useDatabase(defaultDatabase);
        tEnv.createTemporaryView("kafka_table", kafkaStream);
        tEnv.executeSql("insert into bus_event_tmp01  select id,organization,user_name,address," +
                "sex,card_no,event_time,safety_hat,door_position,temperature,temp_abnormal," +
                "check_result,health_code,auth_method,direction,desc_info,wear_mask," +
                "user_type,user_id,equip_id,hospital_no from kafka_table");

2、建表语句

CREATE TABLE bus_event_tmp01(
  id string, 
  organization string, 
  user_name string, 
  address string, 
  sex string, 
  card_no string, 
  event_time string, 
  safety_hat string, 
  door_position string, 
  temperature string, 
  temp_abnormal string, 
  check_result string, 
  health_code string, 
  auth_method string, 
  direction string, 
  desc_info string, 
  wear_mask string, 
  user_type string, 
  user_id string, 
  equip_id string
) partitioned by (hospital_no int) stored as orc TBLPROPERTIES (
  'sink.partition-commit.delay'='0s',
  'sink.partition-commit.policy.kind'='metastore'
);

3、问题描述:数据成功写入到hive中,hive客户端查询不到数据,写入的文件是以.开头的隐藏文件如下图:

用下面命令查看文件状态文件全为空且处于inprogress状态

执行命令

hdfs dfs -count -h /user/hive/warehouse/path/*

文件处于inprogress状体

4、最后解决方法: 添加以下代码数据成功写入

env.enableCheckpointing(3000);

Flink将一直缓存从Kafka消费出来的数据,只有当Checkpoint 触发的时候,才把数据刷新到目标目录

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5575517.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存