在PySpark中,你用python语法建立一个函数,然后用PySpark SQL中的udf()方法在dataframe中使用,或将其注册成udf并在sql中使用。
例1 通过select()使用UDFfrom pyspark.sql.functions import col, udf from pyspark.sql.types import StringType def convertCase(str): resStr="" arr = str.split(" ") for x in arr: resStr= resStr + x[0:1].upper() + x[1:len(x)] + " " return resStr """ Converting function to UDF """ convertUDF = udf(lambda z: convertCase(z),StringType()) """ Converting function to UDF StringType() is by default hence not required """ convertUDF = udf(lambda z: convertCase(z)) df.select(col("Seqno"),convertUDF(col("Name")).alias("Name")).show()例2 通过withColumn()使用UDF
from pyspark.sql.functions import col, udf from pyspark.sql.types import StringType def upperCase(str): return str.upper() upperCaseUDF = udf(lambda z:upperCase(z),StringType()) df.withColumn("Cureated Name", upperCaseUDF(col("Name"))).show()例3 注册UDF并在sql中使用
def convertCase(str): resStr="" arr = str.split(" ") for x in arr: resStr= resStr + x[0:1].upper() + x[1:len(x)] + " " return resStr """ Using UDF on SQL """ spark.udf.register("convertUDF", convertCase,StringType()) df.createOrReplaceTempView("NAME_TABLE") spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") .show(truncate=False)例4 通过注释创建UDF
@udf(returnType=StringType()) # 或者写@udf def upperCase(str): return str.upper() df.withColumn("Cureated Name", upperCase(col("Name"))).show()特殊处理
pyspark/spark并不能保证子句按从左到右或其他固定的顺序执行。pyspark会将执行根据查询优化与规划,所以and, or, where, having表述会有副作用
""" No guarantee Name is not null will execute first If convertUDF(Name) like '%John%' execute first then you will get runtime error """ spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE " + "where Name is not null and convertUDF(Name) like '%John%'") .show(truncate=False)
# 如果某些记录的name值为空,就会有问题 spark.sql("select convertUDF(Name) from NAME_TABLE2") .show(truncate=False)
tips:
- 最好在UDF函数内部检验null而不是在外部
- 如果在UDF内部不能检验null,那至少使用if或者case when来在使用UDF前检验null情况
例1,2,4用的是api方法(df.select(”col1”)之类),例3用的是sql方法(spark.sql(”select * from tbl1”)之类)
api方法定义udf的格式为
# 方法一 函数定义后写udf函数 def func1(str): return str[0] convertUDF = udf(lambda z: func1(z),StringType()) convertUDF = udf(lambda z: func1(z)) # 方法二 在函数定义的上一行写上@udf @udf(returnType=StringType()) # 或者写@udf def func1(str): return str[0]
sql方法定义udf的格式为
# 方法一:注册udf函数 def func1(str): return str[0] spark.udf.register("func1",func1)参考文献
PySpark UDF (User Defined Function)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)