美团外卖平台的部分外卖 SPU数据实 *** 练习

美团外卖平台的部分外卖 SPU数据实 *** 练习,第1张

美团外卖平台的部分外卖 SPU数据实 *** 练习 一、环境要求 Hadoop+Hive+Spark+Hbase 开发环境。

三、数据描述 meituan_waimai_meishi.csv 是美团外卖平台的部分外卖 SPU(Standard Product Unit , 标准产品单元)数据,包含了外卖平台某地区一时间的外卖信息。具体字段说明如下:

四、功能要求 1.数据准备 请在 HDFS 中创建目录/app/data/exam,并将 meituan_waimai_meishi.csv 文件传到该 目录。并通过 HDFS 命令查询出文档有多少行数据。 2.使用 Spark,加载 HDFS 文件系统 meituan_waimai_meishi.csv 文件,并分别使用 RDD 和 Spark SQL 完成以下分析(不用考虑数据去重) ①统计每个店铺分别有多少商品(SPU)。 ②统计每个店铺的总销售额。 ③统计每个店铺销售额最高的前三个商品,输出内容包括店铺名,商品名和销售额,其 中销售额为 0 的商品不进行统计计算,例如:如果某个店铺销售为 0,则不进行统计。

目标一:.数据准备 请在 HDFS 中创建目录/app/data/exam,并将 meituan_waimai_meishi.csv 文件传到该 目录。并通过 HDFS 命令查询出文档有多少行数据。

创建一个exam文件夹用来存放表格

在hdfs创建目录,并将文件传入hdfs中

[root@gree2 exam]# hdfs dfs -mkdir -p  /app/data/exam


[root@gree2 exam]# hdfs dfs -put ./meituan_waimai_meishi.csv /app/data/exam

 查看文件有多少数据

[root@gree2 exam]# hdfs dfs  -cat /app/data/exam/meituan_waimai_meishi.csv | wc -l
22/01/04 14:36:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
983

目标二:使用 Spark,加载 HDFS 文件系统 meituan_waimai_meishi.csv 文件,并分别使用 RDD 和 Spark SQL 完成以下分析(不用考虑数据去重) 这里在xshell和idea *** 作是一样的
scala> val fileRdd=sc.textFile("/app/data/exam/meituan_waimai_meishi.csv")

查看是否加载成功

 ①统计每个店铺分别有多少商品(SPU)。先了解split里面加-1的区别
x.split(",") 与 x.split(",",-1)区别在于不会省略逗号之间空白的空间,也会打印出来


scala> sc.parallelize(List("a,b","a,b,c,","a,b,,,"))
res6: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at :25

scala> res6.map(x=>x.split(",")).foreach(x=>println(x.toList))
List(a, b)
List(a, b, c)
List(a, b)

scala> res6.map(x=>x.split(",",-1)).foreach(x=>println(x.toList))
List(a, b, , , )
List(a, b, c, )
List(a, b)

 首先分析数据,我需要的数据是去掉第一行,并且所需要的数据长度为12

scala> val spuRDD=fileRdd.filter(x=>x.startsWith("spu_id")==false).map(x=>x.split(",",-1)).filter(x=>x.size==12)

此时的spuRDD是一个数组,取出需要的字段,变成二元组,对key进行 *** 作,累加

scala> spuRDD.map(x=>(x(2),1)).reduceByKey(_+_).collect.foreach(println)

②统计每个店铺的总销售额。

将店名作为key,计算商品单价*月销售量

//导包
scala> import scala.util._


//这边用Try方法是为了防止取不到数据,防止出错,写Try前一定要导包
scala> spuRDD.map(x=>(x(2),Try(x(5).toDouble).toOption.getOrElse(0.0)*Try(x(7).toInt).toOption.getOrElse(0))).reduceByKey(_+_).foreach(println)

 

③统计每个店铺销售额最高的前三个商品,输出内容包括店铺名,商品名和销售额,其 中销售额为 0 的商品不进行统计计算,例如:如果某个店铺销售为 0,则不进行统计。
//首先过滤掉销售为0的数据,然后取出店铺名,商品名和销售额,接着想算出每个店的消费前三,先进行分组
scala> spuRDD.filter(x=>Try(x(7).toInt).toOption.getOrElse(0)>0).map(x=>(x(2),x(4),Try(x(5).toDouble).toOption.getOrElse(0.0)*Try(x(7).toInt).toOption.getOrElse(0))).groupBy(x=>x._1).collect.foreach(println)

分完组后会形成一个二元组,key是按店名分组的店名,value则是店铺名,商品名和销售额

scala> spuRDD.filter(x=>Try(x(7).toInt).toOption.getOrElse(0)>0)
.map(x=>(x(2),x(4),Try(x(5).toDouble).toOption.getOrElse(0.0)*Try(x(7).toInt).toOption.getOrElse(0)))
.groupBy(x=>x._1)
.mapValues(x=>x.toList.sortBy(x=>0-x._3).take(3))
.flatMapValues(x=>x).collect.foreach(println)

//mapValues,就会取二元组的value部分,此时的value包含三个数据店名和商品名,销售额
//接着将value,进行toList,对第三个销售额就行排序,取三个
//flatMapValues, *** 作相当于吧之前的list给去掉形成一个明了的二元组

 没去掉list之前

 进行flatmap *** 作之后

 最后只要取出二元组的value就行了

