pyspark常用语法

pyspark常用语法,第1张

pyspark常用语法 想了解更多,欢迎移步"文渊小站"

里面有更多知识分享,以及一些有意思的小项目~

环境

spark 2.4.0

df查看信息(常用)
df.schema     # df表结构
df.columns    # df各字段
df.dtypes    # df各字段数据类型
df.first()   # df的第一行数据
df.head()    # df的第一行数据       # df.head(5)    # df的前五行数据
df.show()    # 展示 df 的部分行数据   # df.show(5)    # 展示 df 的前五行数据
display(df)    # 显示df的各字段数据类型     # display(df.toPandas())  # pandas样式展示df的部分行数据
df.count()   # 统计df的总数据量
选取df部分行作为新的df
#df = df.withColumn('index', F.monotonically_increasing_id())   # 不重复的自增列'index'  # 但会出现不连续的情况,故不实用

# 用于添加自增列
def flat(l):
    for k in l:
        if not isinstance(k, (list, tuple)):
            yield k
        else:
            yield from flat(k)
          
# 新增一列从0开始连续自增的'index'列
schema = df.schema.add(StructField('index', LongType()))
rdd = df.rdd.zipWithIndex()
rdd = rdd.map(lambda x: list(flat(x)))
df = spark.createDataframe(rdd, schema)

df = df.where(F.col('index').between(0, 49999))   # 选择 index 为 0-49999 共5万条数据
df = df.drop(df.index)  # 删除'index'列
空值填充
# 步骤1:NaN替换为Null【然后再fillna空值填充,才不会出问题(fillna不会对NaN进行填充)】
# 方式1
df = df.replace(float('nan'), None)
# 方式2
import pyspark.sql.functions as F
columns = df.columns
for column in columns:
    df = df.withColumn(column, F.when(F.isnan(F.col(column)) | F.col(column).isNull(), None).otherwise(F.col(column)))
    

# 步骤2 fillna空值填充
# 给 'a','b','c' 3列的空值填充为 '0'   
# 注意:'a','b','c' 3列 需要是 string类型,不然填充无效(如果 'a','b','c' 3列 为 int类型,则填充 0)
df_train = df.fillna('0', ['a', 'b', 'c'])
删除指定列
# 删除1列
df = df.drop('col1')
df = df.drop(df.col1)

# 删除多列
cols_drop = ['a', 'b', 'c']
df = df.drop(*cols_drop)
选择指定列
df_1 = df.select('a', 'b', 'c')   # 或者写成:df_1 = df.select(['a', 'b', 'c'])
df某列重命名
df = df.withColumnRenamed('a', 'a_new')
df新增一列
df = df.withColumn('a', F.lit(1))  # 给df添加'a'列,值为1
df.where条件
# where(sql语句)
df_1 = df.where("col1 not in {}".format([1,2,3,4]))    # 选择df中'col1'列值不在 [1,2,3,4] 的所有行 作为 df_1   
整行去重
df = df.distinct()
将df的某列转成list
# 方式1
import pandas as pd
df_col1 = pd.Dataframe(df.select('a').toPandas(),columns=['a'])    # df 为pyspark的df, ‘a’为指定列名
col1_list = list(df_col1['a'])
# 方式2
col1_list = [x['a'] for x in df.select('a').collect()]   # df.collect() 的结果是 list(pyspark.sql.types.Row)类型数据
对某列进行聚合成list
import pyspark.sql.functions as F
df = df.groupBy('a').agg(F.collect_list('b').alias('b_list'))
df = df.groupBy(['a','c']).agg(F.collect_list('b').alias('b_list'))
对list类型列变成每个元素对应一行
import pyspark.sql.functions as F
df = df.select('a', 'b', 'c', F.explode('d_list').alias('d'))
将df映射成表写sql
df_test.createOrReplaceTempView('df_test')
df_new = spark.sql("select a,b from df_test")   # 从df_test中选择a,b两列作为df_new
想了解更多,欢迎移步"文渊小站"

里面有更多知识分享,以及一些有意思的小项目~

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5688638.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存