第一题上传
# Create the HDFS target directory for the province list.
hdfs dfs -mkdir /app/data/allprovinces
# Upload both source files from the local filesystem into HDFS.
hdfs dfs -put /opt/1226/products.txt /app/data/allprovinces/
hdfs dfs -put /opt/1226/allprovinces.txt /app/data/allprovinces/
# Move products.txt to the path expected by the ex_products external table.
# NOTE(review): assumes /app/data/events already exists as a directory —
# otherwise the file itself is renamed to "products"; confirm before running.
hdfs dfs -mv /app/data/allprovinces/products.txt /app/data/events/products
- 创建 HBase 数据表
- 请在 Hive 中创建数据库 market,并在 market 数据库中创建三个外表
# Start the Hive services in the background: run in the foreground, the first
# command blocks the shell and the second is never reached. The metastore is
# started first because HiveServer2 depends on it.
nohup hive --service metastore > /tmp/metastore.log 2>&1 &
nohup hive --service hiveserver2 > /tmp/hiveserver2.log 2>&1 &
-- Create the working database; IF NOT EXISTS makes the script re-runnable.
-- (The original issued CREATE DATABASE twice — the second call fails with
-- "Database market already exists".)
create database if not exists market;
use market;
-- External table over the tab-separated province list (full name, abbreviation).
-- Fix: the field delimiter must be the tab escape '\t' — the original used the
-- literal letter 't', which splits rows on every "t" character.
create external table ex_allprovinces(
name string,
abbr string
)
row format delimited fields terminated by '\t'
location '/app/data/allprovinces/';
-- External table over the tab-separated product price records.
-- Fix: delimiter corrected from the literal letter 't' to the tab escape '\t'.
create external table ex_products(
name string,
price float,
craw_time string,
market string,
province string,
city string
)
row format delimited fields terminated by '\t'
location '/app/data/events/products';
3c. 创建映射到 HBase 表的 Hive 外部表
-- Hive external table mapped onto the HBase table exam:province_market
-- (row key -> :key, market count -> market:count, province name -> info:name).
-- NOTE(review): the HBase table must already exist for an EXTERNAL mapping.
create external table ex_province_market(
rowkey string,
marketCount string,
provicneName string -- NOTE(review): typo for "provinceName"; kept so existing queries keep working
)
-- Fix: the handler class name is case-sensitive — 'HbaseStorageHandler'
-- (lowercase b) fails with ClassNotFoundException.
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties
("hbase.columns.mapping" = ":key,market:count,info:name")
tblproperties ("hbase.table.name" = "exam:province_market");
4d插入那个hbase表的语句
-- Populate the HBase-backed table: count distinct markets per province, then
-- join against the province list to translate full names into abbreviations.
-- Output columns are positional: (abbr -> rowkey, count -> marketCount,
-- province -> provicneName).
insert into table ex_province_market
select ap.abbr, pm.market_cnt, pm.province
from (
  select province, count(distinct market) as market_cnt
  from ex_products
  where province != ""
  group by province
) pm
inner join ex_allprovinces ap
  on pm.province = ap.name;

-- Verify the load.
select * from ex_province_market;
4.在 Spark-Shell 中,将 products.txt 装载到 RDD 中,使用 RDD 对农产品种类统
计,并将结果输出到控制台上。
// RDD question 1: for each province, count the distinct product names and
// print the top 3 provinces.
// Fix: split on the tab character "\t" — the original split on the literal
// letter "t", which corrupts every field.
val rdd = sc.textFile("hdfs://192.168.10.136:9000/app/data/events/products/products.txt")
rdd.map(_.split("\t"))
  .filter(_.length == 6)                 // keep only well-formed 6-field rows
  .map(fields => (fields(4), fields(0))) // (province, productName)
  .groupByKey()
  .map { case (province, names) => (province, names.toArray.distinct.length) }
  .repartition(1)                        // single partition => global ordering from sortBy
  .sortBy(_._2, ascending = false)
  .take(3)
  .foreach(println)
// RDD question 2: for each province, the top-3 markets by record count.
// Fix: both splits use the tab escape "\t" instead of the literal letter "t".
val lines = sc.textFile("hdfs://192.168.10.136:9000/app/data/events/products/products.txt")
val tmp = lines.distinct.filter(_.split("\t").length == 6)
tmp.map { line =>
  val fields: Array[String] = line.split("\t")
  ((fields(4), fields(3)), 1)            // ((province, market), 1)
}.reduceByKey(_ + _)
  .map { case ((province, market), nums) => (province, (market, nums)) }
  .groupByKey()
  .mapValues(_.toList.sortBy(_._2)(Ordering.Int.reverse).take(3))
  .foreach(println)
5.在 Spark-Shell 中,将 products.txt 装载成 Dataframe,计算山西省每种农产
品的价格波动趋势,即计算每天价格均值,并将结果输出到控制台上。
// DataFrame question: per-product price statistics for 山西 (Shanxi) province.
// Fixes: type is `DataFrame` (not `Dataframe`); split on "\t" instead of the
// letter "t"; removed the stray leading space in the " craw_time" column name;
// price is cast to double so min/max compare numerically — on the raw string
// column they would compare lexicographically (e.g. "9.5" > "10.0").
// NOTE(review): the exam text asks for a per-day average, but this solution
// groups by product name with a trimmed mean — kept as the author intended.
val df: DataFrame = sc.textFile("hdfs://192.168.10.136:9000/app/data/events/products/products.txt")
  .map(_.split("\t"))
  .filter(_.length == 6)
  .map(x => (x(0), x(1), x(2), x(3), x(4), x(5)))
  .toDF("name", "price", "craw_time", "market", "province", "city")

df.where("province='山西'")
  .groupBy("name")
  .agg(
    sum($"price".cast("double")).as("sum"),
    min($"price".cast("double")).as("min"),
    count($"price").as("count"),
    max($"price".cast("double")).as("max")
  )
  // Trimmed mean: drop one max and one min when there are more than 2 samples;
  // otherwise fall back to the plain mean to avoid a non-positive divisor.
  .withColumn("pavg",
    when($"count" > 2, ($"sum" - $"max" - $"min") / ($"count" - 2))
      .otherwise($"sum" / $"count"))
  .show()
说明:若出现空值,是因为有的种类样本过少,count 减 2 后不为正数,除法结果为 null;题目没有标注这种情况,这里不特意处理,保持原汁原味。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)