scala> spuRDD.filter(x=>Try(x(7).toInt).toOption.getOrElse(0)>0)
.map(x=>(x(2),x(4),Try(x(5).toDouble).toOption.getOrElse(0.0)*Try(x(7).toInt).toOption.getOrElse(0)))
.groupBy(x=>x._1)
.mapValues(x=>x.toList.sortBy(x=>0-x._3).take(3))
.flatMapValues(x=>x)
.map(x=>x._2).collect.foreach(println)


//第二种简便方法
scala> spuRDD.filter(x=>Try(x(7).toInt).toOption.getOrElse(0)>0).map(x=>(x(2),x(4),Try(x(5).toDouble).toOption.getOrElse(0.0)*Try(x(7).toInt).toOption.getOrElse(0))).groupBy(x=>x._1).flatMap(x=>x._2.toList.sortBy(x=>0-x._3).take(3)).collect.foreach(println)

下来用SparkSql进行 *** 作一遍
//第一种创建DF方法
scala> val spuDF=spark.read.format("csv")
.option("header",true)
.option("inferSchema",true)  //inferSchema是为了识别字段类型
.load("/app/data/exam/meituan_waimai_meishi.csv")


//第一种方法不建议用,建议使用第二种方法
scala> import spark.implicits._
import spark.implicits._

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

//创建Row
scala> val spuRowRdd=spuRDD.filter(x=>Try(x(7).toInt).toOption.getOrElse(0)>0)
.map(x=>Row(x(1)
,(2)
,x(4)
,Try(x(5).toDouble).toOption.getOrElse(0.0),Try(x(7).toInt).toOption.getOrElse(0)))

//创建Schema
scala> val spuSchema=StructType(Array(
StructField("shop_id",StringType),
StructField("shop_name",StringType),
StructField("spu_name",StringType),
StructField("spu_price",DoubleType),
StructField("month_sales",IntegerType)))


scala> val spuDF=spark.createDataframe(spuRowRdd,spuSchema)

scala> spuDF.printSchema


 

接下来进行sparksql *** 作
//先创建一个临时表
scala> spuDF.createOrReplaceTempView("spu")


①统计每个店铺分别有多少商品(SPU)。
scala> spark.sql("select shop_name , count(spu_name) cnt from spu group by shop_name ").show

②统计每个店铺的总销售额。
scala> spark.sql("select shop_name ,sum(spu_price*month_sales) sum  from spu group by shop_name ").show

③统计每个店铺销售额最高的前三个商品,输出内容包括店铺名,商品名和销售额,其 中销售额为 0 的商品不进行统计计算,例如:如果某个店铺销售为 0,则不进行统计。
//先进行分窗

scala> spark.sql("select shop_name ,spu_name,
spu_price*month_sales money ,
row_number() over(partition by shop_name order by spu_price*month_sales desc ) as rn
from spu ").show

//然后在分窗的基础上进行查询
scala> spark.sql("select t.shop_name,t.spu_name,t.money from (select shop_name ,spu_name,spu_price*month_sales money ,row_number() over(partition by shop_name order by spu_price*month_sales desc ) as rn   from spu)t where t.rn<=3").show

 

目标三: 3.创建 Hbase 数据表 在 Hbase 中创建命名空间(namespace)exam,在该命名空间下创建 spu 表,该表下有 1 个列族 result。 4. 请 在 Hive 中 创 建 数 据 库 spu_db , 在 该 数 据 库 中 创 建 外 部 表 ex_spu 指 向 /app/data/exam 下的测试数据 ;创建外部表 ex_spu_hbase 映射至 Hbase 中的 exam:spu 表的 result 列族

 Hbase *** 作
hbase(main):008:0> create_namespace 'exam'  //创建库

hbase(main):015:0> create 'exam:spu','result'  //创建族

hive *** 作
hive> create databases spu_db;  //创建数据库

//在 该 数 据 库 中 创 建 外 部 表 ex_spu 指 向 /app/data/exam 下的测试数据
hive> create external table if not exists ex_spu(
    > spu_id string,
    > shop_id string,
    > shop_name string,
    > category_name string,
    > spu_name string,
    > spu_price double,
    > spu_originprice double,
    > month_sales int,
    > praise_num int,
    > spu_unit string,
    > spu_desc string,
    > spu_image string
    > )
    > row format delimited fields terminated by ','
    > stored as textfile location '/app/data/exam'
    > tblproperties("skip.header.line.count"="1");

//创建外部表 ex_spu_hbase 映射至 Hbase 中的 exam:spu 表的 result 列族
hive> create external table if not exists ex_spu_hbase(
    > key string,
    > sales double,
    > praise int
    > )
    > stored by 'org.apache.hadoop.hive.hbase.HbaseStorageHandler' 
    > with serdeproperties("hbase.columns.mapping"=":key,result:sales,result:praise")
    > tblproperties("hbase.table.name"="exam:spu");

5. 统计查询  ① 统计每个店铺的总销售额 sales, 店铺的商品总点赞数 praise,并将 shop_id 和 shop_name 的组合作为 RowKey,并将结果映射到 Hbase。
hive> insert into ex_spu_hbase 
select concat(t.shop_id,t.shop_name) as key,
t.sales, t.praise from (select shop_id,shop_name, 
sum(spu_price*month_sales) as sales, sum(praise_num) as praise 
from ex_spu group by shop_id,shop_name) t;

② 完成统计后,分别在 hive 和 Hbase 中查询结果数据

 

hbase(main):006:0> scan 'exam:spu'

 

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5695976.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存