Scala112 - Scala and MySQL interaction

  This note covers some common operations for reading from, writing to, and updating MySQL from Scala (via Spark); the code is recorded here for future reference.

There are generally two ways to read from MySQL:

  • Load the entire table
  • Load only part of the data by running a SQL query
import java.sql.{Connection, DriverManager, PreparedStatement, Timestamp}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

// Load an entire MySQL table as a DataFrame
def loadMysqlAsDf(sparkSession: SparkSession, tableName: String): DataFrame = {
    val tb_name = tableName
    val jdbcDF = sparkSession.read.format("jdbc")
      .option("url", Constant.mysqlPath)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", tb_name)
      .option("user", Constant.mysqlUser)
      .option("password", Constant.mysqlPassword).load()
    jdbcDF.show()
    println(jdbcDF.count())
    jdbcDF
  }

// Run an arbitrary SQL query against MySQL and return the result as a DataFrame;
// the query is wrapped as a subquery so it can be passed through the "dbtable" option
def executeMysqlAsDf(sparkSession: SparkSession, sql: String): DataFrame = {
    val jdbcDF = sparkSession.read.format("jdbc")
      .option("url", Constant.mysqlPath)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", s"( $sql ) temp")
      .option("user", Constant.mysqlUser)
      .option("password", Constant.mysqlPassword).load()
    jdbcDF.show()
    println(jdbcDF.count())
    jdbcDF
  }

The sql argument of executeMysqlAsDf is simply the query to run.
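
As a quick usage sketch (the table name and query below are just placeholders; Constant holds the JDBC URL and credentials as in the snippets above):

// Hypothetical calls to the two readers defined above
val spark = SparkSession.builder().appName("mysql-read-demo").getOrCreate()

// 1) Load the whole table
val fullDf = loadMysqlAsDf(spark, "mysql_test")

// 2) Load only the rows/columns selected by a query
//    (executeMysqlAsDf wraps the query as a subquery aliased "temp")
val partialDf = executeMysqlAsDf(spark, "SELECT id, name FROM mysql_test WHERE female = 'F'")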

Writing is straightforward, but each target table needs its own code, so reusability suffers. On the other hand, it is more efficient than df.write, since repeatedly opening and closing database connections is expensive.
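
For comparison, the built-in DataFrame writer is much shorter. A minimal sketch, assuming the same Constant object and a placeholder target table; note that df.write in Append mode issues plain INSERTs and cannot do the ON DUPLICATE KEY UPDATE upsert used below:

import java.util.Properties

// Minimal df.write.jdbc sketch, shown only for comparison with the manual batching below
def writeWithDfWriter(df: DataFrame, targetTable: String): Unit = {
  val props = new Properties()
  props.setProperty("user", Constant.mysqlUser)
  props.setProperty("password", Constant.mysqlPassword)
  props.setProperty("driver", "com.mysql.jdbc.Driver")
  df.write.mode(SaveMode.Append).jdbc(Constant.mysqlPath, targetTable, props)
}

The batched foreachPartition insert used in this note follows.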

def insert2Mysql(sparkSession: SparkSession, mysqlTable: String): Unit = {
    val mysqlPath = Constant.mysqlPath
    val mysqlUser = Constant.mysqlUser
    val mysqlPassword = Constant.mysqlPassword
    // Rows to be written come from a Hive table
    val sql =
      s"""
         |SELECT id,
         |       name,
         |       female
         |FROM hive_test
         |""".stripMargin
    val df = sparkSession.sql(sql)
    df.show(truncate = false)
    // Load the MySQL JDBC driver
    classOf[com.mysql.jdbc.Driver]
    val mysql_fullUrl = s"${mysqlPath}?user=${mysqlUser}&password=${mysqlPassword}&autoReconnect=true&rewriteBatchedStatements=true"
    df.rdd.map(x => {
      val name = x.getAs[String]("name")
      val female = x.getAs[String]("female")
      (name, female)
    }).coalesce(10)
      .cache()
      .foreachPartition(
        x => {
          // One connection and one batched upsert per partition
          val conn = DriverManager.getConnection(mysql_fullUrl)
          val statement = conn.prepareStatement(s"INSERT INTO ${mysqlTable}(female, name) VALUES(?, ?) ON DUPLICATE KEY UPDATE female=VALUES(female), name=VALUES(name)")
          x.foreach(record => {
            statement.setString(1, record._2) // female
            statement.setString(2, record._1) // name
            statement.addBatch()
          })
          statement.executeBatch()
          statement.close()
          conn.close()
        }
      )
  }

Update is not much different from insert.

def updateMysqlRows(spark: SparkSession, tableName: String): Unit = {
    val mysqlPath = Constant.mysqlPath
    val mysqlUser = Constant.mysqlUser
    val mysqlPassword = Constant.mysqlPassword
    // Rows carrying the new values come from a Hive table
    val sql =
      s"""
         |SELECT id,
         |       name,
         |       female
         |FROM hive_test
         |""".stripMargin
    val df = spark.sql(sql)
    df.show(truncate = false)
    // Load the MySQL JDBC driver
    classOf[com.mysql.jdbc.Driver]
    val mysql_fullUrl = s"${mysqlPath}?user=${mysqlUser}&password=${mysqlPassword}&autoReconnect=true&rewriteBatchedStatements=true"
    df.rdd.map(x => {
      val name = x.getAs[String]("name")
      val female = x.getAs[String]("female")
      (name, female)
    }).coalesce(10)
      .cache()
      .foreachPartition(
        x => {
          // One connection and one batched UPDATE per partition
          val conn = DriverManager.getConnection(mysql_fullUrl)
          val statement = conn.prepareStatement(s"UPDATE ${tableName} SET female = ? WHERE name = ?")
          x.foreach(record => {
            statement.setString(1, record._2) // female
            statement.setString(2, record._1) // name
            statement.addBatch()
          })
          statement.executeBatch()
          statement.close()
          conn.close()
        }
      )
  }
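
To tie the helpers together, a hypothetical driver might look like this (table names are placeholders; enableHiveSupport is needed because the write helpers read from hive_test):

// Hypothetical end-to-end driver using the helpers above
object MysqlDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("scala-mysql-demo")
      .enableHiveSupport()
      .getOrCreate()

    loadMysqlAsDf(spark, "mysql_test")                                  // read: whole table
    executeMysqlAsDf(spark, "SELECT id, name, female FROM mysql_test")  // read: via SQL
    insert2Mysql(spark, "mysql_test")                                   // write: batched upsert from hive_test
    updateMysqlRows(spark, "mysql_test")                                // update: batched UPDATE from hive_test

    spark.stop()
  }
}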

                        2021-10-19, Jiulong Lake, Jiangning District, Nanjing
