数据集来自数据网站kaggle的美国新冠肺炎疫情数据集,(从百度网盘下载,提取码:t7tu)该数据集以数据表us-countIEs.csv组织
import pandas as pd#.csv->.txtdata = pd.read_csv('/home/hadoop/data/us-countIEs.csv')with open('/home/hadoop/data/us-countIEs.txt','a+',enCoding='utf-8') as f: for line in data.values: f.write((str(line[0])+'\t'+str(line[1])+'\t' +str(line[2])+'\t'+str(line[3])+'\t'+str(line[4])+'\n'))
三、将文件上传至HDFS文件系统中启动Hadoop,并查看启动结果在HDFS文件系统中,创建/user/hadoop文件夹,后续的运行结果文件都将存放在这里
./bin/hdfs dfs -mkdir -p /user/hadoop ./bin/hdfs dfs -put /home/hadoop/data/us-countIEs.txt /user/hadoop./bin/hdfs dfs -ls /user/hadoop
四、使用Spark对数据进行分析创建analyst.py文件,代码如下:from pyspark import SparkConf,SparkContextfrom pyspark.sql import Rowfrom pyspark.sql.types import *from pyspark.sql import SparkSessionfrom datetime import datetimeimport pyspark.sql.functions as func def toDate(inputStr): newStr = "" if len(inputStr) == 8: s1 = inputStr[0:4] s2 = inputStr[5:6] s3 = inputStr[7] newStr = s1+"-"+"0"+s2+"-"+"0"+s3 else: s1 = inputStr[0:4] s2 = inputStr[5:6] s3 = inputStr[7:] newStr = s1+"-"+"0"+s2+"-"+s3 date = datetime.strptime(newStr, "%Y-%m-%d") return date #主程序:spark = SparkSession.builder.config(conf = SparkConf()).getorCreate() fIElds = [StructFIEld("date", DateType(),False),StructFIEld("county", StringType(),False),StructFIEld("state", StringType(),False), StructFIEld("cases", IntegerType(),False),StructFIEld("deaths", IntegerType(),False),]schema = StructType(fIElds) rdd0 = spark.sparkContext.textfile("/user/hadoop/us-countIEs.txt")rdd1 = rdd0.map(lambda x:x.split("\t")).map(lambda p: Row(toDate(p[0]),p[1],p[2],int(p[3]),int(p[4]))) shemaUsInfo = spark.createDataFrame(rdd1,schema) shemaUsInfo.createOrReplaceTempVIEw("usInfo") #1.计算每日的累计确诊病例数和死亡数df = shemaUsInfo.groupBy("date").agg(func.sum("cases"),func.sum("deaths")).sort(shemaUsInfo["date"].asc()) #列重命名df1 = df.withColumnRenamed("sum(cases)","cases").withColumnRenamed("sum(deaths)","deaths")df1.repartition(1).write.Json("result1.Json") #写入hdfs #注册为临时表供下一步使用df1.createOrReplaceTempVIEw("ustotal") #2.计算每日较昨日的新增确诊病例数和死亡病例数df2 = spark.sql("select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease from ustotal t1,ustotal t2 where t1.date = date_add(t2.date,1)") df2.sort(df2["date"].asc()).repartition(1).write.Json("result2.Json") #写入hdfs #3.统计截止5.19日 美国各州的累计确诊人数和死亡人数df3 = spark.sql("select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate from usInfo where date = to_date('2020-05-19','yyyy-MM-dd') group by date,state") df3.sort(df3["totalCases"].desc()).repartition(1).write.Json("result3.Json") #写入hdfs df3.createOrReplaceTempVIEw("eachStateInfo") #4.找出美国确诊最多的10个州df4 = spark.sql("select date,state,totalCases from eachStateInfo order by totalCases desc limit 10")df4.repartition(1).write.Json("result4.Json") #5.找出美国死亡最多的10个州df5 = spark.sql("select date,state,totalDeaths from eachStateInfo order by totalDeaths desc limit 10")df5.repartition(1).write.Json("result5.Json") #6.找出美国确诊最少的10个州df6 = spark.sql("select date,state,totalCases from eachStateInfo order by totalCases asc limit 10")df6.repartition(1).write.Json("result6.Json") #7.找出美国死亡最少的10个州df7 = spark.sql("select date,state,totalDeaths from eachStateInfo order by totalDeaths asc limit 10")df7.repartition(1).write.Json("result7.Json") #8.统计截止5.19全美和各州的病死率df8 = spark.sql("select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate from eachStateInfo group by date union select 2 as sign,date,state,deathRate from eachStateInfo").cache()df8.sort(df8["sign"].asc(),df8["deathRate"].desc()).repartition(1).write.Json("result8.Json")
在终端查看文件是否都存在五、将结果从HDFS下载至本地文件系统在本地文件系统中,在/home/hadoop中创建result文件夹,在result文件夹中再创建8个子文件,分别以result1、result2等进行命名。
cd ~/result mkdir result cd result mkdir result1 result2 result3 result4 result5 result6 result7 result8
将HDFS文件系统中的文件下载至本地文件系统。./bin/hdfs dfs -get /user/hadoop/result1.Json/*.Json ~/result/result1/./bin/hdfs dfs -get /user/hadoop/result2.Json/*.Json ~/result/result2/# 对于其他文件以此类推,将文件路径修改即可
查看下载好的文件五、数据可视化安装第三方可视化工具pyecharts
pip install pyecharts
2. 创建showdata.py文件中。具体代码如下:
from pyecharts import options as optsfrom pyecharts.charts import barfrom pyecharts.charts import linefrom pyecharts.components import tablefrom pyecharts.charts import WordCloudfrom pyecharts.charts import PIEfrom pyecharts.charts import Funnelfrom pyecharts.charts import Scatterfrom pyecharts.charts import Pictorialbarfrom pyecharts.options import ComponentTitleOptsfrom pyecharts.globals import SymbolTypeimport Json #1.画出每日的累计确诊病例数和死亡数——>双柱状图def drawChart_1(index): root = "/home/hadoop/result/result" + str(index) +"/part-00000.Json" date = [] cases = [] deaths = [] with open(root, 'r') as f: while True: line = f.readline() if not line: # 到 EOF,返回空字符串,则终止循环 break Js = Json.loads(line) date.append(str(Js['date'])) cases.append(int(Js['cases'])) deaths.append(int(Js['deaths'])) d = ( bar() .add_xaxis(date) .add_yaxis("累计确诊人数", cases, stack="stack1") .add_yaxis("累计死亡人数", deaths, stack="stack1") .set_serIEs_opts(label_opts=opts.LabelOpts(is_show=False)) .set_global_opts(Title_opts=opts.TitleOpts(title="美国每日累计确诊和死亡人数")) .render("/home/hadoop/result/result1/result1.HTML") ) #2.画出每日的新增确诊病例数和死亡数——>折线图def drawChart_2(index): root = "/home/hadoop/result/result" + str(index) +"/part-00000.Json" date = [] cases = [] deaths = [] with open(root, 'r') as f: while True: line = f.readline() if not line: # 到 EOF,返回空字符串,则终止循环 break Js = Json.loads(line) date.append(str(Js['date'])) cases.append(int(Js['caseIncrease'])) deaths.append(int(Js['deathIncrease'])) ( line(init_opts=opts.Initopts(wIDth="1600px", height="800px")) .add_xaxis(xaxis_data=date) .add_yaxis( serIEs_name="新增确诊", y_axis=cases, markpoint_opts=opts.MarkPointopts( data=[ opts.MarkPointItem(type_="max", name="最大值") ] ), markline_opts=opts.MarklineOpts( data=[opts.MarklineItem(type_="average", name="平均值")] ), ) .set_global_opts( Title_opts=opts.TitleOpts(title="美国每日新增确诊折线图", subtitle=""), tooltip_opts=opts.tooltipOpts(trigger="axis"), toolBox_opts=opts.ToolBoxOpts(is_show=True), xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False), ) .render("/home/hadoop/result/result2/result1.HTML") ) ( line(init_opts=opts.Initopts(wIDth="1600px", height="800px")) .add_xaxis(xaxis_data=date) .add_yaxis( serIEs_name="新增死亡", y_axis=deaths, markpoint_opts=opts.MarkPointopts( data=[opts.MarkPointItem(type_="max", name="最大值")] ), markline_opts=opts.MarklineOpts( data=[ opts.MarklineItem(type_="average", name="平均值"), opts.MarklineItem(symbol="none", x="90%", y="max"), opts.MarklineItem(symbol="circle", type_="max", name="最高点"), ] ), ) .set_global_opts( Title_opts=opts.TitleOpts(title="美国每日新增死亡折线图", subtitle=""), tooltip_opts=opts.tooltipOpts(trigger="axis"), toolBox_opts=opts.ToolBoxOpts(is_show=True), xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False), ) .render("/home/hadoop/result/result2/result2.HTML") ) #3.画出截止5.19,美国各州累计确诊、死亡人数和病死率--->表格def drawChart_3(index): root = "/home/hadoop/result/result" + str(index) +"/part-00000.Json" allState = [] with open(root, 'r') as f: while True: line = f.readline() if not line: # 到 EOF,返回空字符串,则终止循环 break Js = Json.loads(line) row = [] row.append(str(Js['state'])) row.append(int(Js['totalCases'])) row.append(int(Js['totalDeaths'])) row.append(float(Js['deathRate'])) allState.append(row) table = table() headers = ["State name", "Total cases", "Total deaths", "Death rate"] rows = allState table.add(headers, rows) table.set_global_opts( Title_opts=ComponentTitleOpts(title="美国各州疫情一览", subtitle="") ) table.render("/home/hadoop/result/result3/result1.HTML") #4.画出美国确诊最多的10个州——>词云图def drawChart_4(index): root = "/home/hadoop/result/result" + str(index) +"/part-00000.Json" data = [] with open(root, 'r') as f: while True: line = f.readline() if not line: # 到 EOF,返回空字符串,则终止循环 break Js = Json.loads(line) row=(str(Js['state']),int(Js['totalCases'])) data.append(row) c = ( WordCloud() .add("", data, word_size_range=[20, 100], shape=SymbolType.DIAMOND) .set_global_opts(Title_opts=opts.TitleOpts(title="美国各州确诊top10")) .render("/home/hadoop/result/result4/result1.HTML") ) #5.画出美国死亡最多的10个州——>象柱状图def drawChart_5(index): root = "/home/hadoop/result/result" + str(index) +"/part-00000.Json" state = [] totalDeath = [] with open(root, 'r') as f: while True: line = f.readline() if not line: # 到 EOF,返回空字符串,则终止循环 break Js = Json.loads(line) state.insert(0,str(Js['state'])) totalDeath.insert(0,int(Js['totalDeaths'])) c = ( Pictorialbar() .add_xaxis(state) .add_yaxis( "", totalDeath, label_opts=opts.LabelOpts(is_show=False), symbol_size=18, symbol_repeat="fixed", symbol_offset=[0, 0], is_symbol_clip=True, symbol=SymbolType.ROUND_RECT, ) .reversal_axis() .set_global_opts( Title_opts=opts.TitleOpts(title="Pictorialbar-美国各州死亡人数top10"), xaxis_opts=opts.AxisOpts(is_show=False), yaxis_opts=opts.AxisOpts( axistick_opts=opts.AxisTickOpts(is_show=False), axisline_opts=opts.AxislineOpts( linestyle_opts=opts.linestyleOpts(opacity=0) ), ), ) .render("/home/hadoop/result/result5/result1.HTML") ) #6.找出美国确诊最少的10个州——>词云图def drawChart_6(index): root = "/home/hadoop/result/result" + str(index) +"/part-00000.Json" data = [] with open(root, 'r') as f: while True: line = f.readline() if not line: # 到 EOF,返回空字符串,则终止循环 break Js = Json.loads(line) row=(str(Js['state']),int(Js['totalCases'])) data.append(row) c = ( WordCloud() .add("", data, word_size_range=[100, 20], shape=SymbolType.DIAMOND) .set_global_opts(Title_opts=opts.TitleOpts(title="美国各州确诊最少的10个州")) .render("/home/hadoop/result/result6/result1.HTML") ) #7.找出美国死亡最少的10个州——>漏斗图def drawChart_7(index): root = "/home/hadoop/result/result" + str(index) +"/part-00000.Json" data = [] with open(root, 'r') as f: while True: line = f.readline() if not line: # 到 EOF,返回空字符串,则终止循环 break Js = Json.loads(line) data.insert(0,[str(Js['state']),int(Js['totalDeaths'])]) c = ( Funnel() .add( "State", data, sort_="ascending", label_opts=opts.LabelOpts(position="insIDe"), ) .set_global_opts(Title_opts=opts.TitleOpts(title="")) .render("/home/hadoop/result/result7/result1.HTML") ) #8.美国的病死率--->饼状图def drawChart_8(index): root = "/home/hadoop/result/result" + str(index) +"/part-00000.Json" values = [] with open(root, 'r') as f: while True: line = f.readline() if not line: # 到 EOF,返回空字符串,则终止循环 break Js = Json.loads(line) if str(Js['state'])=="USA": values.append(["Death(%)",round(float(Js['deathRate'])*100,2)]) values.append(["No-Death(%)",100-round(float(Js['deathRate'])*100,2)]) c = ( PIE() .add("", values) .set_colors(["blcak","orange"]) .set_global_opts(Title_opts=opts.TitleOpts(title="全美的病死率")) .set_serIEs_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}")) .render("/home/hadoop/result/result8/result1.HTML") ) #可视化主程序:index = 1while index<9: funcStr = "drawChart_" + str(index) eval(funcStr)(index)index+=1
查看八个问题所生成的HTML图(1)美国每日的累计确诊病例数和死亡数——>双柱状图
(2)美国每日的新增确诊病例数——>折线图
美国每日的新增死亡病例数——>折线图
(3)截止5.19,美国各州累计确诊、死亡人数和病死率—>表格
(4)截止5.19,美国累计确诊人数前10的州—>词云图
(5)截止5.19,美国累计死亡人数前10的州—>象柱状图
(6)截止5.19,美国累计确诊人数最少的10个州—>词云图
(7)截止5.19,美国累计死亡人数最少的10个州—>漏斗图
(8)截止5.19,美国的病死率—>饼状图
参考材料
http://dblab.xmu.edu.cn/blog/2636-2/
注:本篇文章是基于林子雨老师博客的文章,经本人实 *** 后发表。
以上是内存溢出为你收集整理的使用spark+python对2020年美国新冠肺炎疫情数据分析全部内容,希望文章能够帮你解决使用spark+python对2020年美国新冠肺炎疫情数据分析所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)