Flink Sink之JDBC

Flink Sink之JDBC,第1张

Flink Sink之JDBC 5.7.3 JDBC自定义sink

Flink之Mysql数据

JDBC Connector <= 官方目前没有专门针对MySQL的,我们自己实现就好了

这里测试的是连接MySQL。

    pom依赖(我本地docker里的mysql是8.0.19版本的)

    
        mysql
        mysql-connector-java
        8.0.26
    
    

    启动mysql服务(我集群启动mysql服务)

    新建数据库

    CREATE DATAbase `test` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
    

    新建schema

    CREATE TABLE `sensor_temp` (
      `id` varchar(255) NOT NULL,
      `temp` double NOT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    

    编写java代码

    package com.zch.apitest.sink;
    
    import com.zch.apitest.beans.SensorReading;
    import com.zch.apitest.source.SourceTest4_自定义;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    
    
    public class SinkTest3_JDBC {
        public static void main(String[] args) throws Exception{
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
    //        // 读取文件
    //        DataStream inputStream = env.readTextFile("F:\JAVA\bigdata2107\zch\flink\src\main\resources\Sensor.txt");
    //
    //        SingleOutputStreamOperator dataStream = inputStream.map(lines -> {
    //            String[] split = lines.split(",");
    //            return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
    //        });
            // 使用之前编写的随机变动温度的SourceFunction来生成数据
            DataStream dataStream = env.addSource(new SourceTest4_自定义.MySensorSource());
    
            // 自定义sink到JDBC
            dataStream.addSink(new MyjdbcSink());
    
            env.execute();
        }
        // 实现自定义的SinkFunction
        public static class MyjdbcSink extends RichSinkFunction{
    
            // 声明连接和预编译语句
            Connection connection = null;
            PreparedStatement insertStmt = null;
            PreparedStatement updateStmt = null;
    
            @Override
            public void open(Configuration parameters) throws Exception {
                connection = DriverManager.getConnection("jdbc:mysql://zhaohui01:3306/test","root","123456");
                insertStmt = connection.prepareStatement("insert into sensor_temp (id,temp) values (?,?)");
                updateStmt = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
            }
    
            // 每来一条数据,调用连接,执行sql
            @Override
            public void invoke(SensorReading value, Context context) throws Exception {
    
                // 直接执行插入,如果没有更新成功,就插入
                updateStmt.setDouble(1,value.getTemperature());
                updateStmt.setString(2,value.getId());
                updateStmt.execute();
                if (updateStmt.getUpdateCount() <= 0){
                    insertStmt.setString(1,value.getId());
                    insertStmt.setDouble(2,value.getTemperature());
                    insertStmt.execute();
                }
            }
    
            @Override
            public void close() throws Exception {
                updateStmt.close();
                insertStmt.close();
                connection.close();
            }
        }
    }
    
    

      输出结果

      运行Flink程序,查看MySQL数据(可以看到MySQL里的数据一直在变动)

      mysql> SELECT * FROM sensor_temp;
      +-----------+--------------------+
      | id        | temp               |
      +-----------+--------------------+
      | sensor_3  | 20.489172407885917 |
      | sensor_10 |  73.01289164711463 |
      | sensor_4  | 43.402500895809744 |
      | sensor_1  |  6.894772325662007 |
      | sensor_2  | 101.79309911751122 |
      | sensor_7  | 63.070612021580324 |
      | sensor_8  |  63.82606628090501 |
      | sensor_5  |  57.67115738487047 |
      | sensor_6  |  50.84442627975055 |
      | sensor_9  |  52.58400793021675 |
      +-----------+--------------------+
      10 rows in set (0.00 sec)
      
      mysql> SELECt * FROM sensor_temp;
      +-----------+--------------------+
      | id        | temp               |
      +-----------+--------------------+
      | sensor_3  | 19.498209543035923 |
      | sensor_10 |  71.92981963197121 |
      | sensor_4  | 43.566017489470426 |
      | sensor_1  |  6.378208186786803 |
      | sensor_2  | 101.71010087830145 |
      | sensor_7  |  62.11402602179431 |
      | sensor_8  |  64.33196455020062 |
      | sensor_5  |  56.39071692662006 |
      | sensor_6  | 48.952784757264894 |
      | sensor_9  | 52.078086096436685 |
      +-----------+--------------------+
      10 rows in set (0.00 sec)
      

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5710317.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存