pyspark常用语法

pyspark常用语法,第1张

pyspark常用语法 想了解更多,欢迎移步"文渊小站"

里面有更多知识分享,以及一些有意思的小项目~

环境

spark 2.4.0

udf 库自带函数
import pyspark.sql.functions as F

## F.when(if条件,if结果).otherwise(else结果)
df = df.withColumn('a', F.when(F.col('a').isin([1,2,3]) | F.isnan(F.col('a')) | F.col('a').isNull(), F.col('a')).otherwise(4))
## F.lit,一般用于添加常数列
df = df.withColumn('a', F.lit(1))   # df = df.withColumn('a', F.lit('ABC'))    # df = df.withColumn('a', F.lit([1,2,3]))
## F.max(字段名).alias(字段重命名)
df_1 = df.groupBy('a').agg(F.max(F.col('b')).alias('b_max'))
## F.desc() 降序, F.asc() 升序
df_1 = df.sort(F.desc('a'))
df_2 = df.sort(F.desc('a'), F.asc('b'))   # 'a'降序之后,再'b'升序
# F.rand() 生成[0.0, 1.0]中double类型的随机数
df = df.withColumn('rand', F.rand(seed=45))
# F.col('a').cast(新数据类型)  将某列数据类型转成新的数据类型
df = df.withColumn('a', F.col('a').cast(IntegerType()))

# F.row_number().over(window)  
# 对'a'列分组,分组里面根据 'b'列 降序排列,生成对应排列顺序的新列 'topn'(每个分组里面从1开始递增)
from pyspark.sql.window import Window
window = Window.partitionBy('a').orderBy(F.col('b').desc())
df4 = df3.withColumn('topn', F.row_number().over(window))

# F.collect_list(), F.struct(), F.sort_array
# df 根据 'a' 分组,Row(b,c) 组成list, 生成 df_2; df_2 中的 b_c列 再根据 b升序 后,取 c 组成list,最后生成 df_3
df_2 = df.groupBy('a').agg(F.collect_list(F.struct('b', 'c')).alias('b_c'))   # b_c 数据格式: [Row(b=1, c=2671635), ...]
df_3 = df_2.select('a', F.sort_array('b_c')['c'].alias('c_list'))    # F.sort_array('b_c')['c']   根据 'b_c' 中的第一个字段('b') 升序排列,然后取 'c'字段
当自定义函数 的 参数为 1个字段时(即1列)
# 截取x字符串的部分子字符串
def f_split(x):
    return x[3:5]

udf_split = F.udf(f_split, StringType())    # 当参数为1列时,可以直接写函数名而无需lambda函数  # 或写成:udf_split = F.udf(lambda x: f_split(x), StringType())
df = df.withColumn('a_sub', udf_split('a'))
当自定义函数 的 参数为 多个字段时(即多列)
# 截取x字符串的部分子字符串
def f_sum(x,y):
    return x+y

udf_sum = F.udf(lambda x,y: f_sum(x,y), IntegerType())    # 当参数为 多列时
df = df.withColumn('a+b', udf_sum('a', 'b'))
当自定义函数 的 参数为 多个字段+常量参数 时
## (方式1)
def f_cal(a, b, val_1):
    return (a+b)*val_1

def udf_cal(val_1):
    return F.udf(lambda x, y: f_cal(x, y, val_1), IntegerType())

val_1 = 2
# df 的 'a', 'b' 两列经过自定义函数 udf_getType 生成新列 c   
df = df.withColumn('c', udf_cal(val_1)('a', 'b'))    
## (方式2)
def f_cal(a, b, val_1):
    return (a+b)*val_1

udf_cal = F.udf(lambda x, y, z: f_cal(x, y, z), IntegerType())
val_1 = 2
# df 的 'a', 'b' 两列经过自定义函数 udf_getType 生成新列 c   
df = df.withColumn('c', udf_cal('a', 'b', F.lit(val_1)))
df.rdd.map使用udf
# 方式1
from pyspark.sql.types import Row

def f_row(x, val_1):
    row = {}
    row['a_new'], row['b_new'] = (x[0]+x[3])*val_1, (x[1]-x[2])/val_1   # a_new = (a + d) * val_1 ,   b_new = (b - c) / val_1
    return row

# 假设df有字段:a,b,c,d 四列,且依次排列   # 通过自定义函数,生成新的df_1:a_new, b_new (注意:df_1字段的顺序不确定)
df_1 = df.rdd.map(lambda x: Row(**f_row(x, 100))).toDF()   
# 方式2
def f_row(x, val_1):
    row = ((x[0]+x[3])*val_1, (x[1]-x[2])/val_1)   # a_new = (a + d) * val_1 ,   b_new = (b - c) / val_1
    return row

# 假设df有字段:a,b,c,d 四列,且依次排列   # 通过自定义函数,生成新的df_1:a_new, b_new (注意:df_1字段的顺序不确定)
df_1 = df.rdd.map(lambda x: f_row(x, 100)).toDF(['a_new', 'b_new'])   
df.foreach / df.foreachPartition 使用udf
# df.foreach
def f_row(row):
    c = row['a'] + row['b']
    print(c)

df.foreach(f_row)
# df.foreach
def f_row(row, val_1):
    c = row['a'] + row['b'] + val_1
    print(c)

df.foreach(lambda row: f_row(row, 100))
# df.foreachPartition
def f_p(p, val_1):
    for row in p:
        c = row['a'] + row['b'] + val_1
        print(c)

df.foreachPartition(lambda p: f_p(p, 100))
想了解更多,欢迎移步"文渊小站"

里面有更多知识分享,以及一些有意思的小项目~

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5688680.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存