Flink Client 使用技巧和心得(Flink on Zeppelin)

Flink Client 使用技巧和心得(Flink on Zeppelin),第1张

Flink Client 使用技巧和心得(Flink on Zeppelin)

Flink 链接Kafka
先建立catalog

CREATE CATALOG flink_hive WITH (
    'type' = 'hive',
    'default-database' = 'imods',
    'hive-conf-dir' = '/home/admin/flink/conf'
);

建立kafka table

use catalog flink_hive;
--创建kafka源表
CREATE TABLE IF NOT EXISTS kafka_table (
vin  string,
age int,
...
)--with 写入链接信息以及各种设置
   WITH (
     'connector' = 'kafka',
     'topic' = '自定义的topic',
     'properties.group.id' = '自定义的id',
     'properties.bootstrap.servers' = '自己知道的地址1:端口号1,自己知道的地址2:端口号2',
     'properties.security.protocol'='SASL_PLAINTEXT',
     'properties.sasl.mechanism'='PLAIN',
     'properties.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule required username="用户名" password="密码";', --设定用户名与密码
     'properties.key.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
     'properties.value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer',
     --'scan.startup.mode' = 'latest-offset',--五种选项下面会注意说明
      --'scan.startup.mode' = 'earliest-offset',
      'scan.startup.mode' = 'group-offsets',
     'json.fail-on-missing-field' = 'false',--是否允许失败策略
     'json.ignore-parse-errors' = 'true',--是否开启忽略错误策略
     'format' = 'json'--传入的格式
);

group-offsets: start from committed offsets in ZK / Kafka brokers of a specific consumer group.从提交给ZK的offset开始消费(必须注明groupID 才可以)
earliest-offset: start from the earliest offset possible. 从最初点开始消费
latest-offset: start from the latest offset.程序运行时有新消息才消费新消息
timestamp: start from user-supplied timestamp for each partition. 指定时间戳开始进行消费
specific-offsets: start from user-supplied specific offsets for each partition.
指定位置进行消费

建立对应的hivetable

--创建HIVE目标表
set table.sql-dialect=hive;
create table if not exists hive_table   --table字段类型顺序务必与Kafkatable一致,严格要求
 (
vin  string,
age int,
...
)
comment '我是Hive表'
 partitioned by (dt string)  --option
 stored as parquet           --option
TBLPROPERTIES (
  'sink.rolling-policy.file-size'='128MB',
  'sink.rolling-policy.rollover-interval'='30 min',  
  'sink.partition-commit.policy.kind'='metastore,success-file',--合并小文件选项
  'auto-compaction'='true',
  'compaction.file-size'='128MB',
  'sink.shuffle-by-partition.enable'='true'
)
;
--执行insert语句动态分区插入
set pipeline.name=设定英文任务名;  -- 设定英文任务名 不需要加引号
set table.sql-dialect=default;
insert into  hive_table
select 
vin  string as vin,
age int as age,
...
from  kafka_table;

--记录一个casewhen语句 用于时间戳的转换:case when CHARACTER_LENGTH(cast (eventTime as string)) = 13 then from_unixtime(cast (substr (cast (eventTime as string),0,10) as BIGINT),'yyyyMMdd') else '19700101' end  as dt

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5705601.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存