pyspark-用户自定义函数udf

pyspark-用户自定义函数udf,第1张

pyspark-用户自定义函数udf

在PySpark中,你用python语法建立一个函数,然后用PySpark SQL中的udf()方法在dataframe中使用,或将其注册成udf并在sql中使用。

例1 通过select()使用UDF
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr

""" Converting function to UDF """
convertUDF = udf(lambda z: convertCase(z),StringType())

""" Converting function to UDF 
StringType() is by default hence not required """
convertUDF = udf(lambda z: convertCase(z))

df.select(col("Seqno"),convertUDF(col("Name")).alias("Name")).show()
例2 通过withColumn()使用UDF
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

def upperCase(str):
    return str.upper()
upperCaseUDF = udf(lambda z:upperCase(z),StringType())   

df.withColumn("Cureated Name", upperCaseUDF(col("Name"))).show()

例3 注册UDF并在sql中使用
def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr

""" Using UDF on SQL """
spark.udf.register("convertUDF", convertCase,StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") 
     .show(truncate=False)
例4 通过注释创建UDF
@udf(returnType=StringType()) # 或者写@udf
def upperCase(str):
    return str.upper()

df.withColumn("Cureated Name", upperCase(col("Name"))).show()
特殊处理

pyspark/spark并不能保证子句按从左到右或其他固定的顺序执行。pyspark会将执行根据查询优化与规划,所以and, or, where, having表述会有副作用

""" 
No guarantee Name is not null will execute first
If convertUDF(Name) like '%John%' execute first then 
you will get runtime error
"""
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE " +  
         "where Name is not null and convertUDF(Name) like '%John%'") 
     .show(truncate=False)
# 如果某些记录的name值为空,就会有问题
spark.sql("select convertUDF(Name) from NAME_TABLE2") 
     .show(truncate=False)

tips:

  1. 最好在UDF函数内部检验null而不是在外部
  2. 如果在UDF内部不能检验null,那至少使用if或者case when来在使用UDF前检验null情况
总结

例1,2,4用的是api方法(df.select(”col1”)之类),例3用的是sql方法(spark.sql(”select * from tbl1”)之类)

api方法定义udf的格式为

# 方法一 函数定义后写udf函数
def func1(str):
	return str[0]
convertUDF = udf(lambda z: func1(z),StringType())
convertUDF = udf(lambda z: func1(z))

# 方法二 在函数定义的上一行写上@udf
@udf(returnType=StringType()) # 或者写@udf
def func1(str):
	return str[0]

sql方法定义udf的格式为

# 方法一:注册udf函数
def func1(str):
	return str[0]
spark.udf.register("func1",func1)
参考文献

PySpark UDF (User Defined Function)

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5681418.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存