maven
1.8 1.8 1.8 UTF-8 UTF-8 2.12.10 3.0.0 UTF-8 org.scala-lang scala-library${scala.version} org.apache.spark spark-core_2.12${spark.version} org.apache.spark spark-sql_2.12${spark.version} org.apache.spark spark-streaming_2.12${spark.version} mysql mysql-connector-java5.1.49
java
package com.ls.sparkl; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters; import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructType; import java.util.*; public class SparkMySqlToMySQL { public static void main(String[] args) { SparkSession spark = SparkSession .builder() .appName("SparkMysql") .master("local[2]") .config("spark.sql.warehouse.dir", "file:///D:/GitCoder/Spark/file") .getOrCreate(); String url = "jdbc:mysql://127.0.0.1:3306/test_db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC"; Mapoptions = new HashMap<>(); options.put("driver", "com.mysql.jdbc.Driver"); options.put("url", url); options.put("user", "root"); options.put("password", "xxxx"); options.put("dbtable", "user_test"); Dataset dataset = spark.read().format("jdbc").options(options).load(); dataset .show(); options.put("dbtable", "user_total_1"); dataset .write().format("jdbc").mode(SaveMode.Append).options(options).save(); spark.stop(); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)