Flink: Writing Data to MySQL
JDBC Connector <= there is currently no official connector specifically for MySQL, so we simply implement our own.
This post tests connecting to MySQL.
pom dependency (the MySQL in my local Docker is version 8.0.19)
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.26</version>
</dependency>
Start the MySQL service (I start the MySQL service on my cluster)
Create the database
CREATE DATABASE `test` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
Create the table
CREATE TABLE `sensor_temp` (
  `id` varchar(255) NOT NULL,
  `temp` double NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Write the Java code
package com.zch.apitest.sink;

import com.zch.apitest.beans.SensorReading;
import com.zch.apitest.source.SourceTest4_自定义;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SinkTest3_JDBC {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

//        // Read from a file
//        DataStream<String> inputStream = env.readTextFile("F:\\JAVA\\bigdata2107\\zch\\flink\\src\\main\\resources\\Sensor.txt");
//
//        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(lines -> {
//            String[] split = lines.split(",");
//            return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
//        });

        // Use the SourceFunction written earlier that generates randomly fluctuating temperatures
        DataStream<SensorReading> dataStream = env.addSource(new SourceTest4_自定义.MySensorSource());

        // Custom sink to JDBC
        dataStream.addSink(new MyjdbcSink());

        env.execute();
    }

    // Custom SinkFunction implementation
    public static class MyjdbcSink extends RichSinkFunction<SensorReading> {
        // Declare the connection and prepared statements
        Connection connection = null;
        PreparedStatement insertStmt = null;
        PreparedStatement updateStmt = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection("jdbc:mysql://zhaohui01:3306/test", "root", "123456");
            insertStmt = connection.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)");
            updateStmt = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
        }

        // For every incoming record, use the open connection to execute the SQL
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            // Run the update first; if no row was updated, insert instead
            updateStmt.setDouble(1, value.getTemperature());
            updateStmt.setString(2, value.getId());
            updateStmt.execute();
            if (updateStmt.getUpdateCount() <= 0) {
                insertStmt.setString(1, value.getId());
                insertStmt.setDouble(2, value.getTemperature());
                insertStmt.execute();
            }
        }

        @Override
        public void close() throws Exception {
            updateStmt.close();
            insertStmt.close();
            connection.close();
        }
    }
}
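The SensorReading POJO comes from an earlier post in this series and is not shown here. Based on how it is used above (the three-argument constructor in the commented-out mapper and the getId()/getTemperature() getters in the sink), a minimal sketch might look like the following; the field name timestamp and the getTimestamp() getter are assumptions.

package com.zch.apitest.beans;

// Minimal sketch of the SensorReading POJO assumed by the sink above.
// The constructor and the getId()/getTemperature() getters match how the
// class is used in SinkTest3_JDBC; the "timestamp" field name is an assumption.
public class SensorReading {
    private String id;
    private Long timestamp;
    private Double temperature;

    public SensorReading() {
    }

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() {
        return id;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public Double getTemperature() {
        return temperature;
    }
}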
Output
Run the Flink program and check the data in MySQL (you can see that the data in MySQL keeps changing):
mysql> SELECT * FROM sensor_temp;
+-----------+--------------------+
| id        | temp               |
+-----------+--------------------+
| sensor_3  | 20.489172407885917 |
| sensor_10 | 73.01289164711463  |
| sensor_4  | 43.402500895809744 |
| sensor_1  | 6.894772325662007  |
| sensor_2  | 101.79309911751122 |
| sensor_7  | 63.070612021580324 |
| sensor_8  | 63.82606628090501  |
| sensor_5  | 57.67115738487047  |
| sensor_6  | 50.84442627975055  |
| sensor_9  | 52.58400793021675  |
+-----------+--------------------+
10 rows in set (0.00 sec)

mysql> SELECT * FROM sensor_temp;
+-----------+--------------------+
| id        | temp               |
+-----------+--------------------+
| sensor_3  | 19.498209543035923 |
| sensor_10 | 71.92981963197121  |
| sensor_4  | 43.566017489470426 |
| sensor_1  | 6.378208186786803  |
| sensor_2  | 101.71010087830145 |
| sensor_7  | 62.11402602179431  |
| sensor_8  | 64.33196455020062  |
| sensor_5  | 56.39071692662006  |
| sensor_6  | 48.952784757264894 |
| sensor_9  | 52.078086096436685 |
+-----------+--------------------+
10 rows in set (0.00 sec)
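A side note on the opening remark: while there is indeed no MySQL-specific connector, Flink 1.11+ does ship a generic flink-connector-jdbc module whose JdbcSink can replace a hand-written RichSinkFunction for plain inserts. Below is a minimal sketch, assuming that dependency (artifact name/Scala suffix depends on your Flink version) plus the MySQL driver are on the classpath; the class name SinkTest3_JdbcSink and the batch settings are my own choices, and the URL/credentials mirror the ones used above.

package com.zch.apitest.sink;

import com.zch.apitest.beans.SensorReading;
import com.zch.apitest.source.SourceTest4_自定义;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch only: requires the flink-connector-jdbc dependency matching your Flink version.
public class SinkTest3_JdbcSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<SensorReading> dataStream = env.addSource(new SourceTest4_自定义.MySensorSource());

        // Plain insert; reproducing the update-or-insert behaviour of MyjdbcSink
        // would need an "insert ... on duplicate key update" statement and a
        // primary key on sensor_temp.
        dataStream.addSink(JdbcSink.sink(
                "insert into sensor_temp (id, temp) values (?, ?)",
                (statement, reading) -> {
                    statement.setString(1, reading.getId());
                    statement.setDouble(2, reading.getTemperature());
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(100)        // flush every 100 records...
                        .withBatchIntervalMs(200)  // ...or every 200 ms
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://zhaohui01:3306/test")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("123456")
                        .build()
        ));

        env.execute();
    }
}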