- 环境
- 解析
- 完整代码
- 源码下载
简单实现从一个数据源获取数据,根据特定标识字段,将数据sink到不同mysql表中。
创建环境、输入流,从socket获取输入数据
// Create the streaming execution environment and read raw text lines
// from the socket source at host "server120", port 9999.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputStream = env.socketTextStream("server120", 9999)
测试表、测试数据。 将stu开头的数据插入到stu表,project开头的数据插入到project表。
-- Test tables
create table stu(sid int,name varchar(25))charset=utf8;
create table project(pid int,sid int,pro varchar(25),score int)charset=utf8;
-- Test data (fed through the socket, one record per line)
stu,1001,张三
stu,1002,李四
project,10001,1001,语文,90
project,10002,1002,语文,99
project,10003,1001,数学,79
project,10004,1002,数学,120
加工数据,根据数据的第一个字符串,将数据拆分到不同的侧输出流中
// Route each record to a side output according to its first comma-separated
// field. The identity map of the original was a no-op and is removed.
val resultStream = inputStream
  .filter(x => x != null && x.nonEmpty) // drop null/empty lines
  .process(new ProcessFunction[String, String]() {
    // Tags are created once per operator instance instead of once per record.
    private val stuTag = new OutputTag[String]("stu")
    private val projectTag = new OutputTag[String]("project")

    override def processElement(v: String,
                                ctx: ProcessFunction[String, String]#Context,
                                out: Collector[String]): Unit = {
      val fields = v.split(",")
      fields(0).toLowerCase match {
        case "stu"     => ctx.output(stuTag, v)     // stu,<sid>,<name>
        case "project" => ctx.output(projectTag, v) // project,<pid>,<sid>,<pro>,<score>
        case _         => // unknown prefix: drop the record instead of
                          // crashing the job with a MatchError
      }
    }
  })
获取侧输出流
// Tags used to look up the side outputs; their id and element type must
// match the tags emitted inside the ProcessFunction ("stu" / "project").
val stu = new OutputTag[String]("stu")
val project = new OutputTag[String]("project")
// Retrieve the side-output streams
val stuStream = resultStream.getSideOutput(stu)
val projectStream = resultStream.getSideOutput(project)
定义stu数据流的sink
// JDBC sink for "stu" records of the form: stu,<sid>,<name>
stuStream.addSink(JdbcSink.sink(
  "insert into stu (sid,name) values(?,?)",
  new JdbcStatementBuilder[String]() {
    override def accept(t: PreparedStatement, u: String): Unit = {
      val f = u.split(",") // split once instead of once per field
      t.setInt(1, f(1).toInt)
      t.setString(2, f(2))
    }
  },
  JdbcExecutionOptions.builder()
    .withBatchSize(1000)
    .withBatchIntervalMs(200)
    .withMaxRetries(5)
    .build(),
  new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    // characterEncoding is required so Chinese names round-trip correctly
    .withUrl("jdbc:mysql://server120:3306/flink_test?characterEncoding=utf8")
    .withDriverName("com.mysql.jdbc.Driver") // 5.1.x driver class
    .withUsername("flink_test")
    .withPassword("flink_test")
    .build()))
定义project流的sink
// JDBC sink for "project" records of the form: project,<pid>,<sid>,<pro>,<score>
projectStream.addSink(JdbcSink.sink(
  "insert into project (pid,sid,pro,score) values(?,?,?,?)",
  new JdbcStatementBuilder[String]() {
    override def accept(t: PreparedStatement, u: String): Unit = {
      val f = u.split(",") // split once instead of once per field
      t.setInt(1, f(1).toInt)
      t.setInt(2, f(2).toInt)
      t.setString(3, f(3)) // already a String; no .toString needed
      t.setInt(4, f(4).toInt)
    }
  },
  JdbcExecutionOptions.builder()
    .withBatchSize(1000)
    .withBatchIntervalMs(200)
    .withMaxRetries(5)
    .build(),
  new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    // characterEncoding is required so Chinese subject names round-trip correctly
    .withUrl("jdbc:mysql://server120:3306/flink_test?characterEncoding=utf8")
    .withDriverName("com.mysql.jdbc.Driver") // 5.1.x driver class
    .withUsername("flink_test")
    .withPassword("flink_test")
    .build()))
