import org.apache.spark.sql.SparkSession

object demo_extractjob {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val sparkBuilder = SparkSession.builder()
    // Run locally when no argument (or "local") is passed; otherwise let spark-submit set the master.
    if (args.length == 0 || args(0).equals("local")) {
      sparkBuilder.master("local[*]")
    }
    val spark = sparkBuilder.appName("demo_extractjob")
      .enableHiveSupport()
      .getOrCreate()

    spark.read.format("jdbc")
      .option("url", "jdbc:mysql://slave2:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=false") // MySQL URL
      .option("driver", "com.mysql.jdbc.Driver") // MySQL driver
      .option("user", "root") // MySQL user
      .option("password", "123456") // MySQL password
      .option("dbtable", "students")
      .load()
      .createTempView("mysql_table1")

    spark.sql("select * from mysql_table1").show()
    spark.sql("show databases").show()

    // Insert into the Hive table
    spark.sql(
      s"""
         |insert overwrite table test.students_01 partition (city='henan')
         |select * from mysql_table1
       """.stripMargin)
  }
}
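The insert overwrite above assumes the partitioned target table test.students_01 already exists in Hive. A minimal DDL sketch that could run once through the same SparkSession before the insert; the column list here is an assumption and must be aligned with the actual MySQL students schema:

// Hypothetical DDL, run once via the job's SparkSession before the insert overwrite.
// Column names/types are assumptions; match them to the MySQL `students` table.
spark.sql(
  s"""
     |CREATE TABLE IF NOT EXISTS test.students_01 (
     |  name  STRING,
     |  score INT,
     |  type  STRING
     |)
     |PARTITIONED BY (city STRING)
   """.stripMargin)

Because the partition is given statically as city='henan', the select list must contain only the data columns, not the partition column.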
2.0 Data Cleaning: Deduplication
Copy the table structure:

CREATE TABLE <new_table> LIKE <source_table>;
Insert the deduplicated data:

insert overwrite table <new_table>
select t.name, t.score, t.type
from (
  select name, score, type,
         row_number() over (distribute by name sort by score) as rn
  from <source_table>
) t
where t.rn = 1;
Summary
insert overwrite table <target_table>
select <columns>
from (
  select <columns>,
         row_number() over (distribute by <column with duplicates> sort by <column that orders the duplicates>) as rn
  from <source_table>
) t
where t.rn = 1;
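As a runnable illustration of the template, a minimal Spark job sketch; the table names test.students_raw and test.students_dedup are hypothetical stand-ins, and the target table is assumed to exist with the same columns:

import org.apache.spark.sql.SparkSession

object dedup_job {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("dedup_job")
      .enableHiveSupport()
      .getOrCreate()

    // Keep exactly one row per name; `sort by score` decides which duplicate
    // survives (use `sort by score desc` to keep the highest-scoring row).
    spark.sql(
      s"""
         |insert overwrite table test.students_dedup
         |select t.name, t.score, t.type
         |from (
         |  select name, score, type,
         |         row_number() over (distribute by name sort by score) as rn
         |  from test.students_raw
         |) t
         |where t.rn = 1
       """.stripMargin)

    spark.stop()
  }
}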
Null handling is another common cleaning step; here NVL substitutes -1 for a null comment column:

select t1.name, t1.rk
from (
  select *, NVL(comment, -1) rk from region
) t1

3.0 Offline Data Statistics

3.1 Top 5 Sales Statistics
select *,
       row_number() over (partition by subject order by score desc), -- sequential ranking, no ties
       rank() over (partition by subject order by score desc),       -- ties share a rank, a gap follows
       dense_rank() over (partition by subject order by score desc)  -- ties share a rank, no gap
from score;
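To see the three ranking functions side by side, a small self-contained Spark sketch; the sample rows are made up purely for illustration:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, row_number, rank, dense_rank}

object rank_demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("rank_demo").getOrCreate()
    import spark.implicits._

    // Made-up sample rows: two students tie at 90 in math.
    val score = Seq(
      ("zhang", "math", 90),
      ("li",    "math", 90),
      ("wang",  "math", 80)
    ).toDF("name", "subject", "score")

    val w = Window.partitionBy("subject").orderBy(desc("score"))
    score.select(
      col("*"),
      row_number().over(w).as("rn"),  // 1, 2, 3 (tie broken arbitrarily)
      rank().over(w).as("rk"),        // 1, 1, 3 (tie shares a rank, gap follows)
      dense_rank().over(w).as("drk")  // 1, 1, 2 (tie shares a rank, no gap)
    ).show()

    spark.stop()
  }
}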
select *
from (
  select *, row_number() over (partition by subject order by score desc) rmp
  from score
) t
where t.rmp <= 3;

3.2 Total Sales for a Given Month
select *, count(name) over() as total
from business
where substr(orderdate, 1, 7) = '2017-04';

With an empty over(), the window spans every row that passes the filter, so each April 2017 row carries the same overall total.

3.3 Sales for Specified Months
select *, sum(cost) over (partition by name, substr(orderdate, 1, 7)) total_amount
from business;

Partitioning by both name and month means each row carries that customer's total cost for its month.

3.4 Saving to MySQL
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object data_from_ods_to_mysql {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("data_from_ods_to_mysql")
    val spark = SparkSession.builder().config(sparkConf)
      .config("hive.metastore.uris", "thrift://192.168.1.101:9083") // Hive metastore service address
      .config("spark.sql.warehouse.dir", "hdfs://192.168.1.100:50070/usr/hive_remote/warehouse") // Hive warehouse directory on HDFS
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("use jsj")
    val data = spark.sql("select * from hive_test")
    data.show()

    data.write
      .format("jdbc")
      .option("url", "jdbc:mysql://192.168.1.102:3306/student?useSSL=false")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "student2")
      .mode(SaveMode.Append)
      .save()

    println("ok")
  }
}
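One practical note: for com.mysql.jdbc.Driver to be found, the MySQL JDBC driver (and Spark's Hive support) must be on the classpath. A build.sbt sketch; the versions are illustrative assumptions to be matched to the actual cluster:

// build.sbt (versions are assumptions, not pinned by the original post)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"  % "2.4.8" % "provided",
  "org.apache.spark" %% "spark-hive" % "2.4.8" % "provided",
  "mysql"            %  "mysql-connector-java" % "5.1.49"  // provides com.mysql.jdbc.Driver
)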