Flink Sink Mysql 简单实现

Flink Sink Mysql 简单实现,第1张

Flink Sink Mysql 简单实现

Flink Sink Mysql 简单实现
  • 环境
  • 解析
  • 完整代码
  • 源码下载

环境
简单实现从一个数据源获取数据,根据特定标识字段,将数据sink到不同mysql表中。
组件版本:scala 2.12、netcat、kafka、flink 1.13.3

解析
创建环境、输入流,从socket获取输入数据
	// Create the streaming environment and read newline-delimited text from a socket
	// on host server120, port 9999 (presumably fed by netcat, per the component list).
	val env = StreamExecutionEnvironment.getExecutionEnvironment
	val inputStream = env.socketTextStream("server120", 9999)
测试表、测试数据。
将stu开头的数据插入到stu表,将project开头的数据插入到project表。
-- Test tables. MySQL only treats "--" as a comment when it is followed by whitespace.
create table stu(sid int,name varchar(25))charset=utf8;
create table project(pid int,sid int,pro varchar(25),score int)charset=utf8;
-- Test data: the lines below are typed into the socket (netcat) session; they are not SQL.
stu,1001,张三
stu,1002,李四
project,10001,1001,语文,90
project,10002,1002,语文,99
project,10003,1001,数学,79
project,10004,1002,数学,120
加工数据:根据每行数据的第一个字段(逗号分隔),将数据拆分到不同的侧输出流中
    val resultStream = inputStream
      .filter(x => x != null && x.nonEmpty) // drop null and empty lines
      .process(new ProcessFunction[String, String]() { // String in, String out
        override def processElement(v: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
          // First comma-separated field. takeWhile also copes with input such as ","
          // for which split(",") returns an empty array and indexing (0) would throw.
          val tag = v.takeWhile(_ != ',').toLowerCase
          tag match {
            case "stu"     => ctx.output(new OutputTag[String]("stu"), v)
            case "project" => ctx.output(new OutputTag[String]("project"), v)
            // Any other first field used to throw scala.MatchError and fail the job;
            // such lines now go to the main output, which is not sunk in this example.
            case _         => out.collect(v)
          }
        }
      })
获取侧输出流
    // Tags naming the two side outputs; their ids match those used in processElement.
    val stu = new OutputTag[String]("stu")
    val project = new OutputTag[String]("project")
    // Read the side outputs emitted by the process operator.
    val stuStream = resultStream.getSideOutput(stu)
    val projectStream = resultStream.getSideOutput(project)
定义stu数据流的sink
// JDBC sink for the stu side output: each line "stu,<sid>,<name>" becomes one row.
    stuStream.addSink(
      JdbcSink.sink(
        "insert into stu (sid,name) values(?,?)",
        new JdbcStatementBuilder[String]() {
          override def accept(t: PreparedStatement, u: String): Unit = {
            val cols = u.split(",")
            t.setInt(1, Integer.parseInt(cols(1)))
            t.setString(2, cols(2))
          }
        },
        JdbcExecutionOptions.builder()
          .withBatchSize(1000)
          .withBatchIntervalMs(200)
          .withMaxRetries(5)
          .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
          // The author notes the characterEncoding parameter is required.
          .withUrl("jdbc:mysql://server120:3306/flink_test?characterEncoding=utf8")
          .withDriverName("com.mysql.jdbc.Driver")
          .withUsername("flink_test")
          .withPassword("flink_test")
          .build()
      )
    )
定义project流的sink
// JDBC sink for the project side output: each line "project,<pid>,<sid>,<pro>,<score>" becomes one row.
    projectStream.addSink(
      JdbcSink.sink(
        "insert into project (pid,sid,pro,score) values(?,?,?,?)",
        new JdbcStatementBuilder[String]() {
          override def accept(t: PreparedStatement, u: String): Unit = {
            val cols = u.split(",")
            t.setInt(1, Integer.parseInt(cols(1)))
            t.setInt(2, Integer.parseInt(cols(2)))
            t.setString(3, cols(3))
            t.setInt(4, Integer.parseInt(cols(4)))
          }
        },
        JdbcExecutionOptions.builder()
          .withBatchSize(1000)
          .withBatchIntervalMs(200)
          .withMaxRetries(5)
          .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
          // The author notes the characterEncoding parameter is required.
          .withUrl("jdbc:mysql://server120:3306/flink_test?characterEncoding=utf8")
          .withDriverName("com.mysql.jdbc.Driver")
          .withUsername("flink_test")
          .withPassword("flink_test")
          .build()
      )
    )
提交并运行作业(对socket这样的无界数据源,env.execute() 会一直阻塞,持续等待数据)
    // Submit and run the job; per the article, this call blocks while waiting for socket data.
    env.execute()
完整代码
package com.z.sink

import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.connector.jdbc.{JdbcConnectionOptions, JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import java.sql.PreparedStatement


/**
 * Flink streaming job: reads comma-separated lines from a socket and writes them into MySQL
 * tables selected by the first field of each line.
 *
 *  - `stu,<sid>,<name>`                  -> table `stu`     (sid, name)
 *  - `project,<pid>,<sid>,<pro>,<score>` -> table `project` (pid, sid, pro, score)
 *
 * The first field is compared after `toLowerCase`. Lines with any other first field, or with
 * fewer fields than their table needs, are emitted on the main output. That output has no sink,
 * so those lines are discarded instead of failing the job. Numeric fields that do not parse as
 * `Int` still throw `NumberFormatException` inside the sink.
 */
object SocketSinkMysql {

  /** Batching / retry settings shared by both JDBC sinks. */
  private def executionOptions(): JdbcExecutionOptions =
    JdbcExecutionOptions.builder()
      .withBatchSize(1000)
      .withBatchIntervalMs(200)
      .withMaxRetries(5)
      .build()

  /** MySQL connection settings shared by both JDBC sinks. */
  private def connectionOptions(): JdbcConnectionOptions =
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
      // The original author notes the characterEncoding parameter is required
      // (presumably so non-ASCII values such as Chinese names are stored intact).
      .withUrl("jdbc:mysql://server120:3306/flink_test?characterEncoding=utf8")
      .withDriverName("com.mysql.jdbc.Driver")
      .withUsername("flink_test")
      .withPassword("flink_test")
      .build()

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val inputStream = env.socketTextStream("server120", 9999)

    // Side-output tags, created once. The same instances are used to emit (inside the
    // ProcessFunction) and to read the side outputs below.
    val stu = new OutputTag[String]("stu")
    val project = new OutputTag[String]("project")

    val resultStream = inputStream
      .filter(line => line != null && line.nonEmpty) // drop null / empty lines
      .process(new ProcessFunction[String, String]() {
        override def processElement(line: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
          // limit -1 keeps trailing empty fields and always yields at least one element,
          // so fields(0) is safe even for input such as ",".
          val fields = line.split(",", -1)
          fields(0).toLowerCase match {
            case "stu" if fields.length >= 3     => ctx.output(stu, line)
            case "project" if fields.length >= 5 => ctx.output(project, line)
            // Unknown tag or too few fields. Previously this threw scala.MatchError, or an
            // index error later in the sink, and failed the whole job.
            case _ => out.collect(line)
          }
        }
      })

    resultStream.getSideOutput(stu).addSink(JdbcSink.sink(
      "insert into stu (sid,name) values(?,?)",
      new JdbcStatementBuilder[String]() {
        // Line format: stu,<sid>,<name> (field count checked in processElement)
        override def accept(t: PreparedStatement, u: String): Unit = {
          val fields = u.split(",", -1)
          t.setInt(1, Integer.parseInt(fields(1)))
          t.setString(2, fields(2))
        }
      },
      executionOptions(),
      connectionOptions()))

    resultStream.getSideOutput(project).addSink(JdbcSink.sink(
      "insert into project (pid,sid,pro,score) values(?,?,?,?)",
      new JdbcStatementBuilder[String]() {
        // Line format: project,<pid>,<sid>,<pro>,<score> (field count checked in processElement)
        override def accept(t: PreparedStatement, u: String): Unit = {
          val fields = u.split(",", -1)
          t.setInt(1, Integer.parseInt(fields(1)))
          t.setInt(2, Integer.parseInt(fields(2)))
          t.setString(3, fields(3))
          t.setInt(4, Integer.parseInt(fields(4)))
        }
      },
      executionOptions(),
      connectionOptions()))

    // Submit and run the job under a readable name.
    env.execute("SocketSinkMysql")
  }
}

flink jdbc依赖


        
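<dependencies>
    <!-- scala.version here must be the Scala binary version, e.g. 2.12 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>
</dependencies>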

flink 依赖


        
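<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>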
源码下载

https://download.csdn.net/download/sinat_25528181/44038825

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5573217.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存