创建一个exam文件夹用来存放表格
在hdfs创建目录,并将文件传入hdfs中
[root@gree2 exam]# hdfs dfs -mkdir -p /app/data/exam [root@gree2 exam]# hdfs dfs -put ./meituan_waimai_meishi.csv /app/data/exam
查看文件有多少数据
[root@gree2 exam]# hdfs dfs -cat /app/data/exam/meituan_waimai_meishi.csv | wc -l 22/01/04 14:36:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 983目标二:使用 Spark,加载 HDFS 文件系统 meituan_waimai_meishi.csv 文件,并分别使用 RDD 和 Spark SQL 完成以下分析(不用考虑数据去重) 这里在xshell和idea *** 作是一样的
scala> val fileRdd=sc.textFile("/app/data/exam/meituan_waimai_meishi.csv")
查看是否加载成功
①统计每个店铺分别有多少商品(SPU)。先了解split里面加-1的区别x.split(",") 与 x.split(",",-1)区别在于不会省略逗号之间空白的空间,也会打印出来 scala> sc.parallelize(List("a,b","a,b,c,","a,b,,,")) res6: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at:25 scala> res6.map(x=>x.split(",")).foreach(x=>println(x.toList)) List(a, b) List(a, b, c) List(a, b) scala> res6.map(x=>x.split(",",-1)).foreach(x=>println(x.toList)) List(a, b, , , ) List(a, b, c, ) List(a, b)
首先分析数据,我需要的数据是去掉第一行,并且所需要的数据长度为12
scala> val spuRDD=fileRdd.filter(x=>x.startsWith("spu_id")==false).map(x=>x.split(",",-1)).filter(x=>x.size==12)
此时的spuRDD是一个数组,取出需要的字段,变成二元组,对key进行 *** 作,累加
scala> spuRDD.map(x=>(x(2),1)).reduceByKey(_+_).collect.foreach(println)②统计每个店铺的总销售额。
将店名作为key,计算商品单价*月销售量
//导包 scala> import scala.util._ //这边用Try方法是为了防止取不到数据,防止出错,写Try前一定要导包 scala> spuRDD.map(x=>(x(2),Try(x(5).toDouble).toOption.getOrElse(0.0)*Try(x(7).toInt).toOption.getOrElse(0))).reduceByKey(_+_).foreach(println)
③统计每个店铺销售额最高的前三个商品,输出内容包括店铺名,商品名和销售额,其 中销售额为 0 的商品不进行统计计算,例如:如果某个店铺销售为 0,则不进行统计。
//首先过滤掉销售为0的数据,然后取出店铺名,商品名和销售额,接着想算出每个店的消费前三,先进行分组 scala> spuRDD.filter(x=>Try(x(7).toInt).toOption.getOrElse(0)>0).map(x=>(x(2),x(4),Try(x(5).toDouble).toOption.getOrElse(0.0)*Try(x(7).toInt).toOption.getOrElse(0))).groupBy(x=>x._1).collect.foreach(println)
分完组后会形成一个二元组,key是按店名分组的店名,value则是店铺名,商品名和销售额
scala> spuRDD.filter(x=>Try(x(7).toInt).toOption.getOrElse(0)>0) .map(x=>(x(2),x(4),Try(x(5).toDouble).toOption.getOrElse(0.0)*Try(x(7).toInt).toOption.getOrElse(0))) .groupBy(x=>x._1) .mapValues(x=>x.toList.sortBy(x=>0-x._3).take(3)) .flatMapValues(x=>x).collect.foreach(println) //mapValues,就会取二元组的value部分,此时的value包含三个数据店名和商品名,销售额 //接着将value,进行toList,对第三个销售额就行排序,取三个 //flatMapValues, *** 作相当于吧之前的list给去掉形成一个明了的二元组
没去掉list之前
进行flatmap *** 作之后
最后只要取出二元组的value就行了
scala> spuRDD.filter(x=>Try(x(7).toInt).toOption.getOrElse(0)>0) .map(x=>(x(2),x(4),Try(x(5).toDouble).toOption.getOrElse(0.0)*Try(x(7).toInt).toOption.getOrElse(0))) .groupBy(x=>x._1) .mapValues(x=>x.toList.sortBy(x=>0-x._3).take(3)) .flatMapValues(x=>x) .map(x=>x._2).collect.foreach(println) //第二种简便方法 scala> spuRDD.filter(x=>Try(x(7).toInt).toOption.getOrElse(0)>0).map(x=>(x(2),x(4),Try(x(5).toDouble).toOption.getOrElse(0.0)*Try(x(7).toInt).toOption.getOrElse(0))).groupBy(x=>x._1).flatMap(x=>x._2.toList.sortBy(x=>0-x._3).take(3)).collect.foreach(println)下来用SparkSql进行 *** 作一遍
//第一种创建DF方法 scala> val spuDF=spark.read.format("csv") .option("header",true) .option("inferSchema",true) //inferSchema是为了识别字段类型 .load("/app/data/exam/meituan_waimai_meishi.csv") //第一种方法不建议用,建议使用第二种方法 scala> import spark.implicits._ import spark.implicits._ scala> import org.apache.spark.sql.functions._ import org.apache.spark.sql.functions._ scala> import org.apache.spark.sql._ import org.apache.spark.sql._ scala> import org.apache.spark.sql.types._ import org.apache.spark.sql.types._ //创建Row scala> val spuRowRdd=spuRDD.filter(x=>Try(x(7).toInt).toOption.getOrElse(0)>0) .map(x=>Row(x(1) ,(2) ,x(4) ,Try(x(5).toDouble).toOption.getOrElse(0.0),Try(x(7).toInt).toOption.getOrElse(0))) //创建Schema scala> val spuSchema=StructType(Array( StructField("shop_id",StringType), StructField("shop_name",StringType), StructField("spu_name",StringType), StructField("spu_price",DoubleType), StructField("month_sales",IntegerType))) scala> val spuDF=spark.createDataframe(spuRowRdd,spuSchema) scala> spuDF.printSchema
接下来进行sparksql *** 作
//先创建一个临时表 scala> spuDF.createOrReplaceTempView("spu")①统计每个店铺分别有多少商品(SPU)。
scala> spark.sql("select shop_name , count(spu_name) cnt from spu group by shop_name ").show②统计每个店铺的总销售额。
scala> spark.sql("select shop_name ,sum(spu_price*month_sales) sum from spu group by shop_name ").show③统计每个店铺销售额最高的前三个商品,输出内容包括店铺名,商品名和销售额,其 中销售额为 0 的商品不进行统计计算,例如:如果某个店铺销售为 0,则不进行统计。
//先进行分窗 scala> spark.sql("select shop_name ,spu_name, spu_price*month_sales money , row_number() over(partition by shop_name order by spu_price*month_sales desc ) as rn from spu ").show //然后在分窗的基础上进行查询 scala> spark.sql("select t.shop_name,t.spu_name,t.money from (select shop_name ,spu_name,spu_price*month_sales money ,row_number() over(partition by shop_name order by spu_price*month_sales desc ) as rn from spu)t where t.rn<=3").show
目标三: 3.创建 Hbase 数据表 在 Hbase 中创建命名空间(namespace)exam,在该命名空间下创建 spu 表,该表下有 1 个列族 result。 4. 请 在 Hive 中 创 建 数 据 库 spu_db , 在 该 数 据 库 中 创 建 外 部 表 ex_spu 指 向 /app/data/exam 下的测试数据 ;创建外部表 ex_spu_hbase 映射至 Hbase 中的 exam:spu 表的 result 列族 Hbase *** 作
hbase(main):008:0> create_namespace 'exam' //创建库 hbase(main):015:0> create 'exam:spu','result' //创建族hive *** 作
hive> create databases spu_db; //创建数据库 //在 该 数 据 库 中 创 建 外 部 表 ex_spu 指 向 /app/data/exam 下的测试数据 hive> create external table if not exists ex_spu( > spu_id string, > shop_id string, > shop_name string, > category_name string, > spu_name string, > spu_price double, > spu_originprice double, > month_sales int, > praise_num int, > spu_unit string, > spu_desc string, > spu_image string > ) > row format delimited fields terminated by ',' > stored as textfile location '/app/data/exam' > tblproperties("skip.header.line.count"="1");
//创建外部表 ex_spu_hbase 映射至 Hbase 中的 exam:spu 表的 result 列族 hive> create external table if not exists ex_spu_hbase( > key string, > sales double, > praise int > ) > stored by 'org.apache.hadoop.hive.hbase.HbaseStorageHandler' > with serdeproperties("hbase.columns.mapping"=":key,result:sales,result:praise") > tblproperties("hbase.table.name"="exam:spu");5. 统计查询 ① 统计每个店铺的总销售额 sales, 店铺的商品总点赞数 praise,并将 shop_id 和 shop_name 的组合作为 RowKey,并将结果映射到 Hbase。
hive> insert into ex_spu_hbase select concat(t.shop_id,t.shop_name) as key, t.sales, t.praise from (select shop_id,shop_name, sum(spu_price*month_sales) as sales, sum(praise_num) as praise from ex_spu group by shop_id,shop_name) t;② 完成统计后,分别在 hive 和 Hbase 中查询结果数据
hbase(main):006:0> scan 'exam:spu'
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)