在公司中遇到这样一个需求:需要将数据库 A 从 Oracle 迁移到 PostgreSQL。原本让实习生实现了一个迁移工具,但他最终写出来的工具存在较大问题。
正好最近在学习 Spark、Flink 等大数据计算框架,那么我们就用 Flink 来实现这个需求吧(下面的代码使用的是 Flink 的 DataSet 批处理 API):
1、主类:package com.ogj.flink; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink; import org.apache.flink.api.java.io.jdbc.JDBCInputFormat; import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.Types; import org.apache.flink.table.api.bridge.java.BatchTableEnvironment; import org.apache.flink.types.Row; import java.util.concurrent.TimeUnit; public class DbMove { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSource2、数据库配置类dataSource = env.createInput( JDBCInputFormat.buildJDBCInputFormat() .setDBUrl(DBContent.SourceDB.url) .setDrivername(DBContent.PGDRIVER) .setUsername(DBContent.SourceDB.username) .setPassword(DBContent.SourceDB.password) .setQuery("select task_name,file_path from cloud_task") .setRowTypeInfo( new RowTypeInfo( BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO ) ) .finish() ); dataSource.output( JDBCOutputFormat.buildJDBCOutputFormat() .setDBUrl(DBContent.DstDB.url) .setDrivername(DBContent.PGDRIVER) .setUsername(DBContent.DstDB.username) .setPassword(DBContent.DstDB.password) .setQuery("insert into test(task_name,file_path) values(?,?)") .finish() ); env.execute("db move"); System.out.println("写入数据中"); TimeUnit.SECONDS.sleep(5); //查询出来 DataSource
read = env.createInput( JDBCInputFormat.buildJDBCInputFormat() .setDBUrl(DBContent.DstDB.url) .setDrivername(DBContent.PGDRIVER) .setUsername(DBContent.DstDB.username) .setPassword(DBContent.DstDB.password) .setQuery("select task_name,file_path from test") .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO)) .finish() ); System.out.println("read dst dataSource"); read.print(); System.out.println("=========end=========="); } }
package com.ogj.flink; public class DBContent { public static final String MYSQLDRIVER = "com.mysql.jdbc.Driver"; public static final String PGDRIVER = "org.postgresql.Driver"; public static class SourceDB { public static String url = "jdbc:postgresql://127.0.0.1:5432/dmt_url?currentSchema=schema_name"; public static String username = "postgres"; public static String password = "123456"; } public static class DstDB { public static String url = "jdbc:postgresql://127.0.0.1:5432/RequestMonitor"; public static String username = "postgres"; public static String password = "123456"; } }3、maven打包配置:
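<build>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>8</source>
                <target>8</target>
                <encoding>UTF-8</encoding>
            </configuration>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <!-- Paths are case-sensitive: it must be META-INF, otherwise signature files survive and the jar fails verification. -->
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>reference.conf</resource>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- The original post had cn.itcast.rpc.Master here, which is not part of this project. -->
                                <mainClass>com.ogj.flink.DbMove</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>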
运行完毕后,结果如下:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)