Some notes on reading, writing, and updating MySQL from Scala; recording the code here for future reference.
Read

Reading from MySQL generally involves two operations:
- loading an entire table
- loading a subset of rows by executing a SQL query
```scala
import java.sql.{Connection, DriverManager, PreparedStatement, Timestamp}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

// Load an entire MySQL table as a DataFrame.
def loadMysqlAsDf(sparkSession: SparkSession, tableName: String): DataFrame = {
  val jdbcDF = sparkSession.read.format("jdbc")
    .option("url", Constant.mysqlPath)
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", tableName)
    .option("user", Constant.mysqlUser)
    .option("password", Constant.mysqlPassword)
    .load()
  jdbcDF.show()
  println(jdbcDF.count())
  jdbcDF
}

// Load the result of an arbitrary SQL query as a DataFrame:
// the query is wrapped as a derived table so it can be passed via "dbtable".
def executeMysqlAsDf(sparkSession: SparkSession, sql: String): DataFrame = {
  val jdbcDF = sparkSession.read.format("jdbc")
    .option("url", Constant.mysqlPath)
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", s"( $sql ) temp")
    .option("user", Constant.mysqlUser)
    .option("password", Constant.mysqlPassword)
    .load()
  jdbcDF.show()
  println(jdbcDF.count())
  jdbcDF
}
```
The sql parameter of executeMysqlAsDf is simply the SQL statement to execute.
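A minimal usage sketch of the two read helpers (assuming an existing SparkSession named `spark`; the table name and query are hypothetical):

```scala
// Hypothetical usage; `spark` is an existing SparkSession, `mysql_test` an existing table.
val fullTable = loadMysqlAsDf(spark, "mysql_test")
val subset = executeMysqlAsDf(spark, "SELECT id, name FROM mysql_test WHERE id < 100")
```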
Write

There is not much to say about writing: each target table needs its own code, so reusability is poor. On the upside, it is more efficient than df.write, since repeatedly opening and closing database connections is time-consuming.
```scala
// Write a Hive query result into MySQL, upserting on duplicate key.
def insert2Mysql(sparkSession: SparkSession, mysqlTable: String): Unit = {
  val mysqlPath = Constant.mysqlPath
  val mysqlUser = Constant.mysqlUser
  val mysqlPassword = Constant.mysqlPassword

  val sql =
    s"""
       |SELECT id,
       |       name,
       |       female
       |FROM hive_test
       |""".stripMargin
  val df = sparkSession.sql(sql)
  df.show(truncate = false)

  // Force the MySQL JDBC driver class to load.
  classOf[com.mysql.jdbc.Driver]
  val mysql_fullUrl =
    s"${mysqlPath}?user=${mysqlUser}&password=${mysqlPassword}&autoReconnect=true&rewriteBatchedStatements=true"

  df.rdd.map(x => {
      val name = x.getAs[String]("name")
      val female = x.getAs[String]("female")
      (name, female)
    }).coalesce(10)
    .foreachPartition(x => {
      // One connection and one prepared statement per partition.
      val conn = DriverManager.getConnection(mysql_fullUrl)
      val statement = conn.prepareStatement(
        s"insert into ${mysqlTable}(name, female) values(?, ?) " +
          "on duplicate key update female = values(female), name = values(name)")
      x.foreach(record => {
        statement.setString(1, record._1) // name
        statement.setString(2, record._2) // female
        statement.addBatch()
      })
      statement.executeBatch()
      statement.close()
      conn.close()
    })
}
```
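For comparison, the built-in df.write path mentioned above looks roughly like this (a sketch; SaveMode and the Constant values are carried over from the snippets above, and `df` is the DataFrame being written):

```scala
import java.util.Properties

// Sketch of the built-in JDBC writer, for comparison with the manual batched version.
val props = new Properties()
props.setProperty("user", Constant.mysqlUser)
props.setProperty("password", Constant.mysqlPassword)
props.setProperty("driver", "com.mysql.jdbc.Driver")
df.write.mode(SaveMode.Append).jdbc(Constant.mysqlPath, "mysql_test", props)
```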
Update

Update differs little from insert.
```scala
// Update existing MySQL rows from a Hive query result.
def updateMysqlRows(spark: SparkSession, tableName: String): Unit = {
  val mysqlPath = Constant.mysqlPath
  val mysqlUser = Constant.mysqlUser
  val mysqlPassword = Constant.mysqlPassword

  val sql =
    s"""
       |SELECT id,
       |       name,
       |       female
       |FROM hive_test
       |""".stripMargin
  val df = spark.sql(sql)
  df.show(truncate = false)

  // Force the MySQL JDBC driver class to load.
  classOf[com.mysql.jdbc.Driver]
  val mysql_fullUrl =
    s"${mysqlPath}?user=${mysqlUser}&password=${mysqlPassword}&autoReconnect=true&rewriteBatchedStatements=true"

  df.rdd.map(x => {
      val name = x.getAs[String]("name")
      val female = x.getAs[String]("female")
      (name, female)
    }).coalesce(10)
    .foreachPartition(x => {
      val conn = DriverManager.getConnection(mysql_fullUrl)
      val statement = conn.prepareStatement(
        s"UPDATE ${tableName} SET female = ? WHERE name = ?")
      x.foreach(record => {
        statement.setString(1, record._2) // female
        statement.setString(2, record._1) // name
        statement.addBatch()
      })
      statement.executeBatch()
      statement.close()
      conn.close()
    })
}
```
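Putting the pieces together, a hypothetical driver could look like this (CommonUtils.getOrCreate comes from the original code and is assumed to return a SparkSession):

```scala
// Hypothetical end-to-end driver for the helpers above.
val spark = CommonUtils.getOrCreate(this.getClass.getName)
insert2Mysql(spark, "mysql_test")    // upsert rows from hive_test
updateMysqlRows(spark, "mysql_test") // then update female by name
spark.stop()
```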
2021-10-19, Jiulonghu, Jiangning District, Nanjing