基于select语句的Flink-CDC 适用于数据同步的全量同步的场景,可以结合 Azkaban 或者dolphin scheduler 做定时调度 T+1 数据同步。
1、maven
<!-- Reconstructed pom.xml fragment: the original XML tags were stripped during
     publishing; only the text nodes survived. Values below are the surviving
     literals mapped back onto a conventional Flink/shade-plugin layout. -->
<properties>
    <flink.version>1.13.6</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.20</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.0</version>
            <configuration>
                <createDependencyReducedPom>false</createDependencyReducedPom>
            </configuration>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.flink.cdc.demo.MysqlCdcMysql</mainClass>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>reference.conf</resource>
                            </transformer>
                        </transformers>
                        <filters>
                            <filter>
                                <artifact>*:*:*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <!-- 原文仅残留 "6";按惯例推断为 source/target 级别,请按实际 JDK 确认 -->
            <configuration>
                <source>6</source>
                <target>6</target>
            </configuration>
        </plugin>
    </plugins>
</build>
2、MysqlReader
package com.flink.cdc.demo;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class MysqlReader extends RichSourceFunction> {
private Connection connection = null;
private PreparedStatement ps = null;
//该方法主要用于打开数据库连接,下面的ConfigKeys类是获取配置的类
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Class.forName("com.mysql.jdbc.Driver");//加载数据库驱动
connection = DriverManager.getConnection("jdbc:mysql://10.252.92.4:3306", "root", "root");//获取连接
ps = connection.prepareStatement("select id,username,password from flink_cdc_test.t_test");
}
@Override
public void run(SourceContext> sourceContext) throws Exception {
ResultSet resultSet = ps.executeQuery();
while (resultSet.next()) {
Tuple3 tuple = new Tuple3();
tuple.setFields(resultSet.getInt(1), resultSet.getString(2), resultSet.getString(3));
sourceContext.collect(tuple);
}
}
@Override
public void cancel() {
try {
super.close();
if (connection != null) {
connection.close();
}
if (ps != null) {
ps.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
3、MysqlWriter
package com.flink.cdc.demo;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class MysqlWriter extends RichSinkFunction> {
private Connection connection = null;
private PreparedStatement ps = null;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
if (connection == null) {
Class.forName("com.mysql.jdbc.Driver");//加载数据库驱动
connection = DriverManager.getConnection("jdbc:mysql://10.252.92.4:3306", "root", "root");//获取连接
}
ps = connection.prepareStatement("insert into ods_flink_cdc_test.ods_t_test values (?,?,?)");
System.out.println("完成");
}
@Override
public void invoke(Tuple3 value, Context context) throws Exception {
//获取JdbcReader发送过来的结果
try {
ps.setInt(1, value.f0);
ps.setString(2, value.f1);
ps.setString(3, value.f2);
ps.executeUpdate();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void close() throws Exception {
super.close();
if (ps != null) {
ps.close();
}
if (connection != null) {
connection.close();
}
super.close();
}
}
4、主类MysqlCdcMysql
package com.flink.cdc.demo;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Job entry point: reads a full MySQL table via {@link MysqlReader}, prints
 * each record, and writes it back to another MySQL schema via
 * {@link MysqlWriter}. Suitable for T+1 full-sync batch runs scheduled by
 * Azkaban / DolphinScheduler.
 */
public class MysqlCdcMysql {
    public static void main(String[] args) throws Exception {
        // For remote submission, use instead:
        // StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081, "path/to/job.jar");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Avoid hard-coding parallelism here: a value (e.g. 8) larger than the
        // cluster's configured slots makes the job fail with insufficient resources.
        // env.setParallelism(8);

        // Generic parameter restored (lost in the original paste).
        DataStreamSource<Tuple3<Integer, String, String>> dataStream = env.addSource(new MysqlReader());
        dataStream.print();
        dataStream.addSink(new MysqlWriter());
        env.execute("Flink cost MySQL data to write MySQL");
    }
}
5、本地运行
6、打成jar包进行上传
注意:flink版本要和maven里的版本一致 scala版本也要保持一致
7、运行

欢迎分享,转载请注明来源:内存溢出
评论列表(0条)