spark写入mysql和hive的方式

spark写入mysql和hive的方式,第1张

spark写入mysql和hive的方式 写入mysql和hive的方式
package com.ithhs.spark

import java.util.Properties

import org.apache.spark.sql.functions.{coalesce, from_unixtime, to_date, unix_timestamp}
import org.apache.spark.sql.{Column, Dataframe, SaveMode, SparkSession}

object JdbcDemo {

  def main(args: Array[String]): Unit = {
    //创建执行环境
    val spark: SparkSession = SparkSession.builder()
      //hive元数据
      .config("hive.metastore.uris", "thrift://192.168.80.81:9083")
      //hive存储路径
      .config("spark.sql.warehouse.dir", "hdfs://192.168.80.81:9000/user/hive/warehouse")
      //执行本地
      .master("local")
      //名称
      .appName("data")
      //必须加enableHiveSupport(),这样才可以往hive数据表写数据
      .enableHiveSupport()
      .getOrCreate()

    // 连接mysql方式1 建议使用该方式
    val properties = new Properties()
    properties.put("user", "root") //用户名
    properties.put("password", "password") // 密码
    val jdbc_url = "jdbc:mysql://192.168.80.81:3306/test?useUnicode=true&characterEncoding=utf-8"//连接数据库

    val df1: Dataframe = spark.read.jdbc(jdbc_url, "(select * from test) tmp", properties) //jdbc

    //连接mysql方式2 不建议使用
    val df2: Dataframe = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://192.168.80.81:3306/test?useUnicode=true&characterEncoding=utf-8")//连接数据库
      //.option("dbtable", "(select a.*,b.cid,b.cname from province a left join city b on a.pid=b.pid) tmp")
      .option("dbtable", "(select * from tmail) tmp")//sql语句,也可以是表名
      .option("user", "root")//用户
      .option("password", "password")//密码
      .load()
      df2.show()

    //连接mysql方式3
    val mapJdbc = Map(
      "url" -> "jdbc:mysql://192.168.80.81:3306/test?useUnicode=true&characterEncoding=utf-8", //连接数据库
      "user" -> "root", //用户
      "password" -> "password", //密码
      "dbtable" -> "(select * from user)as a" //按要求写sql语句
    )  
    //读取mysql数据
    val df3: Dataframe = spark.read.format("jdbc")
      .options(mapJdbc).load()
      df3.show()

    //写入hive数据库的 *** 作,Overwrite覆盖 Append追加,ErrorIfExists存在保报错,Ignore:如果存在就忽略
    df3.write.mode(SaveMode.Append).format("hive").saveAsTable("web.users")//
    //动态分区
    //因为要做动态分区, 所以要先设定partition参数
    //由于default是false, 需要额外下指令打开这个开关
    spark sql ("set hive.exec.dynamic.partition=true")
    spark sql ("set hive.exec.dynamic.partition.mode=nostrick")
    //指定分区字段到分区表中
    df3.write.mode(SaveMode.Append).format("hive").partitionBy("bt").saveAsTable("gdcmxy.users")
  }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5690400.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存