spark


import pyspark.sql.types
from pyspark.sql import SparkSession
from pyecharts import Bar, Page
from pyecharts import Map
import requests
import json
from pyecharts import Funnel, EffectScatter, Line, Overlap, Grid, Parallel, Pie, configure
from pyspark.sql.types import DoubleType

import pyspark.sql.functions as F
import pandas as pd
from pyspark.sql.functions import split
import os
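
# Assumed input format (inferred from how the columns are used below, not stated in
# the post): each 链家 CSV has a header row with at least the string columns
# 价格 (monthly rent), 面积 (area such as "89.5㎡"), 地址 (district) and 房型 (floor plan).
# The pyecharts imports above follow the 0.5.x API; pyecharts 1.x moved the chart
# classes to pyecharts.charts and replaced add() with add_xaxis()/add_yaxis().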
def hubeirent(filename):  # folder of CSVs; compare rent levels across Hubei cities
    spark = SparkSession.builder.master("local").appName("main").getOrCreate()
    files = os.listdir(filename)
    # index 0: Wuhan, 1: Yichang, 2: Huangshi
    max_list = [0 for i in range(3)]
    mean_list = [0 for i in range(3)]
    min_list = [0 for i in range(3)]
    mid_list = [0 for i in range(3)]
    mapp = {"武汉.csv": 0, "宜昌.csv": 1, "黄石.csv": 2}

    all_list = []
    for file in files:
        a = filename + '/' + file
        df = spark.read.csv(a, header=True)
        i = mapp[file]
        df = df.withColumn('价格', df["价格"].cast('double'))
        min_list[i] = df.agg({"价格": "min"}).first()['min(价格)']
        max_list[i] = df.agg({"价格": "max"}).first()['max(价格)']
        mean_list[i] = df.agg({"价格": "mean"}).first()['avg(价格)']
        mid_list[i] = df.approxQuantile("价格", [0.5], 0.01)[0]
    all_list.append(min_list)
    all_list.append(max_list)
    all_list.append(mean_list)
    all_list.append(mid_list)

    return all_list
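
# A minimal alternative sketch (hypothetical helper, not used elsewhere in this script):
# the three agg() calls plus approxQuantile() above each launch a separate Spark job per
# city; the same summary can be collected with one agg() and one approxQuantile() call,
# assuming '价格' has already been cast to double as in hubeirent.
def price_summary(df):
    row = df.agg(F.min('价格').alias('min_p'),
                 F.max('价格').alias('max_p'),
                 F.avg('价格').alias('avg_p')).first()
    median = df.approxQuantile('价格', [0.5], 0.01)[0]
    return row['min_p'], row['max_p'], row['avg_p'], median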

def shenghuirent(filename):  # compare Wuhan with the four municipalities
    spark = SparkSession.builder.master("local").appName("main").getOrCreate()
    files = os.listdir(filename)
    # index 0: Wuhan, 1: Shanghai, 2: Chongqing, 3: Beijing, 4: Tianjin
    max_list = [0 for i in range(5)]
    mean_list = [0 for i in range(5)]
    min_list = [0 for i in range(5)]
    mid_list = [0 for i in range(5)]
    mapp = {"武汉.csv": 0, "上海.csv": 1, "重庆.csv": 2, "北京.csv": 3, "天津.csv": 4}

    all_list = []
    for file in files:
        a = filename + '/' + file
        df = spark.read.csv(a, header=True)
        i = mapp[file]
        df = df.withColumn('价格', df["价格"].cast('double'))
        min_list[i] = df.agg({"价格": "min"}).first()['min(价格)']
        max_list[i] = df.agg({"价格": "max"}).first()['max(价格)']
        mean_list[i] = df.agg({"价格": "mean"}).first()['avg(价格)']
        mid_list[i] = df.approxQuantile("价格", [0.5], 0.01)[0]
    all_list.append(min_list)
    all_list.append(max_list)
    all_list.append(mean_list)
    all_list.append(mid_list)

    return all_list

def wuhandiqurent(filename):  # average rent per district in Wuhan (武汉.csv)
    spark = SparkSession.builder.master("local").appName("main").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df = df.withColumn('价格', df["价格"].cast('double'))
    a = df.groupby('地址').agg({'价格': 'mean'}).collect()

    qu = []
    price = []
    for i in a:
        qu.append(i[0])
        price.append(i[1])

    return qu, price

def wuhanhudiqufangyuan(filename):  # top-10 Wuhan districts by number of listings
    spark = SparkSession.builder.master("local").appName("main").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df = df.groupby('地址').count()
    a = df.orderBy(-df['count']).collect()
    diqu = []
    shu = []

    for j in a[:10]:
        diqu.append(j[0])
        shu.append(j[1])

    return diqu, shu

def wuhandiquhuxing(filename):  # price per ㎡ (价格/面积) by floor plan in Wuhan
    spark = SparkSession.builder.master("local").appName("main").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df = df.filter(df['面积'].isNotNull())
    df = df.withColumn('价格', df["价格"].cast('double'))
    df = df.withColumn('面积', split(df['面积'], "㎡")[0])
    df = df.withColumn('面积', df["面积"].cast('double'))
    df = df.withColumn('性价比', df['价格'] / df["面积"])
    df = df.groupby('房型').agg({'性价比': 'mean', '面积': 'mean', '价格': 'mean'})
    a = df.orderBy(-df['avg(性价比)']).collect()
    huxing = []
    danjia = []
    jiage = []
    mianji = []
    for j in a[:10]:
        huxing.append(j['房型'])
        # read the aggregates by name: the column order produced by a dict-style agg()
        # is not something positional indexes should rely on
        danjia.append(j['avg(性价比)'])
        jiage.append(j['avg(价格)'])
        mianji.append(j['avg(面积)'])

    return huxing, danjia, jiage, mianji
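
# Sketch only (hypothetical helper, not called anywhere): the same ranking with explicit
# alias() names instead of the auto-generated 'avg(...)' columns, assuming 价格 / 面积 /
# 性价比 have already been cast to double as in wuhandiquhuxing above.
def huxing_stats(df):
    return (df.groupby('房型')
              .agg(F.avg('性价比').alias('单价'),
                   F.avg('价格').alias('租金'),
                   F.avg('面积').alias('平均面积'))
              .orderBy(F.desc('单价')))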

def yichangdiqufangyuan(filename):  # number of listings per district in Yichang
    spark = SparkSession.builder.master("local").appName("main").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df = df.groupby('地址').count()
    a = df.orderBy(-df['count']).collect()
    diqu = []
    fangyuan = []

    for j in a:
        diqu.append(j[0])
        fangyuan.append(j[1])

    return diqu, fangyuan

def yichangdiqujiage(filename):  # min/max/mean rent per district in Yichang
    spark = SparkSession.builder.master("local").appName("main").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df = df.withColumn('价格', df["价格"].cast('double'))
    # aggregate min/max/mean in a single pass so the three values stay aligned per row;
    # three separate groupby().collect() calls are not guaranteed to return the
    # districts in the same order
    a = df.groupby('地址').agg(F.min('价格'), F.max('价格'), F.avg('价格')).collect()
    diqu = []
    min_list = []
    max_list = []
    mean_list = []
    for row in a:
        diqu.append(row[0])
        min_list.append(row[1])
        max_list.append(row[2])
        mean_list.append(row[3])

    return diqu, min_list, max_list, mean_list

def huangshimianji(filename):  # average rent by exact floor area in Huangshi, split into five area bands
    spark = SparkSession.builder.master("local").appName("main").getOrCreate()
    df = spark.read.csv(filename, header=True)

    df = df.withColumn('面积', split(df['面积'], "㎡")[0])
    df = df.withColumn('面积', df["面积"].cast('double'))
    df = df.withColumn('价格', df["价格"].cast('double'))

    a = df.filter(df.面积 < 50).groupby('面积').agg({'价格': 'avg'}).collect()
    b = df.filter(df.面积 >= 50).filter(df.面积 < 100).groupby('面积').agg({'价格': 'avg'}).collect()
    c = df.filter(df.面积 >= 100).filter(df.面积 < 150).groupby('面积').agg({'价格': 'avg'}).collect()
    d = df.filter(df.面积 >= 150).filter(df.面积 < 200).groupby('面积').agg({'价格': 'avg'}).collect()
    e = df.filter(df.面积 >= 200).groupby('面积').agg({'价格': 'avg'}).collect()
    resulta = []
    resultb = []
    resultc = []
    resultd = []
    resulte = []
    for i in a:
        resulta.append([i[0], i[1]])
    for i in b:
        resultb.append([i[0], i[1]])
    for i in c:
        resultc.append([i[0], i[1]])
    for i in d:
        resultd.append([i[0], i[1]])
    for i in e:
        resulte.append([i[0], i[1]])

    return resulta, resultb, resultc, resultd, resulte
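
# A one-pass alternative sketch (hypothetical helper, not used by the script): label
# each listing with its area band via F.when() instead of five separate filter() scans.
# Assumes '面积' has already been cast to double as in huangshimianji above.
def area_buckets(df):
    bucket = (F.when(F.col('面积') < 50, '面积<50')
               .when(F.col('面积') < 100, '50<=面积<100')
               .when(F.col('面积') < 150, '100<=面积<150')
               .when(F.col('面积') < 200, '150<=面积<200')
               .otherwise('200<=面积'))
    return df.withColumn('面积区间', bucket).groupby('面积区间').count()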

def draw_hubeirent(data):  # data: [min_list, max_list, mean_list, mid_list]
    attr = ["武汉", "宜昌", "黄石"]
    v0 = data[0]
    v1 = data[1]
    v2 = data[2]
    v3 = data[3]

    bar = Bar("湖北省租房租金概况")
    bar.add("最小值", attr, v0, is_stack=True)
    bar.add("最大值", attr, v1, is_stack=True)
    bar.add("平均值", attr, v2, is_stack=True)
    bar.add("中位数", attr, v3, is_stack=True)
    bar.render("湖北.html")

def draw_shenghuirent(data):  # data: [min_list, max_list, mean_list, mid_list]
    attr = ["武汉", "上海", "重庆", "北京", "天津"]
    v0 = data[0]
    v1 = data[1]
    v2 = data[2]
    v3 = data[3]

    bar = Bar("武汉市和直辖市租房租金概况")
    bar.add("最小值", attr, v0, is_stack=True)
    bar.add("最大值", attr, v1, is_stack=True)
    bar.add("平均值", attr, v2, is_stack=True)
    bar.add("中位数", attr, v3, is_stack=True)
    bar.render("省会.html")

def draw_wuhandiqurent(list1, list2):  # districts, average prices
    line = Line('地区价格图', background_color='white', title_text_size=25)
    line.add('地区', list1, list2, is_label_show=False, is_smooth=True, is_fill=True)
    line.render("武汉小区租金对比.html")

def draw_wuhanhudiqufangyuan(list1, list2):  # districts, listing counts
    funnel = Funnel("武汉小区房源top10")
    funnel.add("地址", list1, list2, is_label_show=True, label_pos="inside", label_text_color="#fff")
    funnel.render("武汉房源top10.html")

def draw_yichangdiqufangyuan(list1, list2):  # districts, listing counts
    m = Map("宜昌地区房源对比图", "宜昌市")
    m.add("宜昌", list1, list2, visual_range=[0, max(list2)], maptype='宜昌', is_visualmap=True)
    m.render("宜昌地区房源对比图.html")
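
# Note (an environment assumption, not stated in the post): rendering a city-level
# maptype such as '宜昌' with pyecharts 0.5.x requires the extra map data packages,
# e.g. `pip install echarts-china-cities-pypkg`, otherwise the map area comes out blank.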
def draw_yichangdiqujiage(list1, list2, list3, list4):  # districts, min/max/mean rents
    parallel = Parallel("宜昌主要地区租金对比图", width=600, height=400)
    parallel.config(list1)  # one parallel axis per district
    parallel.add("宜昌", [list2, list3, list4],
                 visual_range=[0, max(max(list2), max(list3), max(list4))], is_roam=True)
    parallel.render(path="宜昌主要地区租金对比图.html")

def draw_wuhandiquhuxing(huxing, danjia, jiage, mianji):  # floor plan, price per ㎡, rent, area
    bar = Bar(title="户型对比")
    line = Line()
    # bar series: average rent and average area per floor plan
    bar.add("租金", huxing, jiage, mark_line=["average"], mark_point=["max", "min"],
            is_label_show=True)
    bar.add("面积", huxing, mianji, mark_line=["average"], mark_point=["max", "min"],
            is_label_show=True)
    # line series: average price per square metre
    line.add("单价", huxing, danjia, is_smooth=True)

    overlap = Overlap()
    overlap.add(bar)
    overlap.add(line)
    overlap.render("户型top10对比图.html")

def draw_huangshimianji(list1, list2, list3, list4, list5):  # [area, avg rent] pairs per area band
    key = []
    value = []
    for i in list1:
        key.append(i[0])
        value.append(i[1])

    es1 = EffectScatter('面积<50')
    es1.add('面积<50', key, value, symbol="rect")
    es1.render('面积<50.html')
    key = []
    value = []
    for i in list2:
        key.append(i[0])
        value.append(i[1])
    es2 = EffectScatter('50<=面积<100')
    es2.add('50<=面积<100', key, value, symbol="roundRect")
    es2.render('50<=面积<100.html')
    key = []
    value = []
    for i in list3:
        key.append(i[0])
        value.append(i[1])
    es3 = EffectScatter('100<=面积<150')
    es3.add('100<=面积<150', key, value, symbol="triangle")
    es3.render('100<=面积<150.html')
    key = []
    value = []
    for i in list4:
        key.append(i[0])
        value.append(i[1])
    es4 = EffectScatter('150<=面积<200')
    es4.add('150<=面积<200', key, value, symbol="diamond")
    es4.render('150<=面积<200.html')
    key = []
    value = []
    for i in list5:
        key.append(i[0])
        value.append(i[1])
    es5 = EffectScatter('200<=面积')
    es5.add('200<=面积', key, value, symbol="pin")
    es5.render('200<=面积.html')
    key = ['面积<50', '50<=面积<100', '100<=面积<150', '150<=面积<200', '200<=面积']
    value = [len(list1), len(list2), len(list3), len(list4), len(list5)]

    pie = Pie('黄石房源面积对比图')
    pie.add("房源面积数量", key, value)
    pie.render("黄石房源面积对比图.html")

if __name__ == '__main__':
    files = "链家/湖北"
    all_list = hubeirent(files)
    draw_hubeirent(all_list)
    files2 = "链家/省会"
    all_list2 = shenghuirent(files2)
    draw_shenghuirent(all_list2)
    files3 = "链家/湖北/武汉.csv"
    qu, price = wuhandiqurent(files3)
    draw_wuhandiqurent(qu, price)

    diqu, shu = wuhanhudiqufangyuan(files3)
    draw_wuhanhudiqufangyuan(diqu, shu)
    huxing, danjia, jiage, mianji = wuhandiquhuxing(files3)
    draw_wuhandiquhuxing(huxing, danjia, jiage, mianji)
    file4 = "链家/湖北/宜昌.csv"
    diqu, fangyuan = yichangdiqufangyuan(file4)
    draw_yichangdiqufangyuan(diqu, fangyuan)
    diqu, min_list, max_list, mean_list = yichangdiqujiage(file4)
    draw_yichangdiqujiage(diqu, min_list, max_list, mean_list)
    files5 = "链家/湖北/黄石.csv"
    resulta, resultb, resultc, resultd, resulte = huangshimianji(files5)
    draw_huangshimianji(resulta, resultb, resultc, resultd, resulte)
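
# To run the whole script (assumptions, not from the original post: the 链家/... CSV
# folders sit next to this file, the file is saved as main.py, and pyspark plus
# pyecharts 0.5.x are installed):
#   python main.py    # SparkSession uses master("local"), so no cluster is required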

Feel free to share; please credit the source: 内存溢出. Original post: https://outofmemory.cn/zaji/5700528.html
