【Pyspark】常用数据分析基础 *** 作

【Pyspark】常用数据分析基础 *** 作,第1张

文章目录
  • 一、pyspark.sql部分
    • 1.窗口函数
    • 2.更换列名:
    • 3.sql将一个字段根据某个字符拆分成多个字段显示
    • 4.pd和spark的dataframe进行转换:
    • 5.报错ValueError: Some of types cannot be determined after inferring
    • 6.df按行打乱
    • 7.表格的联结
    • 8.dataframe的 *** 作
    • 9.createDataFrame的几种方法
    • 10.pd dataframe与spark dataframe转换,通过sql语句间接对pandas的dataframe进行处理
    • 11.filter筛选
    • 12. 新增或者修改spark.sql中dataframe的某列
    • 13.将dataframe保存为csv文件
    • 14. 取出对应表项内容
    • 15.agg和groupby结合使用
  • 二、Spark Core模块
    • 2.1 udf函数的传参:
    • 2.2 pandas core dataframe
    • 2.3 rdd *** 作
    • 2.4 filter *** 作
    • 2.5 flatMap
  • 三、MLlib模块
    • 3.1 kmeans聚类分析
    • 3.2 gbdt分类和回归
  • 四、推荐算法
    • 4.1 达观数据竞赛:3种改进DL算法
  • Reference

一、pyspark.sql部分 1.窗口函数
# 数据的分组聚合,找到每个用户最近的3次收藏beat(用window开窗函数)
from pyspark.sql.window import Window
import pyspark.sql.functions as F


window_out = Window.partitionBy("user_id") \
                   .orderBy(F.desc("collect_time")) 
# user_feed.withColumn("rank", F.rank().over(window_out)).show(truncate = False)
# user_feed.withColumn("rank", F.rank().over(window_out)).show(40)
user_feed_test = user_feed.withColumn("rank", F.row_number().over(window_out)) \
                          .where(F.col('rank') <= 3) 
user_feed_test.show(30)

结果如下,和mysql的窗口函数类似的,以每个user_id分组,然后组内排序,这里我只获取排序后collect_time前3的数据,即最近3次的用户收藏数据:

+--------+-------+------------+--------------------+----+
| user_id|beat_id|collect_type|        collect_time|rank|
+--------+-------+------------+--------------------+----+
|10065188| 827272|           4|2021-08-22 04:54:...|   1|
|10065188| 885812|           5|2020-10-23 18:53:...|   2|
|10068979|1069390|           5|2021-06-20 07:44:...|   1|
|10074915|     -2|           4|2021-11-27 13:42:...|   1|
|10074915|1122682|           4|2021-09-07 14:26:...|   2|
|10075397| 947751|           4|2022-01-30 07:30:...|   1|
|10075397| 336641|           5|2022-01-30 07:23:...|   2|
|10075397| 886179|           4|2022-01-05 10:35:...|   3|
|10104842| 886462|           1|2021-02-28 17:04:...|   1|
|10122654|1531961|           4|2022-03-16 11:09:...|   1|
|10122654| 893655|           4|2022-03-15 04:32:...|   2|
|10122654| 303121|           4|2022-03-14 05:59:...|   3|
|10134095|      0|           3|2021-07-24 13:02:...|   1|
|10134095|1023250|           4|2021-07-22 00:31:...|   2|
|10139927|      0|           5|2020-09-05 19:14:...|   1|
|10139927|      0|           5|2020-09-03 17:51:...|   2|
|10245428| 889915|           5|2020-05-18 14:41:...|   1|
|10245428|1073074|           5|2020-05-18 14:07:...|   2|
+--------+-------+------------+--------------------+----+
2.更换列名:

如现在有个人员信息表,新加上一列coun try字段信息:

# 修改列名
from pyspark.sql.functions import col
# df2 = df1.withColumn("avg_resp_rate", col("sum_def_imps")/col("sum_count")).withColumn("avg_ctr", col("sum_clicks")/col("sum_imps"))
 
# another example
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType,IntegerType


data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]
print(data)

"""
[('James', '', 'Smith', '1991-04-01', 'M', 3000), 
('Michael', 'Rose', '', '2000-05-19', 'M', 4000), 
('Robert', '', 'Williams', '1978-09-05', 'M', 4000), 
('Maria', 'Anne', 'Jones', '1967-12-01', 'F', 4000), 
('Jen', 'Mary', 'Brown', '1980-02-17', 'F', -1)]
"""

先给出对应的字段,创建我们的DataFrame格式,然后通过withColumn新加上一列,其中lit("ABC")是指整列的数据都是对应的ABC字符串:

# schema只需要给出列名即可
columns = ["firstname","middlename","lastname","dob","gender","salary"]

