里面有更多知识分享,以及一些有意思的小项目~
环境spark 2.4.0
udf 库自带函数import pyspark.sql.functions as F ## F.when(if条件,if结果).otherwise(else结果) df = df.withColumn('a', F.when(F.col('a').isin([1,2,3]) | F.isnan(F.col('a')) | F.col('a').isNull(), F.col('a')).otherwise(4)) ## F.lit,一般用于添加常数列 df = df.withColumn('a', F.lit(1)) # df = df.withColumn('a', F.lit('ABC')) # df = df.withColumn('a', F.lit([1,2,3])) ## F.max(字段名).alias(字段重命名) df_1 = df.groupBy('a').agg(F.max(F.col('b')).alias('b_max')) ## F.desc() 降序, F.asc() 升序 df_1 = df.sort(F.desc('a')) df_2 = df.sort(F.desc('a'), F.asc('b')) # 'a'降序之后,再'b'升序 # F.rand() 生成[0.0, 1.0]中double类型的随机数 df = df.withColumn('rand', F.rand(seed=45)) # F.col('a').cast(新数据类型) 将某列数据类型转成新的数据类型 df = df.withColumn('a', F.col('a').cast(IntegerType())) # F.row_number().over(window) # 对'a'列分组,分组里面根据 'b'列 降序排列,生成对应排列顺序的新列 'topn'(每个分组里面从1开始递增) from pyspark.sql.window import Window window = Window.partitionBy('a').orderBy(F.col('b').desc()) df4 = df3.withColumn('topn', F.row_number().over(window)) # F.collect_list(), F.struct(), F.sort_array # df 根据 'a' 分组,Row(b,c) 组成list, 生成 df_2; df_2 中的 b_c列 再根据 b升序 后,取 c 组成list,最后生成 df_3 df_2 = df.groupBy('a').agg(F.collect_list(F.struct('b', 'c')).alias('b_c')) # b_c 数据格式: [Row(b=1, c=2671635), ...] df_3 = df_2.select('a', F.sort_array('b_c')['c'].alias('c_list')) # F.sort_array('b_c')['c'] 根据 'b_c' 中的第一个字段('b') 升序排列,然后取 'c'字段当自定义函数 的 参数为 1个字段时(即1列)
# 截取x字符串的部分子字符串 def f_split(x): return x[3:5] udf_split = F.udf(f_split, StringType()) # 当参数为1列时,可以直接写函数名而无需lambda函数 # 或写成:udf_split = F.udf(lambda x: f_split(x), StringType()) df = df.withColumn('a_sub', udf_split('a'))当自定义函数 的 参数为 多个字段时(即多列)
# 截取x字符串的部分子字符串 def f_sum(x,y): return x+y udf_sum = F.udf(lambda x,y: f_sum(x,y), IntegerType()) # 当参数为 多列时 df = df.withColumn('a+b', udf_sum('a', 'b'))当自定义函数 的 参数为 多个字段+常量参数 时
## (方式1) def f_cal(a, b, val_1): return (a+b)*val_1 def udf_cal(val_1): return F.udf(lambda x, y: f_cal(x, y, val_1), IntegerType()) val_1 = 2 # df 的 'a', 'b' 两列经过自定义函数 udf_getType 生成新列 c df = df.withColumn('c', udf_cal(val_1)('a', 'b'))
## (方式2) def f_cal(a, b, val_1): return (a+b)*val_1 udf_cal = F.udf(lambda x, y, z: f_cal(x, y, z), IntegerType()) val_1 = 2 # df 的 'a', 'b' 两列经过自定义函数 udf_getType 生成新列 c df = df.withColumn('c', udf_cal('a', 'b', F.lit(val_1)))df.rdd.map使用udf
# 方式1 from pyspark.sql.types import Row def f_row(x, val_1): row = {} row['a_new'], row['b_new'] = (x[0]+x[3])*val_1, (x[1]-x[2])/val_1 # a_new = (a + d) * val_1 , b_new = (b - c) / val_1 return row # 假设df有字段:a,b,c,d 四列,且依次排列 # 通过自定义函数,生成新的df_1:a_new, b_new (注意:df_1字段的顺序不确定) df_1 = df.rdd.map(lambda x: Row(**f_row(x, 100))).toDF()
# 方式2 def f_row(x, val_1): row = ((x[0]+x[3])*val_1, (x[1]-x[2])/val_1) # a_new = (a + d) * val_1 , b_new = (b - c) / val_1 return row # 假设df有字段:a,b,c,d 四列,且依次排列 # 通过自定义函数,生成新的df_1:a_new, b_new (注意:df_1字段的顺序不确定) df_1 = df.rdd.map(lambda x: f_row(x, 100)).toDF(['a_new', 'b_new'])df.foreach / df.foreachPartition 使用udf
# df.foreach def f_row(row): c = row['a'] + row['b'] print(c) df.foreach(f_row)
# df.foreach def f_row(row, val_1): c = row['a'] + row['b'] + val_1 print(c) df.foreach(lambda row: f_row(row, 100))
# df.foreachPartition def f_p(p, val_1): for row in p: c = row['a'] + row['b'] + val_1 print(c) df.foreachPartition(lambda p: f_p(p, 100))想了解更多,欢迎移步"文渊小站"
里面有更多知识分享,以及一些有意思的小项目~
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)