阻塞进程,等待数据
env.execute()

完整代码
package com.z.sink

import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import java.sql.PreparedStatement

/**
 * Reads comma-separated records from a socket and routes each one to a
 * MySQL table chosen by its first field: "stu,..." -> table stu,
 * "project,..." -> table project. Routing uses side outputs of a single
 * ProcessFunction; each side output gets its own JDBC sink.
 */
object SocketSinkMysql {

  /** Connection settings shared by both JDBC sinks (was duplicated inline). */
  private def connectionOptions: JdbcConnectionOptions =
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
      // characterEncoding is required so Chinese data round-trips correctly
      .withUrl("jdbc:mysql://server120:3306/flink_test?characterEncoding=utf8")
      .withDriverName("com.mysql.jdbc.Driver") // 5.1.x driver class
      .withUsername("flink_test")
      .withPassword("flink_test")
      .build()

  /** Batching/retry settings shared by both JDBC sinks. */
  private def executionOptions: JdbcExecutionOptions =
    JdbcExecutionOptions.builder()
      .withBatchSize(1000)
      .withBatchIntervalMs(200)
      .withMaxRetries(5)
      .build()

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val inputStream = env.socketTextStream("server120", 9999)

    // Single tag instances, reused both for emitting inside the
    // ProcessFunction and for retrieving the side outputs below.
    val stu = new OutputTag[String]("stu")
    val project = new OutputTag[String]("project")

    // Route each record to a side output keyed on its first field.
    // The original no-op identity map has been removed.
    val resultStream = inputStream
      .filter(x => x != null && x.nonEmpty) // drop null/empty lines
      .process(new ProcessFunction[String, String]() {
        override def processElement(v: String,
                                    ctx: ProcessFunction[String, String]#Context,
                                    out: Collector[String]): Unit = {
          v.split(",")(0).toLowerCase match {
            case "stu"     => ctx.output(stu, v)
            case "project" => ctx.output(project, v)
            case _         => // unknown prefix: drop the record instead of
                              // crashing the job with a MatchError
          }
        }
      })

    val stuStream = resultStream.getSideOutput(stu)
    val projectStream = resultStream.getSideOutput(project)

    // Sink for records of the form: stu,<sid>,<name>
    stuStream.addSink(JdbcSink.sink(
      "insert into stu (sid,name) values(?,?)",
      new JdbcStatementBuilder[String]() {
        override def accept(t: PreparedStatement, u: String): Unit = {
          val f = u.split(",") // split once, not once per field
          t.setInt(1, f(1).toInt)
          t.setString(2, f(2))
        }
      },
      executionOptions,
      connectionOptions))

    // Sink for records of the form: project,<pid>,<sid>,<pro>,<score>
    projectStream.addSink(JdbcSink.sink(
      "insert into project (pid,sid,pro,score) values(?,?,?,?)",
      new JdbcStatementBuilder[String]() {
        override def accept(t: PreparedStatement, u: String): Unit = {
          val f = u.split(",")
          t.setInt(1, f(1).toInt)
          t.setInt(2, f(2).toInt)
          t.setString(3, f(3))
          t.setInt(4, f(4).toInt)
        }
      },
      executionOptions,
      connectionOptions))

    // Blocks until the job is cancelled; data flows from here on.
    env.execute()
  }
}
flink jdbc依赖
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.47</version>
</dependency>
flink 依赖
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_${scala.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_${scala.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_${scala.version}</artifactId>
  <version>${flink.version}</version>
</dependency>

源码下载
https://download.csdn.net/download/sinat_25528181/44038825
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)