# 增加
df = spark.createDataFrame(data=data, schema = columns)
df.show()

# 增加or修改列
df2 = df.withColumn("salary",col("salary").cast("Integer"))
df2.show()

df3 = df.withColumn("salary",col("salary")*100)
df3.show()

# lit默认均是USA
df5 = df.withColumn("Coun try", lit("ABC"))
df5.show()

结果如下:

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|300000|
|  Michael|      Rose|        |2000-05-19|     M|400000|
|   Robert|          |Williams|1978-09-05|     M|400000|
|    Maria|      Anne|   Jones|1967-12-01|     F|400000|
|      Jen|      Mary|   Brown|1980-02-17|     F|  -100|
+---------+----------+--------+----------+------+------+

+---------+----------+--------+----------+------+------+-------+
|firstname|middlename|lastname|       dob|gender|salary|Country|
+---------+----------+--------+----------+------+------+-------+
|    James|          |   Smith|1991-04-01|     M|  3000|    ABC|
|  Michael|      Rose|        |2000-05-19|     M|  4000|    ABC|
|   Robert|          |Williams|1978-09-05|     M|  4000|    ABC|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|    ABC|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|    ABC|
+---------+----------+--------+----------+------+------+-------+
3.sql将一个字段根据某个字符拆分成多个字段显示

可以通过withColumnsplit进行分隔,参考上次写的【Pyspark基础】sql获取user最近3次使用的item。

更多参考:
https://www.cnblogs.com/360aq/p/13269417.html
https://www.pythonheidong.com/blog/article/690421/aa556949151c244e81f8/

4.pd和spark的dataframe进行转换:
  • 当需要把Spark DataFrame转换成Pandas DataFrame时,可以调用toPandas()
  • 当需要从Pandas DataFrame创建Spark DataFrame时,可以采用createDataFrame(pandas_df)

更多参考:
厦大数据实验室-借助arrow实现pyspark和pandas之间的数据转换:http://dblab.xmu.edu.cn/blog/2752-2/

5.报错ValueError: Some of types cannot be determined after inferring

是因为有字段类型spark识别不了:

(1)可以提高数据采样率:

sqlContext.createDataFrame(rdd, samplingRatio=0.01)

(2)显式声明要创建的 DataFrame 的数据结构schema

from pyspark.sql.types import *
schema = StructType([
    StructField("c1", StringType(), True),
    StructField("c2", IntegerType(), True)
])
df = sqlContext.createDataFrame(rdd, schema=schema)

# 方法二: 使用toDF
from pyspark.sql.types import *
schema = StructType([
    StructField("c1", StringType(), True),
    StructField("c2", IntegerType(), True)
])
df = rdd.toDF(schema=schema)

参考:https://www.codeleading.com/article/64083243294/

6.df按行打乱

每行生成随机数后排序,然后删除这一随机数的列。

import pyspark.sql.functions as F

# 从rdd生成dataframe
schema = StructType(fields)
df_1 = spark.createDataFrame(rdd, schema)

# 乱序: pyspark.sql.functions.rand生成[0.0, 1.0]中double类型的随机数
df_2 = df_1.withColumn('rand', F.rand(seed=42))

# 按随机数排序
df_rnd = df_2.orderBy(df_2.rand)

# 删除随机数的一列
df = df_rnd.drop(df_rnd.rand)
7.表格的联结

Spark DataFrame理解和使用之两个DataFrame的关联 *** 作
SQL数据库语言基础之SqlServer多表连接查询与INNER JOIN内连接查询
SQL的表格之间的join连接方式——inner join/left join/right join/full join语法及其用法实例

8.dataframe的 *** 作 9.createDataFrame的几种方法

(1)注:structtype的格式(官方文档):
可以通过StructType设置schema,关于其使用:

# 读取beat数据
schema = StructType([StructField("beatid", StringType(), True)\
                   ,StructField("name", StringType(), True)\
                   ,StructField("language", StringType(), True)])
beats = spark.read.csv("filepath", header=False, schema=schema)
# print(beats.show())
beats.show()

(2)从pandas dataframe创建spark dataframe

# 从pandas dataframe创建spark dataframe
colors = ['white','green','yellow','red','brown','pink']
color_df=pd.DataFrame(colors,columns=['color'])
color_df['length']=color_df['color'].apply(len)

color_df=spark.createDataFrame(color_df)
color_df.show()

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.types.StructType.html?highlight=structtype#pyspark.sql.types.StructType

10.pd dataframe与spark dataframe转换,通过sql语句间接对pandas的dataframe进行处理
pandasDF_out.createOrReplaceTempView("pd_data")
# %%
spark.sql("select * from pd_data").show()
# %%
res = spark.sql("""select * from pd_data 
                where math>= 90 
                order by english desc""")
