S抽取Mysql

S抽取Mysql,第1张

1.0 用Scala抽取MySQL指定数据到Hive的ODS层的指定分区表中
/** Extracts rows from a MySQL table and loads them into a partitioned
  * Hive ODS-layer table (`test.students_01`, partition `city='henan'`).
  *
  * Usage: run with no arguments or with "local" as the first argument to
  * force a local master; any other first argument leaves the master to be
  * supplied externally (e.g. by spark-submit).
  */
object demo_extractjob {
  def main(args: Array[String]): Unit = {
    // Perform HDFS writes as root (needed when the client OS user differs
    // from the cluster user).
    System.setProperty("HADOOP_USER_NAME", "root")

    val sparkBuilder = SparkSession.builder()
    // Default to a local master when no argument (or "local") is given.
    // Equivalent to the original (args.length > 0 && args(0) == "local") || args.length == 0.
    if (args.isEmpty || args(0).equals("local")) {
      sparkBuilder.master("local[*]")
    }
    val spark = sparkBuilder.appName("demo_extractjob")
      .enableHiveSupport()
      .getOrCreate()

    // Register the MySQL source table as a temporary view.
    // NOTE(review): com.mysql.jdbc.Driver is the legacy Connector/J 5.x class;
    // Connector/J 8+ uses com.mysql.cj.jdbc.Driver — confirm the driver jar version.
    spark.read.format("jdbc")
      .option("url", "jdbc:mysql://slave2:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=false")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "students")
      .load()
      .createTempView("mysql_table1")

    // Sanity checks: preview the source data and the visible Hive databases.
    spark.sql("select * from mysql_table1").show()
    spark.sql("show databases").show()

    // Overwrite the target Hive partition with the extracted rows.
    spark.sql(
      s"""
         |insert overwrite table test.students_01 partition (city='henan')
         |select * from mysql_table1
      """.stripMargin)

    // Release cluster resources; the original never stopped the session.
    spark.stop()
  }
}
2.0 数据清洗-去重
  1. 复制表结构

    CREATE TABLE <新表名> LIKE <源表名>;
    
  2. 插入去重后的数据

    insert overwrite table <目标表>
    select t.name, t.score, t.type
    from (
    	select
        	name, score, type, row_number() over(distribute by name sort by score) as rn
        from <源表>
    ) t where t.rn = 1;
    
  3. 总结

    insert overwrite table <目标表>
    select <字段>
    from (
    select <字段>, row_number() over(distribute by <有重复的字段> sort by <重复字段的排列根据字段>) as rn
    from <源表>
    ) t where t.rn = 1;
    
2.1 数据清洗-缺失字段填充
   |select t1.name,t1.rk from(
   |select *,NVL(comment,-1) rk from region )t1
3.0 离线数据统计 3.1 统计销售量Top5的数据
select *,
row_number() over(partition by subject order by score desc),--不并列排名
rank() over(partition by subject order by score desc),--并列空位排名
dense_rank() over(partition by subject order by score desc)--并列不空位
from score;
select * from 
(select *,row_number() over(partition by subject order by score desc) rmp from score
) t where t.rmp<=5;
3.2 统计某月的总销售额
select *,sum(cost) over() as total from business where substr(orderdate,1,7)='2017-04';
3.3 统计指定几个月的销售额
select *,sum(cost) over(partition by name,substr(orderdate,1,7)) total_amount from business;
3.4 保存到Mysql中
// Read a Hive table (jsj.hive_test) and append its rows to a MySQL table via JDBC.
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("data_from_ods_to_mysql")
val spark = SparkSession.builder().config(sparkConf)
  // Hive metastore thrift endpoint.
  .config("hive.metastore.uris", "thrift://192.168.1.101:9083")
  // Hive warehouse location on HDFS.
  // NOTE(review): 50070 is the NameNode HTTP (web UI) port, not the HDFS RPC
  // port (typically 8020/9000), and the path contains a doubled slash — verify.
  .config("spark.sql.warehouse.dir", "hdfs://192.168.1.100:50070//usr/hive_remote/warehouse")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("use jsj")

val data = spark.sql("select * from hive_test")
data.show()

// Append to MySQL. Use lowercase "jdbc": the data-source short-name lookup is
// case-insensitive, but "jdbc" is the canonical form and matches the read side.
data.write
  .format("jdbc")
  .option("url", "jdbc:mysql://192.168.1.102:3306/student?useSSL=false")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("user", "root")
  .option("password", "123456")
  .option("dbtable", "student2")
  .mode(SaveMode.Append)
  .save()

// Stop the session to release resources; the original never did.
spark.stop()

println("ok")

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5679138.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存