使用flink迁移数据

使用flink迁移数据,第1张

使用flink迁移数据

在公司里遇到这样一个业务:需要将数据库A从Oracle迁移到PostgreSQL(pg)。原本让实习生实现了这样一个迁移工具,但他最后写出来的工具存在较大问题。(为方便在本地演示,下面示例中的源库和目标库都使用PostgreSQL;真正迁移Oracle时,需要把源库的JDBC驱动和连接串换成Oracle的。)

正好最近在学习Spark、Flink等大数据处理框架,那么我们就用Flink的批处理(DataSet)API来实现这个需求吧:

1、主类:
package com.ogj.flink;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;


import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

import java.util.concurrent.TimeUnit;


/**
 * Batch job that copies the {@code task_name} and {@code file_path} columns of the source
 * table {@code cloud_task} into the destination table {@code test}, then reads the
 * destination table back and prints it.
 *
 * <p>Connection settings come from {@link DBContent}. Both sides are opened with the
 * PostgreSQL driver here; migrating from Oracle would require the Oracle JDBC driver and URL
 * on the source side.
 *
 * <p>NOTE(review): the insert is a plain append, so running the job twice duplicates rows in
 * {@code test}. Truncate the target (or switch to an upsert) if the job must be re-runnable.
 */
public class DbMove {

    /** Query run against the source database; column order must match {@link #ROW_TYPE}. */
    private static final String SOURCE_QUERY = "select task_name,file_path from cloud_task";

    /** Insert into the destination table; the placeholders bind the row fields in order. */
    private static final String INSERT_QUERY =
            "insert into test(task_name,file_path) values(?,?)";

    /** Query used to read the destination table back for a visual check. */
    private static final String VERIFY_QUERY = "select task_name,file_path from test";

    /** Both selected columns are read as strings. */
    private static final RowTypeInfo ROW_TYPE =
            new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    /**
     * JDBC types bound to the INSERT placeholders, one per field of {@link #ROW_TYPE}.
     * Declaring them lets null values be bound with an explicit type instead of falling back
     * to untyped best-effort binding.
     */
    private static final int[] INSERT_SQL_TYPES = {java.sql.Types.VARCHAR, java.sql.Types.VARCHAR};

    /**
     * Runs the copy job, then prints the contents of the destination table.
     *
     * @param args unused
     * @throws Exception if either Flink job fails
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<Row> source = env.createInput(jdbcInput(
                DBContent.SourceDB.url,
                DBContent.SourceDB.username,
                DBContent.SourceDB.password,
                SOURCE_QUERY));

        source.output(
                JDBCOutputFormat.buildJDBCOutputFormat()
                        .setDBUrl(DBContent.DstDB.url)
                        .setDrivername(DBContent.PGDRIVER)
                        .setUsername(DBContent.DstDB.username)
                        .setPassword(DBContent.DstDB.password)
                        .setQuery(INSERT_QUERY)
                        .setSqlTypes(INSERT_SQL_TYPES)
                        .finish());

        // The source and sink above are only declared; rows are written while execute() runs.
        System.out.println("写入数据中");
        // execute() returns once the batch job has finished, so no extra wait is needed
        // before reading the destination table back.
        env.execute("db move");

        DataSource<Row> read = env.createInput(jdbcInput(
                DBContent.DstDB.url,
                DBContent.DstDB.username,
                DBContent.DstDB.password,
                VERIFY_QUERY));

        System.out.println("read dst dataSource");
        // print() runs its own job to collect and print the rows; no second execute() call.
        read.print();

        System.out.println("=========end==========");
    }

    /**
     * Builds a JDBC input format that reads two string columns with the PostgreSQL driver.
     *
     * @param url      JDBC URL of the database to read
     * @param username database user
     * @param password database password
     * @param query    SELECT statement returning exactly the columns described by ROW_TYPE
     * @return the configured input format
     */
    private static JDBCInputFormat jdbcInput(
            String url, String username, String password, String query) {
        return JDBCInputFormat.buildJDBCInputFormat()
                .setDBUrl(url)
                .setDrivername(DBContent.PGDRIVER)
                .setUsername(username)
                .setPassword(password)
                .setQuery(query)
                .setRowTypeInfo(ROW_TYPE)
                .finish();
    }
}

2、数据库配置
package com.ogj.flink;

/**
 * Connection settings for the migration job.
 *
 * <p>Each URL, user name and password can be overridden at launch with a JVM system property
 * (for example {@code -Ddbmove.source.password=...}), so real credentials do not have to be
 * compiled into the jar. When a property is absent, the hard-coded default below is used.
 * The properties are read once, when the corresponding nested class is first used.
 */
public class DBContent {

    /**
     * MySQL Connector/J driver class name.
     * NOTE(review): this is the legacy 5.x name; Connector/J 8.x deprecates it in favour of
     * {@code com.mysql.cj.jdbc.Driver}.
     */
    public static final String MYSQLDRIVER = "com.mysql.jdbc.Driver";

    /** PostgreSQL JDBC driver class name. */
    public static final String PGDRIVER = "org.postgresql.Driver";

    private DBContent() {
        // Constants holder; not meant to be instantiated.
    }

    /** Database the rows are read from. */
    public static class SourceDB {

        // NOTE(review): "schema_name" looks like a placeholder; set the real schema.
        public static String url = System.getProperty(
                "dbmove.source.url",
                "jdbc:postgresql://127.0.0.1:5432/dmt_url?currentSchema=schema_name");
        public static String username = System.getProperty("dbmove.source.username", "postgres");
        public static String password = System.getProperty("dbmove.source.password", "123456");

        private SourceDB() {
        }
    }

    /** Database the rows are written to. */
    public static class DstDB {

        public static String url = System.getProperty(
                "dbmove.dst.url",
                "jdbc:postgresql://127.0.0.1:5432/RequestMonitor");
        public static String username = System.getProperty("dbmove.dst.username", "postgres");
        public static String password = System.getProperty("dbmove.dst.password", "123456");

        private DstDB() {
        }
    }
}

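3、Maven打包配置:

<build>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>8</source>
                <target>8</target>
                <encoding>UTF-8</encoding>
            </configuration>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>reference.conf</resource>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- 原文此处为 cn.itcast.rpc.Master,与本项目无关;应填写本文的主类 -->
                                <mainClass>com.ogj.flink.DbMove</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>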

运行完毕就是这样的啦:

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5717969.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存