package com.ithhs.spark import java.util.Properties import org.apache.spark.sql.functions.{coalesce, from_unixtime, to_date, unix_timestamp} import org.apache.spark.sql.{Column, Dataframe, SaveMode, SparkSession} object JdbcDemo { def main(args: Array[String]): Unit = { //创建执行环境 val spark: SparkSession = SparkSession.builder() //hive元数据 .config("hive.metastore.uris", "thrift://192.168.80.81:9083") //hive存储路径 .config("spark.sql.warehouse.dir", "hdfs://192.168.80.81:9000/user/hive/warehouse") //执行本地 .master("local") //名称 .appName("data") //必须加enableHiveSupport(),这样才可以往hive数据表写数据 .enableHiveSupport() .getOrCreate() // 连接mysql方式1 建议使用该方式 val properties = new Properties() properties.put("user", "root") //用户名 properties.put("password", "password") // 密码 val jdbc_url = "jdbc:mysql://192.168.80.81:3306/test?useUnicode=true&characterEncoding=utf-8"//连接数据库 val df1: Dataframe = spark.read.jdbc(jdbc_url, "(select * from test) tmp", properties) //jdbc //连接mysql方式2 不建议使用 val df2: Dataframe = spark.read.format("jdbc") .option("url", "jdbc:mysql://192.168.80.81:3306/test?useUnicode=true&characterEncoding=utf-8")//连接数据库 //.option("dbtable", "(select a.*,b.cid,b.cname from province a left join city b on a.pid=b.pid) tmp") .option("dbtable", "(select * from tmail) tmp")//sql语句,也可以是表名 .option("user", "root")//用户 .option("password", "password")//密码 .load() df2.show() //连接mysql方式3 val mapJdbc = Map( "url" -> "jdbc:mysql://192.168.80.81:3306/test?useUnicode=true&characterEncoding=utf-8", //连接数据库 "user" -> "root", //用户 "password" -> "password", //密码 "dbtable" -> "(select * from user)as a" //按要求写sql语句 ) //读取mysql数据 val df3: Dataframe = spark.read.format("jdbc") .options(mapJdbc).load() df3.show() //写入hive数据库的 *** 作,Overwrite覆盖 Append追加,ErrorIfExists存在保报错,Ignore:如果存在就忽略 df3.write.mode(SaveMode.Append).format("hive").saveAsTable("web.users")// //动态分区 //因为要做动态分区, 所以要先设定partition参数 //由于default是false, 需要额外下指令打开这个开关 spark sql ("set hive.exec.dynamic.partition=true") spark sql ("set hive.exec.dynamic.partition.mode=nostrick") //指定分区字段到分区表中 df3.write.mode(SaveMode.Append).format("hive").partitionBy("bt").saveAsTable("gdcmxy.users") } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)