Flink in Practice (1): Flink-CDC MySQL-to-MySQL Sync (select)


Background

Select-based Flink-CDC is suited to full-load data synchronization. It can be combined with Azkaban or DolphinScheduler to schedule T+1 synchronization jobs.

1、Maven (pom.xml)

    <properties>
        <flink.version>1.13.6</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.4.0</version>
        </dependency>

        <!-- MySQL JDBC driver required by MysqlReader/MysqlWriter below; not listed in the original POM, the version is an assumption -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <!-- the element name for this "false" value was not legible; createDependencyReducedPom is an assumption -->
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.flink.cdc.demo.MysqlCdcMysql</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*:*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <!-- the original used source/target 6, but Flink 1.13 requires Java 8 -->
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
2、MysqlReader
package com.flink.cdc.demo;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class MysqlReader extends RichSourceFunction<Tuple3<Integer, String, String>> {

    private Connection connection = null;
    private PreparedStatement ps = null;


    // Open the database connection before the source starts emitting records.
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("com.mysql.jdbc.Driver"); // load the MySQL JDBC driver
        connection = DriverManager.getConnection("jdbc:mysql://10.252.92.4:3306", "root", "root"); // open the connection
        ps = connection.prepareStatement("select id,username,password from flink_cdc_test.t_test");
    }


    @Override
    public void run(SourceContext<Tuple3<Integer, String, String>> sourceContext) throws Exception {
        ResultSet resultSet = ps.executeQuery();
        while (resultSet.next()) {
            Tuple3<Integer, String, String> tuple = new Tuple3<>();
            tuple.setFields(resultSet.getInt(1), resultSet.getString(2), resultSet.getString(3));
            sourceContext.collect(tuple);
        }
    }

    @Override
    public void cancel() {
        try {
            if (ps != null) {
                ps.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
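
The connection URL, the credentials and the SQL statement above are hard-coded. For the scheduled T+1 full loads mentioned in the Background, it is often more convenient to pass them in from outside. The following is a minimal sketch of that idea; ConfigurableMysqlReader, its constructor and its parameters are illustrative additions, not part of the original article.

package com.flink.cdc.demo;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Illustrative variant of MysqlReader: connection settings and query are constructor parameters,
// so the same jar can be reused by a scheduler against different environments.
public class ConfigurableMysqlReader extends RichSourceFunction<Tuple3<Integer, String, String>> {

    private final String url;
    private final String user;
    private final String password;
    private final String query;

    private transient Connection connection;
    private transient PreparedStatement ps;

    public ConfigurableMysqlReader(String url, String user, String password, String query) {
        this.url = url;
        this.user = user;
        this.password = password;
        this.query = query;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = DriverManager.getConnection(url, user, password);
        ps = connection.prepareStatement(query);
    }

    @Override
    public void run(SourceContext<Tuple3<Integer, String, String>> ctx) throws Exception {
        ResultSet rs = ps.executeQuery();
        while (rs.next()) {
            ctx.collect(Tuple3.of(rs.getInt(1), rs.getString(2), rs.getString(3)));
        }
    }

    @Override
    public void cancel() {
        try {
            if (ps != null) {
                ps.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

With this, the main class could build the source from program arguments, e.g. env.addSource(new ConfigurableMysqlReader(args[0], args[1], args[2], args[3])).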
3、MysqlWriter
package com.flink.cdc.demo;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;



public class MysqlWriter extends RichSinkFunction<Tuple3<Integer, String, String>> {
    private Connection connection = null;
    private PreparedStatement ps = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        if (connection == null) {
            Class.forName("com.mysql.jdbc.Driver"); // load the MySQL JDBC driver
            connection = DriverManager.getConnection("jdbc:mysql://10.252.92.4:3306", "root", "root"); // open the connection
        }

        ps = connection.prepareStatement("insert into ods_flink_cdc_test.ods_t_test values (?,?,?)");
        System.out.println("sink connection ready");
    }

    @Override
    public void invoke(Tuple3<Integer, String, String> value, Context context) throws Exception {
        // Write the record received from MysqlReader into the target table.
        try {
            ps.setInt(1, value.f0);
            ps.setString(2, value.f1);
            ps.setString(3, value.f2);
            ps.executeUpdate();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
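
MysqlWriter executes one INSERT per record, which costs one round trip to MySQL per row during a full load. A common refinement is JDBC batching with addBatch/executeBatch, sketched below; BatchMysqlWriter and the batch size of 1000 are illustrative, not from the original article.

package com.flink.cdc.demo;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Illustrative variant of MysqlWriter that groups inserts into JDBC batches.
public class BatchMysqlWriter extends RichSinkFunction<Tuple3<Integer, String, String>> {

    private static final int BATCH_SIZE = 1000; // illustrative value

    private transient Connection connection;
    private transient PreparedStatement ps;
    private int pending = 0;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = DriverManager.getConnection("jdbc:mysql://10.252.92.4:3306", "root", "root");
        ps = connection.prepareStatement("insert into ods_flink_cdc_test.ods_t_test values (?,?,?)");
    }

    @Override
    public void invoke(Tuple3<Integer, String, String> value, Context context) throws Exception {
        ps.setInt(1, value.f0);
        ps.setString(2, value.f1);
        ps.setString(3, value.f2);
        ps.addBatch();
        pending++;
        if (pending >= BATCH_SIZE) {
            ps.executeBatch(); // send the accumulated inserts together
            pending = 0;
        }
    }

    @Override
    public void close() throws Exception {
        if (ps != null) {
            if (pending > 0) {
                ps.executeBatch(); // flush the last partial batch
            }
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
        super.close();
    }
}

Note that MySQL Connector/J only rewrites a batch into a multi-row INSERT when rewriteBatchedStatements=true is appended to the JDBC URL.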
4、Main class MysqlCdcMysql
package com.flink.cdc.demo;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MysqlCdcMysql {
    public static void main(String[] args) throws Exception {
//        ExecutionEnvironment env  =  ExecutionEnvironment.createRemoteEnvironment("localhost",8081,"D:\\flink-steven\\target\\flink-0.0.1-SNAPSHOT.jar");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081, "D:\\flink-steven\\target\\flink-0.0.1-SNAPSHOT.jar");
// It is best not to set the parallelism inside the program. If you set it to 8, the parallelism configured for the Flink installation must be at least 8, otherwise the job will fail with a resource shortage.
//        env.setParallelism(8);
        DataStreamSource<Tuple3<Integer, String, String>> dataStream = env.addSource(new MysqlReader());
        dataStream.print();
        dataStream.addSink(new MysqlWriter());
        env.execute("Flink read MySQL data and write to MySQL");
    }
}
5、Run locally
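
Since flink-runtime-web is already in the POM, one convenient way to run the job locally is to create a local environment with the embedded web UI, so the running job can be watched at http://localhost:8081. A minimal sketch (the class name MysqlCdcMysqlLocal is illustrative):

package com.flink.cdc.demo;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Illustrative local runner: same pipeline as MysqlCdcMysql, but with the embedded web UI enabled.
public class MysqlCdcMysqlLocal {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        DataStreamSource<Tuple3<Integer, String, String>> dataStream = env.addSource(new MysqlReader());
        dataStream.print();
        dataStream.addSink(new MysqlWriter());
        env.execute("Flink read MySQL data and write to MySQL (local)");
    }
}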

6、Build the jar and upload it

Note: the Flink version installed on the cluster must match the version in the Maven POM, and the Scala version must match as well.

7、Run
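
Once the shaded jar has been uploaded, the job can be submitted through the Flink CLI, for example (the jar name follows the commented-out path in MysqlCdcMysql; adjust it to your build):

flink run -c com.flink.cdc.demo.MysqlCdcMysql flink-0.0.1-SNAPSHOT.jar

The -c flag names the entry class; it is strictly optional here because the shade plugin already writes the main class into the jar manifest.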
