You can chain conditions to find which column equals the max value:
import pyspark.sql.functions as psf

cond = "psf.when" + ".when".join(
    ["(psf.col('" + c + "') == psf.col('max_value'), psf.lit('" + c + "'))" for c in df.columns]
)

df.withColumn("max_value", psf.greatest(*df.columns))\
    .withColumn("MAX", eval(cond))\
    .show()

+-----+--------+----+-----+---------+--------+
|Alice|Eleonora|Mike|Helen|max_value|     MAX|
+-----+--------+----+-----+---------+--------+
|    2|       7|   8|    6|        8|    Mike|
|   11|       5|   9|    4|       11|   Alice|
|    6|      15|  12|    3|       15|Eleonora|
|    5|       3|   7|    8|        8|   Helen|
+-----+--------+----+-----+---------+--------+
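For reference, the snippets in this answer assume a DataFrame like the one behind the output above; a minimal setup sketch (an existing SparkSession named spark is assumed):

df = spark.createDataFrame(
    [(2, 7, 8, 6), (11, 5, 9, 4), (6, 15, 12, 3), (5, 3, 7, 8)],
    ["Alice", "Eleonora", "Mike", "Helen"],
)

If you would rather avoid eval, the same when-chain can be folded over Column objects directly; a sketch of the same idea:

from functools import reduce

# Fold .when(col == max_value, column_name) over the columns instead of eval'ing a string
chain_expr = reduce(
    lambda acc, c: acc.when(psf.col(c) == psf.col("max_value"), psf.lit(c)),
    df.columns[1:],
    psf.when(psf.col(df.columns[0]) == psf.col("max_value"), psf.lit(df.columns[0])),
)

df.withColumn("max_value", psf.greatest(*df.columns))\
    .withColumn("MAX", chain_expr)\
    .show()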
Or: explode and filter:
from itertools import chain

df.withColumn("max_value", psf.greatest(*df.columns))\
    .select("*", psf.posexplode(psf.create_map(list(chain(*[(psf.lit(c), psf.col(c)) for c in df.columns])))))\
    .filter("max_value = value")\
    .select(df.columns + [psf.col("key").alias("MAX")])\
    .show()
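Note that posexplode over a map yields pos, key, and value columns, and pos is unused here, so a plain explode does the same job; also, if several columns tie at the row max, this approach returns one row per tied column. A sketch of the explode variant, under the same assumptions:

kv = psf.explode(psf.create_map(list(chain(*[(psf.lit(c), psf.col(c)) for c in df.columns]))))

df.withColumn("max_value", psf.greatest(*df.columns))\
    .select("*", kv)\
    .filter("max_value = value")\
    .select(df.columns + [psf.col("key").alias("MAX")])\
    .show()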
Or: use a UDF on a dictionary:
from pyspark.sql.types import StringType

argmax_udf = psf.udf(lambda m: max(m, key=m.get), StringType())

df.withColumn("map", psf.create_map(list(chain(*[(psf.lit(c), psf.col(c)) for c in df.columns]))))\
    .withColumn("MAX", argmax_udf("map"))\
    .drop("map")\
    .show()
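Here max(m, key=m.get) breaks ties by dict iteration order. If you want a deterministic rule instead, the UDF can break ties explicitly; a drop-in sketch where the alphabetically-first-name rule is my own assumption:

def argmax_name(m):
    # Among columns tied at the max value, return the alphabetically first name.
    top = max(m.values())
    return min(k for k, v in m.items() if v == top)

argmax_udf = psf.udf(argmax_name, StringType())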
Or: use a UDF with arguments:
from pyspark.sql.types import StringType

def argmax(cols, *args):
    return [c for c, v in zip(cols, args) if v == max(args)][0]

argmax_udf = lambda cols: psf.udf(lambda *args: argmax(cols, *args), StringType())

df.withColumn("MAX", argmax_udf(df.columns)(*df.columns))\
    .show()
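Both UDF variants serialize every row through Python. A pure-Column alternative is to take the greatest (value, name) struct, since structs compare field by field; a sketch, assuming your Spark version supports ordering on struct columns (and note that ties resolve to the lexicographically last column name):

# greatest compares the structs on "v" first, so the largest value wins;
# we then read back the winning column's name from "k".
df.withColumn(
    "MAX",
    psf.greatest(*[psf.struct(psf.col(c).alias("v"), psf.lit(c).alias("k")) for c in df.columns])["k"],
).show()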