Connecting Flink to Kafka
First, create the catalog
CREATE CATALOG flink_hive WITH (
  'type' = 'hive',
  'default-database' = 'imods',
  'hive-conf-dir' = '/home/admin/flink/conf'
);
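Once the catalog is registered, it is worth confirming that the Hive databases and tables are visible before going further. A minimal sanity check in the Flink SQL client, using only standard Flink SQL commands and the imods database from the catalog definition above:

SHOW CATALOGS;
USE CATALOG flink_hive;
SHOW DATABASES;
USE imods;
SHOW TABLES;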
Create the Kafka table
use catalog flink_hive;

-- Create the Kafka source table
CREATE TABLE IF NOT EXISTS kafka_table (
  vin string,
  age int,
  ...
)
-- The WITH clause carries the connection info and the other settings
WITH (
  'connector' = 'kafka',
  'topic' = 'your-topic-name',
  'properties.group.id' = 'your-consumer-group-id',
  'properties.bootstrap.servers' = 'broker-host1:port1,broker-host2:port2',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";',  -- set the username and password
  'properties.key.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
  'properties.value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
  --'scan.startup.mode' = 'latest-offset',   -- one of five options, explained below
  --'scan.startup.mode' = 'earliest-offset',
  'scan.startup.mode' = 'group-offsets',
  'json.fail-on-missing-field' = 'false',    -- whether to fail when a field is missing
  'json.ignore-parse-errors' = 'true',       -- whether to ignore parse errors
  'format' = 'json'                          -- format of the incoming data
);
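Before wiring up the Hive side, it can help to confirm that the source table actually deserializes records. A minimal check from the Flink SQL client, assuming the table above has been created and the topic is receiving data:

-- Runs a continuous query against the Kafka source; cancel it once rows appear
SELECT vin, age FROM kafka_table;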
group-offsets: start from committed offsets in ZK / Kafka brokers of a specific consumer group (a properties.group.id must be set for this mode).
earliest-offset: start from the earliest offset possible.
latest-offset: start from the latest offset; only messages produced after the job starts are consumed.
timestamp: start from a user-supplied timestamp for each partition.
specific-offsets: start from user-supplied specific offsets for each partition.
The last two modes each require one extra connector option; see the sketch after this list.
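For the timestamp and specific-offsets modes, the Kafka connector expects one additional option alongside scan.startup.mode. A sketch using the option keys from the Flink Kafka SQL connector, with purely illustrative values:

-- Start from a given epoch timestamp in milliseconds (value is illustrative)
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '1592323200000',

-- Or start from explicit per-partition offsets (values are illustrative)
'scan.startup.mode' = 'specific-offsets',
'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'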
Create the corresponding Hive table
-- Create the Hive target table
set table.sql-dialect=hive;
create table if not exists hive_table
-- The column types and their order must match the Kafka table exactly; this is strictly required
(
  vin string,
  age int,
  ...
)
comment 'I am a Hive table'
partitioned by (dt string)   -- optional
stored as parquet            -- optional
TBLPROPERTIES (
  'sink.rolling-policy.file-size' = '128MB',
  'sink.rolling-policy.rollover-interval' = '30 min',
  'sink.partition-commit.policy.kind' = 'metastore,success-file',
  -- small-file compaction options
  'auto-compaction' = 'true',
  'compaction.file-size' = '128MB',
  'sink.shuffle-by-partition.enable' = 'true'
);
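The DDL above does not set a partition commit trigger, so the sink falls back to the default commit behavior. If partitions should be committed on processing time after a fixed delay, two more options can be added to the same TBLPROPERTIES block. This is a sketch using the standard Flink filesystem/Hive sink option keys, with an illustrative delay value:

  'sink.partition-commit.trigger' = 'process-time',  -- commit based on machine time
  'sink.partition-commit.delay' = '1 min'            -- wait this long before committing a partition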
-- Run the INSERT statement to write with dynamic partitioning
set pipeline.name=YourJobName;   -- set a job name in English; no quotes needed
set table.sql-dialect=default;
insert into hive_table
select
  vin,
  age,
  ...
from kafka_table;

-- Recording a CASE WHEN expression here, used to convert a timestamp into the dt partition value:
-- case when CHARACTER_LENGTH(cast (eventTime as string)) = 13
--        then from_unixtime(cast (substr (cast (eventTime as string),0,10) as BIGINT),'yyyyMMdd')
--      else '19700101'
-- end as dt
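Since hive_table is partitioned by dt but the basic INSERT above does not select that column, the recorded CASE WHEN can be plugged in as the last projected column. A sketch, assuming kafka_table carries a millisecond epoch field named eventTime (that field is not part of the abbreviated schema shown above):

insert into hive_table
select
  vin,
  age,
  ...,
  case when CHARACTER_LENGTH(cast (eventTime as string)) = 13
         then from_unixtime(cast (substr (cast (eventTime as string),0,10) as BIGINT),'yyyyMMdd')
       else '19700101'
  end as dt
from kafka_table;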