- I. pyspark.sql
- 1. Window functions
- 2. Renaming / adding columns:
- 3. Splitting one field into several fields on a delimiter in SQL
- 4. Converting between pandas and Spark DataFrames:
- 5. Error: ValueError: Some of types cannot be determined after inferring
- 6. Shuffling a DataFrame by row
- 7. Joining tables
- 8. DataFrame operations
- 9. Several ways to call createDataFrame
- 10. Converting between pandas and Spark DataFrames, and processing a pandas DataFrame indirectly through SQL
- 11. Filtering with filter
- 12. Adding or modifying a column of a spark.sql DataFrame
- 13. Saving a DataFrame as a CSV file
- 14. Extracting the contents of a specific cell
- 15. Using agg together with groupBy
- II. Spark Core module
- 2.1 Passing parameters to UDFs:
- 2.2 pandas core dataframe
- 2.3 RDD operations
- 2.4 The filter operation
- 2.5 flatMap
- III. MLlib module
- 3.1 K-means clustering
- 3.2 GBDT classification and regression
- IV. Recommendation algorithms
- 4.1 DataGrand data competition: 3 improved DL algorithms
- Reference
1. Window functions
# group and aggregate the data: find each user's 3 most recent collected beats (using a window function)
from pyspark.sql.window import Window
import pyspark.sql.functions as F
window_out = Window.partitionBy("user_id") \
.orderBy(F.desc("collect_time"))
# user_feed.withColumn("rank", F.rank().over(window_out)).show(truncate = False)
# user_feed.withColumn("rank", F.rank().over(window_out)).show(40)
user_feed_test = user_feed.withColumn("rank", F.row_number().over(window_out)) \
.where(F.col('rank') <= 3)
user_feed_test.show(30)
The result is shown below. It works like MySQL's window functions: rows are partitioned by user_id and sorted within each partition; here I only keep the top 3 rows by collect_time, i.e. each user's 3 most recent collects:
+--------+-------+------------+--------------------+----+
| user_id|beat_id|collect_type| collect_time|rank|
+--------+-------+------------+--------------------+----+
|10065188| 827272| 4|2021-08-22 04:54:...| 1|
|10065188| 885812| 5|2020-10-23 18:53:...| 2|
|10068979|1069390| 5|2021-06-20 07:44:...| 1|
|10074915| -2| 4|2021-11-27 13:42:...| 1|
|10074915|1122682| 4|2021-09-07 14:26:...| 2|
|10075397| 947751| 4|2022-01-30 07:30:...| 1|
|10075397| 336641| 5|2022-01-30 07:23:...| 2|
|10075397| 886179| 4|2022-01-05 10:35:...| 3|
|10104842| 886462| 1|2021-02-28 17:04:...| 1|
|10122654|1531961| 4|2022-03-16 11:09:...| 1|
|10122654| 893655| 4|2022-03-15 04:32:...| 2|
|10122654| 303121| 4|2022-03-14 05:59:...| 3|
|10134095| 0| 3|2021-07-24 13:02:...| 1|
|10134095|1023250| 4|2021-07-22 00:31:...| 2|
|10139927| 0| 5|2020-09-05 19:14:...| 1|
|10139927| 0| 5|2020-09-03 17:51:...| 2|
|10245428| 889915| 5|2020-05-18 14:41:...| 1|
|10245428|1073074| 5|2020-05-18 14:07:...| 2|
+--------+-------+------------+--------------------+----+
2. Renaming / adding columns:
For example, suppose we have a table of personal information and want to add a new country column:
# modify column names
from pyspark.sql.functions import col
# df2 = df1.withColumn("avg_resp_rate", col("sum_def_imps")/col("sum_count")).withColumn("avg_ctr", col("sum_clicks")/col("sum_imps"))
# another example
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType,IntegerType
data = [('James','','Smith','1991-04-01','M',3000),
('Michael','Rose','','2000-05-19','M',4000),
('Robert','','Williams','1978-09-05','M',4000),
('Maria','Anne','Jones','1967-12-01','F',4000),
('Jen','Mary','Brown','1980-02-17','F',-1)
]
print(data)
"""
[('James', '', 'Smith', '1991-04-01', 'M', 3000),
('Michael', 'Rose', '', '2000-05-19', 'M', 4000),
('Robert', '', 'Williams', '1978-09-05', 'M', 4000),
('Maria', 'Anne', 'Jones', '1967-12-01', 'F', 4000),
('Jen', 'Mary', 'Brown', '1980-02-17', 'F', -1)]
"""
First give the column names and create the DataFrame, then add a new column with withColumn, where lit("ABC") fills the whole column with the string ABC:
# for the schema it is enough to give just the column names
columns = ["firstname","middlename","lastname","dob","gender","salary"]
# create the DataFrame
df = spark.createDataFrame(data=data, schema = columns)
df.show()
# add or modify a column
df2 = df.withColumn("salary",col("salary").cast("Integer"))
df2.show()
df3 = df.withColumn("salary",col("salary")*100)
df3.show()
# lit fills the whole column with the same literal value
df5 = df.withColumn("Country", lit("ABC"))
df5.show()
The results:
+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname| dob|gender|salary|
+---------+----------+--------+----------+------+------+
| James| | Smith|1991-04-01| M| 3000|
| Michael| Rose| |2000-05-19| M| 4000|
| Robert| |Williams|1978-09-05| M| 4000|
| Maria| Anne| Jones|1967-12-01| F| 4000|
| Jen| Mary| Brown|1980-02-17| F| -1|
+---------+----------+--------+----------+------+------+
+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname| dob|gender|salary|
+---------+----------+--------+----------+------+------+
| James| | Smith|1991-04-01| M| 3000|
| Michael| Rose| |2000-05-19| M| 4000|
| Robert| |Williams|1978-09-05| M| 4000|
| Maria| Anne| Jones|1967-12-01| F| 4000|
| Jen| Mary| Brown|1980-02-17| F| -1|
+---------+----------+--------+----------+------+------+
+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname| dob|gender|salary|
+---------+----------+--------+----------+------+------+
| James| | Smith|1991-04-01| M|300000|
| Michael| Rose| |2000-05-19| M|400000|
| Robert| |Williams|1978-09-05| M|400000|
| Maria| Anne| Jones|1967-12-01| F|400000|
| Jen| Mary| Brown|1980-02-17| F| -100|
+---------+----------+--------+----------+------+------+
+---------+----------+--------+----------+------+------+-------+
|firstname|middlename|lastname| dob|gender|salary|Country|
+---------+----------+--------+----------+------+------+-------+
| James| | Smith|1991-04-01| M| 3000| ABC|
| Michael| Rose| |2000-05-19| M| 4000| ABC|
| Robert| |Williams|1978-09-05| M| 4000| ABC|
| Maria| Anne| Jones|1967-12-01| F| 4000| ABC|
| Jen| Mary| Brown|1980-02-17| F| -1| ABC|
+---------+----------+--------+----------+------+------+-------+
3. Splitting one field into several fields on a delimiter in SQL
This can be done with withColumn plus split; see the earlier post 【Pyspark基础】sql获取user最近3次使用的item, and the hedged sketch below.
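A minimal sketch, assuming a DataFrame df with a string column full_name whose parts are joined by "_" (the column names are made up for the example):
from pyspark.sql import functions as F

# split the column on "_" and expose the pieces as separate columns
df_split = df.withColumn("first", F.split(F.col("full_name"), "_").getItem(0)) \
             .withColumn("last", F.split(F.col("full_name"), "_").getItem(1))
df_split.show()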
More references:
https://www.cnblogs.com/360aq/p/13269417.html
https://www.pythonheidong.com/blog/article/690421/aa556949151c244e81f8/
4. Converting between pandas and Spark DataFrames:
- To turn a Spark DataFrame into a pandas DataFrame, call toPandas();
- To create a Spark DataFrame from a pandas DataFrame, use createDataFrame(pandas_df).
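A minimal round-trip sketch (assumes an active SparkSession named spark; toPandas() collects everything to the driver, so keep the data small):
import pandas as pd

pandas_df = pd.DataFrame({"id": [1, 2, 3], "score": [90, 85, 77]})
spark_df = spark.createDataFrame(pandas_df)   # pandas -> Spark
spark_df.show()
back_to_pandas = spark_df.toPandas()          # Spark -> pandas
print(type(back_to_pandas))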
More references:
XMU Database Lab - using Arrow to convert between pyspark and pandas: http://dblab.xmu.edu.cn/blog/2752-2/
5. Error: ValueError: Some of types cannot be determined after inferring
This happens because some field has a type Spark cannot infer. Two workarounds:
(1) Raise the sampling ratio used for schema inference:
sqlContext.createDataFrame(rdd, samplingRatio=0.01)
(2) Explicitly declare the schema of the DataFrame to be created:
# Method 1: pass an explicit schema to createDataFrame
from pyspark.sql.types import *
schema = StructType([
StructField("c1", StringType(), True),
StructField("c2", IntegerType(), True)
])
df = sqlContext.createDataFrame(rdd, schema=schema)
# Method 2: use toDF
from pyspark.sql.types import *
schema = StructType([
StructField("c1", StringType(), True),
StructField("c2", IntegerType(), True)
])
df = rdd.toDF(schema=schema)
Reference: https://www.codeleading.com/article/64083243294/
6. Shuffling a DataFrame by row
Generate a random number for every row, sort by it, then drop the random-number column.
import pyspark.sql.functions as F
from pyspark.sql.types import StructType
# build a DataFrame from an RDD (fields and rdd are assumed to exist already)
schema = StructType(fields)
df_1 = spark.createDataFrame(rdd, schema)
# shuffle: pyspark.sql.functions.rand generates a double uniformly distributed in [0.0, 1.0)
df_2 = df_1.withColumn('rand', F.rand(seed=42))
# sort by the random number
df_rnd = df_2.orderBy(df_2.rand)
# drop the random-number column
df = df_rnd.drop(df_rnd.rand)
7. Joining tables
Related reading:
- Spark DataFrame理解和使用之两个DataFrame的关联操作
- SQL数据库语言基础之SqlServer多表连接查询与INNER JOIN内连接查询
- SQL的表格之间的join连接方式——inner join/left join/right join/full join语法及其用法实例
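A minimal join sketch with two made-up DataFrames, df_user(user_id, age) and df_feed(user_id, beat_id):
df_user = spark.createDataFrame([(1, 25), (2, 31), (3, 19)], ["user_id", "age"])
df_feed = spark.createDataFrame([(1, 100), (1, 101), (3, 102)], ["user_id", "beat_id"])
inner_joined = df_user.join(df_feed, on="user_id", how="inner")   # only matching user_ids kept
left_joined = df_user.join(df_feed, on="user_id", how="left")     # every row of df_user kept
inner_joined.show()
left_joined.show()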
9. Several ways to call createDataFrame
(1) Note the StructType format (official docs linked below the examples). The schema can be set with StructType; usage:
# read the beat data
schema = StructType([StructField("beatid", StringType(), True)\
,StructField("name", StringType(), True)\
,StructField("language", StringType(), True)])
beats = spark.read.csv("filepath", header=False, schema=schema)
# print(beats.show())
beats.show()
(2) Create a Spark DataFrame from a pandas DataFrame
import pandas as pd
# create a Spark DataFrame from a pandas DataFrame
colors = ['white','green','yellow','red','brown','pink']
color_df=pd.DataFrame(colors,columns=['color'])
color_df['length']=color_df['color'].apply(len)
color_df=spark.createDataFrame(color_df)
color_df.show()
StructType docs: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.types.StructType.html?highlight=structtype#pyspark.sql.types.StructType
10. Converting between pandas and Spark DataFrames, and processing a pandas DataFrame indirectly through SQL
# pandasDF_out is assumed to be a Spark DataFrame created from a pandas DataFrame; register it as a temp view
pandasDF_out.createOrReplaceTempView("pd_data")
# %%
spark.sql("select * from pd_data").show()
# %%
res = spark.sql("""select * from pd_data
where math>= 90
order by english desc""")
res.show()
# %%
output_DF = res.toPandas()
print(type(output_DF))
More references: https://blog.csdn.net/weixin_46408961/article/details/120407900
11. Filtering with filter
Rows can be selected with filter, e.g. find the rows whose category_title is 00后:
# Method 1: filter
category_beat.filter(" category_title = '00后' ").head(5)
https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.filter.html
12. Adding or modifying a column of a spark.sql DataFrame
Official documentation for pyspark.sql.DataFrame.withColumn: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html?highlight=withcolumn#pyspark.sql.DataFrame.withColumn
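A hedged sketch reusing the df built in section 2 (the derived column names are made up for the example): add a new column from an existing one, and conditionally rewrite a column.
from pyspark.sql import functions as F

df_new = df.withColumn("salary_k", F.col("salary") / 1000) \
           .withColumn("gender", F.when(F.col("gender") == "M", "male").otherwise("female"))
df_new.show()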
13. Saving a DataFrame as a CSV file
Here repartition(num) is optional:
# save positive_res_data
file_location = "file:///home/hadoop/development/RecSys/data/"
positive_res_data.repartition(2).write.csv(file_location + 'positive_res_data.csv')
# saved successfully; the output directory contains several part files
You can also use save:
https://wenku.baidu.com/view/1329194ea75177232f60ddccda38376baf1fe078.html
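A hedged sketch of the save route, reusing file_location from above (the output name is made up; mode("overwrite") replaces any existing output):
positive_res_data.write \
    .format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save(file_location + "positive_res_data_v2.csv")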
14. Extracting the contents of a specific cell
The initial table category_beat looks like this:
+--------------------------+----------------------------------+
| category_title| collect_set(name)|
+--------------------------+----------------------------------+
| 00后|[白月光与朱砂痣, 致姗姗来迟的你...|
| 04年唱的歌| [祝我生日快乐, 旅行的意义, 柠...|
|10年前没有iPhone但有这些歌| [逆光, 改变自己, 达尔文, 最...|
+--------------------------+----------------------------------+
Now we want the collect_set(name) cell of the first row, i.e. the corresponding set:
import random
ans = category_beat.where(col("category_title") == '00后').select(col("collect_set(name)"))
# type(ans)
ans.show()
# take the set from that row/column and convert it to a list
lst = ans.take(1)[0][0]
# take(1) returns the first row
lst = list(lst)
print(lst, "\n")
print("Recommended song:", lst[random.randint(0, len(lst) - 1)])
"""
+----------------------------------+
| collect_set(name)|
+----------------------------------+
|[白月光与朱砂痣, 致姗姗来迟的你...|
+----------------------------------+
['白月光与朱砂痣', '致姗姗来迟的你', '陨落', '踏山河', '花.间.酒', '山海', '
Recommended song: 山海
"""
15. Using agg together with groupBy
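A minimal hedged sketch reusing the user_feed table from section 1: count each user's collects and take the most recent collect_time.
import pyspark.sql.functions as F

user_stats = user_feed.groupBy("user_id").agg(
    F.count("beat_id").alias("collect_cnt"),
    F.max("collect_time").alias("last_collect_time"),
)
user_stats.show()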
II. Spark Core module
2.1 Passing parameters to UDFs:
https://blog.csdn.net/yeshang_lady/article/details/121570361
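A minimal sketch of one common way to pass an extra parameter to a UDF, by closing over it when the udf is built; the bonus amount and column name are hypothetical, and df is the one from section 2.
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

def add_bonus(bonus):
    # the extra parameter is captured by the closure around the udf
    return F.udf(lambda salary: salary + bonus, IntegerType())

df.withColumn("salary_with_bonus", add_bonus(500)(F.col("salary"))).show()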
2.2 pandas core dataframe
The RDD API can be used here as well.
2.3 RDD operations
RDDs are the heart of Spark. Their operators come in two kinds (a quick sketch follows the list):
[Transformations] map, flatMap, groupByKey, reduceByKey, etc.
[Actions] collect, count, take, top, first, etc.
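A hedged word-count style sketch showing one transformation and two actions (assumes an active SparkContext sc):
pairs = sc.parallelize(["a", "b", "a", "c", "b", "a"]).map(lambda w: (w, 1))
counts = pairs.reduceByKey(lambda x, y: x + y)   # transformation: merge values per key
print(counts.collect())   # action, e.g. [('a', 3), ('b', 2), ('c', 1)] (order not guaranteed)
print(counts.count())     # action: 3 distinct keys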
2.4 The filter operation
rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
rdd = rdd.filter(lambda x: x % 2 == 0)
rdd.collect()
# [2, 4, 6]
2.5 flatMap
Apply a function to every element of the RDD and flatten the results.
rdd = sc.parallelize([3, 4, 5])
fm = rdd.flatMap(lambda x: range(1, x))
fm.collect()
# [1, 2, 1, 2, 3, 1, 2, 3, 4]
III. MLlib module
MLlib comes in two flavors:
- MLlib (DataFrame-based)
- MLlib (RDD-based)
3.1 K-means clustering
The API itself is not hard to use and feels a lot like sklearn:
from pyspark.ml.clustering import KMeans
# kmeans_data is assumed to be a DataFrame with a "features" vector column
kMeans = KMeans(k=25, seed=1)
model = kMeans.fit(kmeans_data.select('features'))
model.transform(kmeans_data).show(1000)
# inspect the training result
# does the trained model carry a summary?
print(model.hasSummary, "\n")
# cluster centers
print(model.clusterCenters(), "\n")
# size of each cluster (number of data points it contains)
print(model.summary.clusterSizes)
3.2 GBDT classification and regression
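A minimal hedged sketch of GBDT with the DataFrame-based API; the toy training data is made up just so the example runs end to end.
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.regression import GBTRegressor

# tiny toy training set: a "features" vector column and a binary "label" column
train_df = spark.createDataFrame(
    [(Vectors.dense([0.0, 1.0]), 0.0),
     (Vectors.dense([1.0, 0.0]), 1.0),
     (Vectors.dense([0.5, 0.5]), 1.0)],
    ["features", "label"])

# GBDT classifier
gbt_cls = GBTClassifier(featuresCol="features", labelCol="label", maxIter=10)
cls_model = gbt_cls.fit(train_df)
cls_model.transform(train_df).select("label", "prediction").show()

# GBDT regressor with the same column convention
gbt_reg = GBTRegressor(featuresCol="features", labelCol="label", maxIter=10)
reg_model = gbt_reg.fit(train_df)
print(reg_model.featureImportances)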
IV. Recommendation algorithms
4.1 DataGrand data competition: 3 improved DL algorithms
http://www.360doc.com/content/17/0615/21/16619343_663476259.shtml
[Installing pyspark]
Installing pyspark on Ubuntu: https://zhuanlan.zhihu.com/p/34635519
Official Apache download: https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
[1] https://spark.apache.org/docs/latest/api/python/reference/index.html
[2] Data analysis for algorithm engineers: https://zhuanlan.zhihu.com/p/343375787
[3] Several ways to create a table with createDataFrame
[4] Randomly shuffling a Spark DataFrame by row
[5] Common DataFrame operations:
0)https://blog.csdn.net/xc_zhou/article/details/118617642
1)https://blog.csdn.net/yeshang_lady/article/details/89528090
2)https://www.jianshu.com/p/acd96549ee15
3)https://www.dandelioncloud.cn/article/details/1441272697576869890