[root@gree139 exam]# hdfs dfs -mkdir -p /app/data/exam2011
[root@gree139 exam]# hdfs dfs -put ./countrydata.csv /app/data/exam2011
scala> val lines = sc.textFile("/app/data/exam2011/")
scala> val fields = lines.map(x => x.split(","))
scala> fields.collect.foreach(x => println(x.toList))
Column layout: id, cumulative confirmed count, daily new confirmed count, record date, country name, country code, continent
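For readers who prefer the DataFrame API, the same file can also be loaded with an explicit schema. This is only a sketch, not part of the original session: the variable name covid, the column names, and the DDL-string form of schema() (available in Spark 2.3+) are my own assumptions based on the column layout above.
// Assumed DataFrame alternative; `spark` is the spark-shell SparkSession.
// Column names are assumptions; only the column order is fixed by the exam data.
val covidSchema = "id STRING, confirmedCount INT, confirmedIncr INT, recordDate STRING, countryName STRING, countryCode STRING, continent STRING"
val covid = spark.read.schema(covidSchema).csv("/app/data/exam2011/")  // add .option("header", "true") if the file carries a header row
covid.show(5)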
First, the cumulative confirmed count for each country as of the data cutoff, i.e. the per-country maximum of the running total (two equivalent reductions):
scala> val totalByCountry = fields.map(x => (x(4), x(1).toInt)).reduceByKey((a, b) => if (a > b) a else b)
scala> totalByCountry.collect.foreach(println)
scala> fields.map(x => (x(4), x(1).toInt)).reduceByKey((a, b) => math.max(a, b)).collect.foreach(println)
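The same per-country totals can be expressed on the DataFrame sketched above; again this assumes the covid DataFrame and its column names, which are not part of the original session.
// groupBy + max on the assumed `covid` DataFrame gives the same per-country totals.
covid.groupBy("countryName").max("confirmedCount").show(false)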
② Compute the total number of confirmed cases worldwide as of the data cutoff. (9 points)
scala> totalByCountry.reduce((x, y) => ("wordsum", x._2 + y._2))
res33: (String, Int) = (wordsum,10785407)
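The same world total can also be read directly off the per-country RDD; a minimal sketch using the totalByCountry RDD defined above:
// values.sum() returns a Double; toLong just makes the count easier to read.
totalByCountry.values.sum().toLong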
③ For each continent, find the country with the most newly confirmed cases on each day and its count, and output, for 20200408, each continent's top country and its new-case count. (9 points)
scala> val dailyTopByContinent = fields.map(x => ((x(3), x(6)), (x(2).toInt, x(4)))).reduceByKey((a, b) => if (a._1 > b._1) a else b)
scala> dailyTopByContinent.collect.foreach(println)
scala> dailyTopByContinent.filter(x => x._1._1 == "20200408").collect.foreach(println)
((20200408,欧洲),(6180,西班牙))
((20200408,大洋洲),(208,澳大利亚))
((20200408,非洲),(159,南非))
((20200408,其他),(0,钻石公主号邮轮))
((20200408,南美洲),(2210,巴西))
((20200408,北美洲),(26912,美国))
((20200408,亚洲),(4697,伊朗))
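The same "top country per continent per day" question can be answered with a window function, which mirrors the Hive rank() query used later in this post. This is a sketch that assumes the covid DataFrame from the earlier sketch; the temp-view name and variable names are my own.
// Register the assumed DataFrame and rank countries by daily increase within each (continent, day).
covid.createOrReplaceTempView("covid")
val topDaily = spark.sql("SELECT recordDate, continent, countryName, confirmedIncr FROM (SELECT *, rank() OVER (PARTITION BY continent, recordDate ORDER BY confirmedIncr DESC) AS rn FROM covid) t WHERE rn = 1")
topDaily.filter("recordDate = '20200408'").show(false)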
④ For each continent, find the country with the highest cumulative confirmed count on each day and its count, and output, for 20200607, each continent's top country and its cumulative count. (9 points)
scala> val cumTopByContinent = fields.map(x => ((x(3), x(6)), (x(1).toInt, x(4)))).reduceByKey((a, b) => if (a._1 > b._1) a else b)
scala> cumTopByContinent.filter(x => x._1._1 == "20200607").collect.foreach(println)
((20200607,南美洲),(691962,巴西))
((20200607,北美洲),(1938931,美国))
((20200607,大洋洲),(7255,澳大利亚))
((20200607,非洲),(48285,南非))
((20200607,亚洲),(246628,印度))
((20200607,欧洲),(467673,俄罗斯))
((20200607,其他),(712,钻石公主号邮轮))
⑤ Compute the cumulative confirmed count per month for each continent, and display each continent's figure for the month 202006. (9 points)
scala> val monthlyByContinent = fields.map(x => ((x(3).substring(0, 6), x(6)), x(2).toInt)).reduceByKey(_ + _)
scala> monthlyByContinent.collect.foreach(println)
scala> monthlyByContinent.filter(x => x._1._1 == "202005").collect.foreach(println)
((202005,大洋洲),539)
((202005,欧洲),627231)
((202005,其他),0)
((202005,亚洲),611774)
((202005,非洲),108413)
((202005,北美洲),858043)
((202005,南美洲),700867)
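A DataFrame version of the monthly roll-up, again assuming the covid DataFrame from the earlier sketch (not part of the original session):
import org.apache.spark.sql.functions.{col, substring}
// Sum the daily increases per (month, continent); substring(col, 1, 6) takes the first
// six characters of recordDate, matching the substring(0, 6) used on the RDD above.
val monthly = covid.withColumn("month", substring(col("recordDate"), 1, 6)).groupBy("month", "continent").sum("confirmedIncr")
monthly.filter(col("month") === "202005").show(false)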
With the Spark results in hand, the Hive part maps the same CSV to an external table and publishes the per-continent daily maxima to HBase-backed tables:

use exam;

create external table if not exists ex_exam_record2(
  id string,
  confirmedCount int,
  confirmedIncr int,
  recordDate string,
  countryName string,
  countryShortCode string,
  continent string
)
row format delimited fields terminated by ','
stored as textfile
location '/app/data/exam2011';

select * from ex_exam_record2;

create external table if not exists ex_exam_anlysis(
  student_id string,
  total_score float,
  question_count int,
  accuracy float
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties ("hbase.columns.mapping"=":key,accuracy:total_score,accuracy:question_count,accuracy:accuracy")
tblproperties ("hbase.table.name"="exam:analysis");

create external table if not exists ex_exam_covid19_record(
  key string,
  maxIncreaseCountry string,
  maxIncreaseCount int
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties ("hbase.columns.mapping"=":key,record:maxIncreaseCountry,record:maxIncreaseCount")
tblproperties ("hbase.table.name"="exam:covid19_world");

select * from ex_exam_covid19_record;

-- For each (continent, day), keep the country with the largest daily increase
-- (rank() over the continent/date partition) and write it to the HBase-backed table.
insert into ex_exam_covid19_record
select key, countryName, confirmedIncr
from (
  select concat(continent, recordDate) key,
         countryName,
         confirmedIncr,
         rank() over (partition by continent, recordDate order by confirmedIncr desc) rn
  from ex_exam_record2
) t
where t.rn = 1;

select count(*) from ex_exam_covid19_record;