# Sample data
tmpd = [{'model': 'AVA', 'city': '苏州', 'y': 0}, {'model': 'AVA', 'city': '苏州', 'y': 0},
{'model': 'TNY', 'city': '青岛', 'y': 0}, {'model': 'AVA', 'city': '青岛', 'y': 0},
{'model': 'TNY', 'city': '青岛', 'y': 0}, {'model': 'TNY', 'city': '青岛', 'y': 0},
{'model': 'TNY', 'city': '青岛', 'y': 0}, {'model': 'AVA', 'city': '上海', 'y': 0},
{'model': 'Mi', 'city': '上海', 'y': 0}, {'model': 'Mi', 'city': '上海', 'y': 0},
{'model': 'Mi', 'city': '上海', 'y': 0}, {'model': 'fla', 'city': '北京', 'y': 0}, ]
Task: for the city and model columns, replace every value that appears fewer than 4 times with a default value. Expected result:
+----+-----+---+--------+---------+-------+-------+
|city|model|  y|city_cnt|model_cnt|  city_| model_|
+----+-----+---+--------+---------+-------+-------+
|上海|  AVA|  0|       4|        4|   上海|    AVA|
|苏州|  AVA|  0|       2|        4|default|    AVA|
|苏州|  AVA|  0|       2|        4|default|    AVA|
|青岛|  AVA|  0|       5|        4|   青岛|    AVA|
|上海|   Mi|  0|       4|        3|   上海|default|
|上海|   Mi|  0|       4|        3|   上海|default|
|上海|   Mi|  0|       4|        3|   上海|default|
|青岛|  TNY|  0|       5|        4|   青岛|    TNY|
|青岛|  TNY|  0|       5|        4|   青岛|    TNY|
|青岛|  TNY|  0|       5|        4|   青岛|    TNY|
|青岛|  TNY|  0|       5|        4|   青岛|    TNY|
|北京|  fla|  0|       1|        1|default|default|
+----+-----+---+--------+---------+-------+-------+
1. Approach 1: register a temp view, compute the per-group counts with window functions, then rewrite the values with withColumn
from pyspark.sql.functions import when

tmpd = spark.createDataFrame(tmpd)
tmpd.createOrReplaceTempView('info')
# Window functions attach the occurrence count per city and per model to every row
df_info = spark.sql("select *, count(*) over(partition by city) city_cnt, count(*) over(partition by model) model_cnt from info")
# Keep the original value only when it occurs more than 3 times, otherwise substitute the default
res = df_info.withColumn('city_', when(df_info['city_cnt'] > 3, df_info.city).otherwise("default")) \
    .withColumn('model_', when(df_info['model_cnt'] > 3, df_info.model).otherwise("default"))
res.show()
2. Approach 2: use the pandas-on-Spark API
import pyspark.pandas as ps

psdf = tmpd.to_pandas_on_spark()
# Count occurrences per group; {psdf} interpolates the pandas-on-Spark DataFrame into the query
psdf1 = ps.sql("select *, count(*) over(partition by city) city_cnt, count(*) over(partition by model) model_cnt from {psdf}")
# Overwrite values whose group count is below 4
psdf1.loc[psdf1['city_cnt'] < 4, 'city'] = 'default'
psdf1.loc[psdf1['model_cnt'] < 4, 'model'] = 'default'
psdf1
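For comparison, the same count-then-replace logic can be sketched in plain pandas, with no Spark session at all; `groupby(...).transform('count')` broadcasts each group's size back to every row, playing the role of the `count(*) over(partition by ...)` window above. This is a minimal sketch reusing the sample data from the top of the post:

```python
import pandas as pd

tmpd = [{'model': 'AVA', 'city': '苏州', 'y': 0}, {'model': 'AVA', 'city': '苏州', 'y': 0},
        {'model': 'TNY', 'city': '青岛', 'y': 0}, {'model': 'AVA', 'city': '青岛', 'y': 0},
        {'model': 'TNY', 'city': '青岛', 'y': 0}, {'model': 'TNY', 'city': '青岛', 'y': 0},
        {'model': 'TNY', 'city': '青岛', 'y': 0}, {'model': 'AVA', 'city': '上海', 'y': 0},
        {'model': 'Mi', 'city': '上海', 'y': 0}, {'model': 'Mi', 'city': '上海', 'y': 0},
        {'model': 'Mi', 'city': '上海', 'y': 0}, {'model': 'fla', 'city': '北京', 'y': 0}]

df = pd.DataFrame(tmpd)
for col in ['city', 'model']:
    # transform('count') returns the group size aligned to the original index
    cnt = df.groupby(col)[col].transform('count')
    df.loc[cnt < 4, col] = 'default'
```

Since the model counts do not depend on the city column, the two replacements are independent and the loop order does not matter.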