#!/usr/bin/python
# -*- coding: UTF-8 -*-
from pyflink.table import EnvironmentSettings, StreamTableEnvironment, BatchTableEnvironment
from pyflink.table import SqlDialect
from pyflink.table.catalog import HiveCatalog
from pyflink.datastream import StreamExecutionEnvironment

s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(4)

# Checkpointing must be enabled (the interval is in milliseconds); without it
# the streaming file sink never commits partitions and no data becomes visible.
s_env.enable_checkpointing(600000)  # 600000 ms = 10 minutes

#env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
#t_env = BatchTableEnvironment.create(environment_settings=env_settings)
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(stream_execution_environment=s_env,
                                      environment_settings=env_settings)
print("AAAA")
#t_env.enable_checkpointing(3000)

# Physical machine in the test environment
hive_conf_dir = "/data/docker/containers/*/conf"  # a local path
# Container in the test environment
#hive_conf_dir = "/data/EtlServices/dlp/code/conf"
catalog = HiveCatalog("myhive", "default", hive_conf_dir)
print("BBBB")

# Register the catalog
t_env.register_catalog("myhive", catalog)
print("CCC")

# Set the HiveCatalog as the current catalog of the session
t_env.use_catalog("myhive")
t_env.get_config().set_sql_dialect(SqlDialect.HIVE)

# Should list the tables in the current catalog and database
t_env.execute_sql("SHOW TABLES").print()
print(t_env.list_databases())
t_env.execute_sql("CREATE DATABASE IF NOT EXISTS hive_aliHipsLuojing_data")
#t_env.execute_sql("DROP TABLE IF EXISTS hive_*data_test.sink_dlp_hive")
#t_env.execute_sql("DROP TABLE IF EXISTS hive*data_test.source_dlp_kafka")
print(t_env.list_databases())
t_env.execute_sql("SHOW TABLES").print()
print("DDDD")

# 1. Create the Hive sink table (a catalog table, Hive dialect)
t_env.execute_sql("""
CREATE TABLE IF NOT EXISTS hive_aliHipsLuojing_data.sink_aliHipsLuojing_hive (
    ORG_ID STRING,
    DIRECTION_DESC STRING,
    `RESULT` STRING
) PARTITIONED BY (
    ts_date STRING,
    ts_hour STRING
) STORED AS PARQUET TBLPROPERTIES (
    'sink.partition-commit.trigger' = 'process-time',
    'sink.partition-commit.delay' = '1 min',
    'sink.partition-commit.policy.kind' = 'metastore,success-file',
    'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:00:00'
)
""")

# Switch back to the default dialect for the Kafka DDL
t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)

# 2. Create the Kafka source table
t_env.execute_sql("""
CREATE TABLE IF NOT EXISTS hive_aliHipsLuojing_data.source_aliHipsLuojing_kafka (
    ORG_ID STRING,
    DIRECTION_DESC STRING,
    `RESULT` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = '*',
    'properties.bootstrap.servers' = '2*:*.162:9092,2*3.163:9092,*4:9092,2*:9092,*6:9092,*:9092',
    'properties.group.id' = 'flink_kafka_hive01_consumers_*',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
)
""")

# 3. Insert data from Kafka into Hive, deriving the two partition columns
#    from the local timestamp
t_env.execute_sql("""
INSERT INTO hive_aliHipsLuojing_data.sink_aliHipsLuojing_hive
SELECT
    ORG_ID,
    DIRECTION_DESC,
    `RESULT`,
    DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMdd'),
    DATE_FORMAT(LOCALTIMESTAMP, 'HH')
FROM hive_aliHipsLuojing_data.source_aliHipsLuojing_kafka
""").wait()
#RAW_MSG is not null  #!=''
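The script assumes the Kafka and Hive connector jars are already on the Flink classpath. If they are not, PyFlink can ship them with the job through the pipeline.jars option; a minimal sketch, where the jar paths and versions are placeholders to adapt to your installation:

# Assumption: jar locations and versions are placeholders, not taken from the article.
t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///opt/flink/lib/flink-sql-connector-kafka_2.11-1.13.2.jar;"
    "file:///opt/flink/lib/flink-sql-connector-hive-2.3.6_2.11-1.13.2.jar")

To smoke-test the pipeline end to end, you can publish one JSON record matching the source schema. A sketch using the third-party kafka-python package; the broker list and topic are placeholders, since the real values are redacted above:

import json
from kafka import KafkaProducer  # third-party: pip install kafka-python

# Placeholders: substitute the real (redacted) broker list and topic.
producer = KafkaProducer(
    bootstrap_servers="broker-1:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"))
producer.send("your_topic", {"ORG_ID": "org_001",
                             "DIRECTION_DESC": "inbound",
                             "RESULT": "pass"})
producer.flush()

Because the sink commits partitions on processing time with a one-minute delay and checkpoints fire every ten minutes, allow up to roughly one checkpoint interval before the partition appears in the metastore with its _SUCCESS file.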
References:
https://help.aliyun.com/document_detail/181568.html
https://blog.csdn.net/chenshijie2011/article/details/117399883
https://blog.csdn.net/chenshijie2011/article/details/117401621
https://www.cnblogs.com/maoxiangyi/p/13509782.html
https://www.cnblogs.com/Springmoon-venn/p/13726089.html
https://www.jianshu.com/p/295066a24092
https://blog.csdn.net/m0_37592814/article/details/108044830