When starting spark-shell, you must specify both the jar path and the driver class path. These two paths are the same: both point to the MySQL driver jar.
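For example (a sketch; the jar path below is a placeholder, substitute the actual location of your MySQL connector jar):

    spark-shell \
      --jars /path/to/mysql-connector-java.jar \
      --driver-class-path /path/to/mysql-connector-java.jar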
Then select the corresponding jar package (the same MySQL driver jar).
2.2 Reading data

    import org.apache.spark.sql.SparkSession

    object sparkSQLTestJDBC {
      def main(args: Array[String]): Unit = {
        // create the SparkSession entry point
        val spark = SparkSession.builder().master("local").appName("sparkSQLTest").getOrCreate()
        // read the table from MySQL over JDBC
        val studentDF = spark.read.format("jdbc")
          .option("url", "jdbc:mysql://10.10.10.10:3306/dbName?serverTimezone=GMT") // IP and database name
          .option("driver", "com.mysql.jdbc.Driver")
          .option("dbtable", "student") // table name
          .option("user", "root")       // user name
          .option("password", "123456") // password
          .load()
        // display the result
        studentDF.show()
      }
    }
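Once loaded, the DataFrame can be queried like any other. A minimal sketch, assuming the student table has name and score columns (as in the write example in section 2.3):

    // register the DataFrame as a temporary view and query it with SQL
    studentDF.createOrReplaceTempView("student")
    spark.sql("SELECT name, score FROM student WHERE score > 80").show()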
2.3 Writing data

Writing data starts the same way as defining a DataFrame programmatically from an RDD:
    // for creating the table schema
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.sql.Row
    // for holding the connection info
    import java.util.Properties

    object sparkSQLTestJDBCWrite {
      def main(args: Array[String]): Unit = {
        // create the SparkSession entry point
        val spark = SparkSession.builder().appName("sparkSQLTest").master("local").getOrCreate()
        // define the table schema
        val schema = StructType(List(
          StructField("id", IntegerType, true),
          StructField("name", StringType, true),
          StructField("score", IntegerType, true),
          StructField("course", StringType, true)
        ))
        // generate the data to insert
        val insertRDD: RDD[String] = spark.sparkContext.parallelize(
          Array("6 墨子 66 地理", "7 孔子 86 英语", "8 非子 96 化学"))
        // split each line and convert it into a Row
        val rowRDD = insertRDD.map(_.split(" "))
          .map(x => Row(x(0).trim.toInt, x(1), x(2).trim.toInt, x(3)))
        // combine the rows and the schema into a DataFrame
        // (note: the method is createDataFrame, with a capital F)
        val dataframe = spark.createDataFrame(rowRDD, schema)
        // hold the connection info
        val prop = new Properties()
        prop.put("user", "bigdata")
        prop.put("password", "Bigdata2021")
        prop.put("driver", "com.mysql.jdbc.Driver")
        // append the rows to the table
        dataframe.write.mode("append")
          .jdbc("jdbc:mysql://10.253.38.9:3306/finereport?serverTimezone=GMT", "student", prop)
      }
    }
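Equivalently, the write can be expressed with the options-style API, mirroring the read side in section 2.2. A sketch reusing the same connection details as above:

    import org.apache.spark.sql.SaveMode

    // same append, written via format("jdbc") options instead of the jdbc() shorthand
    dataframe.write.format("jdbc")
      .mode(SaveMode.Append)
      .option("url", "jdbc:mysql://10.253.38.9:3306/finereport?serverTimezone=GMT")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "student")
      .option("user", "bigdata")
      .option("password", "Bigdata2021")
      .save()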
Verify the result:
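One way to check is to read the table back, a sketch reusing spark and prop from the write example above:

    // re-read the table to confirm the appended rows are present
    spark.read.jdbc("jdbc:mysql://10.253.38.9:3306/finereport?serverTimezone=GMT", "student", prop).show()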
PS: when specifying the field data types in the schema, pay attention to which package the types come from: IntegerType, StringType, and friends must be imported from org.apache.spark.sql.types.
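Concretely, the types used inside StructField are Spark's own DataType objects, not JDBC's java.sql.Types constants:

    // correct: Spark SQL DataType objects usable in a StructField
    import org.apache.spark.sql.types.{IntegerType, StringType}
    // not java.sql.Types.INTEGER, which is a plain JDBC int constant, not a Spark DataType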