res.show()
# %%
output_DF = res.toPandas()
print(type(output_DF))

更多参考:https://blog.csdn.net/weixin_46408961/article/details/120407900

11.filter筛选

可以通过filter进行筛选,如找出category_title00后的对应行:

# 方法一:filter
category_beat.filter(" category_title = '00后' ").head(5)

https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.filter.html

12. 新增或者修改spark.sql中dataframe的某列

官方文档pyspark.sql.DataFrame.withColumn描述:https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html?highlight=withcolumn#pyspark.sql.DataFrame.withColumn

13.将dataframe保存为csv文件

这里的repartition(num)是一个可选项:

# save positive_res_data
file_location = "file:///home/hadoop/development/RecSys/data/"
positive_res_data.repartition(2).write.csv(file_location + 'positive_res_data.csv')
# 保存成功,有多个文件

也可以使用save:
https://wenku.baidu.com/view/1329194ea75177232f60ddccda38376baf1fe078.html

14. 取出对应表项内容

首先初始表category_beat是这样的:

+--------------------------+----------------------------------+
|            category_title|                 collect_set(name)|
+--------------------------+----------------------------------+
|                      00|[白月光与朱砂痣, 致姗姗来迟的你...|
|                04年唱的歌|  [祝我生日快乐, 旅行的意义,...|
|10年前没有iPhone但有这些歌|    [逆光, 改变自己, 达尔文,...|
+--------------------------+----------------------------------+

现在要得到第一行的collect_set表项内容,即对应的集合:

import random

ans = category_beat.where(col("category_title") == '00后').select(col("collect_set(name)"))
# type(ans)
ans.show()

# 取出对应行列里的set,并且转为对应的list
lst = ans.take(1)[0][0]
# take是取出前1行

lst = list(lst)
print(lst, "\n")

print("推荐的歌曲:", lst[random.randint(0, len(lst))])

"""
+----------------------------------+
|                 collect_set(name)|
+----------------------------------+
|[白月光与朱砂痣, 致姗姗来迟的你...|
+----------------------------------+

['白月光与朱砂痣', '致姗姗来迟的你', '陨落', '踏山河', '花.间.酒', '山海', '

推荐的歌曲: 山海
"""
15.agg和groupby结合使用 二、Spark Core模块 2.1 udf函数的传参:

https://blog.csdn.net/yeshang_lady/article/details/121570361

2.2 pandas core dataframe

可以使用rdd的api。

2.3 rdd *** 作

rdd是spark的重点,包括两个算子:
【变换】map、flatMap、groupByKey、reduceByKey等
【动作】collect、count、take、top、first等

2.4 filter *** 作
rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
rdd = rdd.filter(lambda x: x % 2 == 0)
rdd.collect()
# [2, 4, 6]
2.5 flatMap

对rdd中每个元素按照函数 *** 作,并将结果进行扁平化处理。

rdd = sc.parallelize([3, 4, 5])
fm = rdd.flatMap(lambda x: range(1, x))
fm.collect()
# [1, 2, 1, 2, 3, 1, 2, 3, 4]
三、MLlib模块

MLlib (DataFrame-based)
MLlib (RDD-based)

3.1 kmeans聚类分析

api的使用本身不难,和sklearn的使用差不多:

from pyspark.ml.clustering import KMeans
kMeans = KMeans(k=25, seed=1)

model = kMeans.fit(kmeans_data.select('features'))
model.transform(kmeans_data).show(1000)

# 分析模型训练的结果

# 训练得到的模型是否有summary
print(model.hasSummary, "\n")

# 获得聚类中心
print(model.clusterCenters(), "\n")

# 每个簇的大小(含有的数据点数)
print(model.summary.clusterSizes)
3.2 gbdt分类和回归 四、推荐算法 4.1 达观数据竞赛:3种改进DL算法

http://www.360doc.com/content/17/0615/21/16619343_663476259.shtml

【安装pyspark】
在Ubuntu上安装pyspark:https://zhuanlan.zhihu.com/p/34635519
apache官网上的安装包:https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz

Reference

[1] https://spark.apache.org/docs/latest/api/python/reference/index.html
[2] 算法工程师的数据分析:https://zhuanlan.zhihu.com/p/343375787
[3] 用createDataFrame创建表的几种方法
[4] spark dataframe按行随机打乱
[5] dataframe的常用 *** 作:
0)https://blog.csdn.net/xc_zhou/article/details/118617642
1)https://blog.csdn.net/yeshang_lady/article/details/89528090
2)https://www.jianshu.com/p/acd96549ee15
3)https://www.dandelioncloud.cn/article/details/1441272697576869890

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1294801.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-10
下一篇 2022-06-10

发表评论

登录后才能评论

评论列表(0条)

保存