pyflink实时接收kafka数据至hive

pyflink实时接收kafka数据至hive,第1张

pyflink实时接收kafka数据至hive
#!/usr/bin/python
# -*- coding: UTF-8 -*-



from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment, EnvironmentSettings,StreamTableEnvironment
from pyflink.table.catalog import HiveCatalog
from pyflink.table import SqlDialect
from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode

s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_parallelism(4)
# 必须开启checkpoint,时间间隔为毫秒,否则不能输出数据
s_env.enable_checkpointing(600000) #60 0000 十分钟

#env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
#t_env = BatchTableEnvironment.create(environment_settings=env_settings)
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(stream_execution_environment=s_env,environment_settings=env_settings)

print("AAAA")
#t_env.enable_checkpointing(3000)
#测试环境物理机
hive_conf_dir = "/data/docker/containers/*/conf"  # a local path

#测试环境容器 /data/EtlServices/dlp/code/conf
#hive_conf_dir = "/data/E*code/conf"
catalog = HiveCatalog("myhive", "default", hive_conf_dir)
print("BBBB")
# Register the catalog
t_env.register_catalog("myhive", catalog)
print("CCC")
# set the HiveCatalog as the current catalog of the sessionT_env.use_catalog("myhive")
t_env.use_catalog("myhive")
t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
# Create a catalog table
print(t_env.execute_sql("show tables").print())
print(t_env.list_databases())
t_env.execute_sql("CREATE DATAbase IF NOT EXISTS hive_*_data")
#t_env.execute_sql("DROp TABLE IF EXISTS hive_*data_test.sink_dlp_hive")
#t_env.execute_sql("DROP TABLE IF EXISTS hive*data_test.source_dlp_kafka")
print(t_env.list_databases())
print(t_env.execute_sql("SHOW TABLES").print())
print("DDDD")


t_env.execute_sql("""CREATE TABLE IF NOT EXISTS hive*_data.sink_*_hive(
ORG_ID STRING,
DIRECTION_DESC STRING,
`RESULT` STRING
      ) PARTITIonED BY (
        ts_date STRING,
        ts_hour STRING
      ) STORED AS PARQUET
      TBLPROPERTIES (
        'sink.partition-commit.trigger' = 'process-time',
        'sink.partition-commit.delay' = '1 min',
        'sink.partition-commit.policy.kind' = 'metastore,success-file',
        'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:00:00'
      )"""
)

# should return the tables in current catalog and database.
t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
# 2. 创建 source 表
t_env.execute_sql("""
    CREATE TABLE IF NOT EXISTS  hive_*_data.source_*_kafka (
ORG_ID STRING,
DIRECTION_DESC STRING,
`RESULT` STRING
    ) WITH (
              'connector' = 'kafka',
              'topic' = '*',
              'properties.bootstrap.servers' = '2*:*.162:9092,2*3.163:9092,
                                                *4:9092,2*:9092,*6:9092,
                                                *:9092',
              'properties.group.id' = 'flink_kafka_hive01_consumers_*',
              'scan.startup.mode' = 'latest-offset',
              'format' = 'json'
            )
""")



#插入数据
t_env.execute_sql("""
INSERT INTO hive_aliHipsLuojing_data.sink_aliHipsLuojing_hive
SELECT
ORG_ID,
DIRECTION_DESC,
`RESULT`,
DATE_FORMAT(LOCALTIMESTAMP,'yyyyMMdd'),
DATE_FORMAT(LOCALTIMESTAMP,'HH')
FROM hive_aliHipsLuojing_data.source_aliHipsLuojing_kafka
""").wait()

#RAW_MSG is not null
#!=''

                            

参考文档:
https://help.aliyun.com/document_detail/181568.html
https://blog.csdn.net/chenshijie2011/article/details/117399883
https://blog.csdn.net/chenshijie2011/article/details/117401621
https://www.cnblogs.com/maoxiangyi/p/13509782.html
https://www.cnblogs.com/Springmoon-venn/p/13726089.html
https://www.jianshu.com/p/295066a24092
https://blog.csdn.net/m0_37592814/article/details/108044830

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5717698.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存