Spark-mysql-to-mysql

A short demo of reading a MySQL table into a Spark DataFrame over JDBC and writing the rows back out to another MySQL table.

maven


    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <scala.version>2.12.10</scala.version>
        <spark.version>3.0.0</spark.version>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>
    </dependencies>
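With the dependencies in place, a quick sanity check is to load the JDBC driver class by name. Note the version coupling: connector 5.1.x (used here) registers com.mysql.jdbc.Driver, while connector 8.x renames it to com.mysql.cj.jdbc.Driver. A minimal sketch (the DriverCheck class is illustrative, not from the original post):

package com.ls.sparkl;

public class DriverCheck {

    public static void main(String[] args) throws ClassNotFoundException {
        // Throws ClassNotFoundException if mysql-connector-java is
        // missing from the classpath. For connector 8.x, use
        // "com.mysql.cj.jdbc.Driver" instead.
        Class.forName("com.mysql.jdbc.Driver");
        System.out.println("MySQL JDBC driver found on the classpath");
    }
}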

java

package com.ls.sparkl;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.util.HashMap;
import java.util.Map;

public class SparkMySqlToMySQL {

    public static void main(String[] args) {

        // Local SparkSession for the demo; adjust master and the
        // warehouse dir for your environment.
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkMysql")
                .master("local[2]")
                .config("spark.sql.warehouse.dir", "file:///D:/GitCoder/Spark/file")
                .getOrCreate();

        String url = "jdbc:mysql://127.0.0.1:3306/test_db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC";

        // JDBC connection options shared by the read and the write.
        Map<String, String> options = new HashMap<>();
        options.put("driver", "com.mysql.jdbc.Driver");
        options.put("url", url);
        options.put("user", "root");
        options.put("password", "xxxx");
        options.put("dbtable", "user_test");

        // Read the source table into a DataFrame via the JDBC data source.
        Dataset<Row> dataset = spark.read().format("jdbc").options(options).load();
        dataset.show();

        // Repoint the same options at the target table and append the rows.
        options.put("dbtable", "user_total_1");
        dataset.write().format("jdbc").mode(SaveMode.Append).options(options).save();

        spark.stop();
    }
}
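The single-partition read above is fine for small tables, but it funnels the whole table through one connection. Spark's JDBC source can split the read across partitions with the partitionColumn/lowerBound/upperBound/numPartitions options. A sketch continuing from the code above, assuming user_test has a numeric key column named id (the column and bounds are illustrative, not from the original post):

        // Partitioned read: Spark issues one range query per partition
        // (WHERE id >= ... AND id < ...) instead of a single full scan.
        Dataset<Row> partitioned = spark.read()
                .format("jdbc")
                .option("driver", "com.mysql.jdbc.Driver")
                .option("url", url)
                .option("user", "root")
                .option("password", "xxxx")
                .option("dbtable", "user_test")
                .option("partitionColumn", "id")  // hypothetical numeric key
                .option("lowerBound", "1")
                .option("upperBound", "100000")
                .option("numPartitions", "4")
                .load();

The write side parallelizes the same way: each partition issues its own batch of INSERTs. SaveMode.Append adds rows to the existing target table, while SaveMode.Overwrite would drop and recreate it first.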
