上次分享的示例基于 Flink 1.10,该版本不支持在 Hive 数据更新之后,仍然用最新的 Hive 数据与 Kafka 流进行关联。
本次基于 Flink 1.12.2,用 Java 代码实现 Kafka 流与 Hive 维表的“动态”关联(即 temporal join:Hive 维表更新后,关联结果会随之更新)。下方是这个 tiny demo 的依赖和代码。
依赖:
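<dependencies>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.2</version></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.2</version></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.12.2</version></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala_2.12</artifactId><version>1.12.2</version></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>1.12.2</version></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>1.12.2</version></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.12.2</version></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.12</artifactId><version>1.12.2</version></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hive_2.12</artifactId><version>1.12.2</version></dependency>
  <dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>3.1.0</version></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.12.2</version></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep-scala_2.12</artifactId><version>1.12.2</version></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>1.12.2</version></dependency>
  <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>2.7.2</version></dependency>
  <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.7.2</version></dependency>
  <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-common</artifactId><version>2.7.2</version></dependency>
  <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-jobclient</artifactId><version>2.7.2</version></dependency>
</dependencies>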
代码:
package StreamBatch_fh; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; //import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.types.Row; public class FlinkHiveMain { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ParameterTool para = ParameterTool.fromArgs(args); String host = para.get("host"); Integer port = para.getInt("port"); String topic = para.get("topic"); String hivedir = para.get("hivedir"); EnvironmentSettings envSet = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env, envSet); // tableEnv里添加hive相关的参数 streamTableEnvironment.getConfig().getConfiguration().setString("table.dynamic-table-options.enabled", "true"); String name = "myhive"; String defaultDatabase = "flink01"; // String hiveConfDir = "D:\ocspx_20210616\flink0810\flink12hive\src\main\resources"; String hiveConfDir = hivedir; HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir); //注册catalog streamTableEnvironment.registerCatalog("myhive", hive); //使用catalog streamTableEnvironment.useCatalog("myhive"); String dropoldkafkatable = String.format("DROP table if exists UserScores"); streamTableEnvironment.executeSql(dropoldkafkatable); String createKafkaTable = String.format( "CREATE TABLE UserScores (name1 STRING,scoure DOUBLE,zoneCode STRING,proctime as PROCTIME())n" + "WITH (n" + " 'connector' = 
'kafka',n" + " 'topic' = 'test_in1',n" + " 'properties.bootstrap.servers' = '10.1.236.92:6667',n" + " 'properties.group.id' = 'testGroup',n" + " 'format' = 'csv',n" + //" 'scan.startup.timestamp-millis' = '1605147648000',n" + // " 'csv.field-delimiter' = 't',n" + " 'scan.startup.mode' = 'latest-offset'n" + ")"); //创建表 也就是上方那一段sql TableResult tableResult = streamTableEnvironment.executeSql(createKafkaTable); // Table table1 = streamTableEnvironment.sqlQuery("select * from UserScores"); //把表转为流 再打印 DataStreamgg = streamTableEnvironment.toAppendStream(table1, Row.class); gg.print("kafka源数据"); // String createHiveTable = String.format(""); // 方言改为hive streamTableEnvironment.getConfig().setSqlDialect(SqlDialect.HIVE); // ??table.exec.hive.fallback-mapred-reader=true // Table table2 = streamTableEnvironment.sqlQuery( // "select * from dis_users_1118a " ); // String hive1 = String.format("select * from dis_users_1118a "); //下方where条件里 仿佛左右两个表不能有重复的字段 // Table tableJoin = table1.join(table2).where("name1 = name"); // streamTableEnvironment.toAppendStream(table2, Row.class).print("当前hive数据"); // streamTableEnvironment.toAppendStream(tableJoin, Row.class).print("关联后"); // DataStream
gg2 = streamTableEnvironment.toAppendStream(table2, Row.class); // gg2.print("hive中创建的users表"); String dropoldHivetable = String.format("DROP table if exists dimension_table"); streamTableEnvironment.executeSql(dropoldHivetable); // String hive_create = String.format("CREATE TABLE dimension_table (n" + " product_id STRING,n" + " user_name STRING,n" + " unit_price DECIMAL(10, 4),n" + " pv_count BIGINT,n" + " like_count BIGINT,n" + " comment_count BIGINT,n" + " update_user STRINGn" + ") TBLPROPERTIES (n" + " 'streaming-source.enable' = 'false', n" + " 'streaming-source.partition.include' = 'all', n" + " 'lookup.join.cache.ttl' = '1 min'n" + ")"); streamTableEnvironment.executeSql(hive_create); String Insert = String.format(" INSERT INTO dimension_table values('Bill','Bill',9.22,20211122,1122,2021,'hh')"); streamTableEnvironment.executeSql(Insert); // String join = String.format("select * from UserScores join dimension_table ON UserScores.name1 = dimension_table.product_name"); // streamTableEnvironment.sqlQuery(join).printSchema(); String join2 = String.format("select * from UserScores join dimension_table FOR SYSTEM_TIME AS OF UserScores.proctime ON UserScores.name1 = dimension_table.user_name"); Table t = streamTableEnvironment.sqlQuery(join2); streamTableEnvironment.toAppendStream(t, Row.class).print("输出关联结果"); try { env.execute("hive test01"); } catch (Exception e) { e.printStackTrace(); } } }
运行效果:
打包好后提交到yarn上:
/usr/hdp/3.1.0.0-78/flink/bin/flink run -c StreamBatch_fh.FlinkHiveMain -m yarn-cluster -yt /data/hh/app_jar/hive_join/resources /data/hh/app_jar/hive_join/flink12hive-1.0-SNAPSHOT.jar --port 6667 --host 10.***.92 --topic test_in1 --hivedir /data/hh/app_jar/hive_join/resources
提交作业后,给hive维表插入1条数据,加上代码里默认插入的1条,共2条数据:
此时给一条对应hive中存在的数据:
输出为:
因为 Hive 维表在关联侧是带缓存的:Hive 更新后,要等缓存过期(即建表时设置的 'lookup.join.cache.ttl',本例为 1 min)才会重新加载,所以新插入的维表数据并不是立即就能被关联带出。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)