您可以减少在列列表中使用SQL表达式:
from pyspark.sql.functions import max as max_, col, whenfrom functools import reducedef row_max(*cols): return reduce( lambda x, y: when(x > y, x).otherwise(y), [col(c) if isinstance(c, str) else c for c in cols] )df = (sc.parallelize([(1, 2, 3), (2, 1, 2), (3, 4, 5)]) .toDF(["a", "b", "c"]))df.select(row_max("a", "b", "c").alias("max")))
Spark 1.5+还提供
least,
greatest
from pyspark.sql.functions import greatestdf.select(greatest("a", "b", "c"))
如果要保留最大名称,可以使用`structs:
from pyspark.sql.functions import struct, litdef row_max_with_name(*cols): cols_ = [struct(col(c).alias("value"), lit(c).alias("col")) for c in cols] return greatest(*cols_).alias("greatest({0})".format(",".join(cols))) maxs = df.select(row_max_with_name("a", "b", "c").alias("maxs"))
最后,您可以使用上面的内容来选择“顶部”列:
from pyspark.sql.functions import max((_, c), ) = (maxs .groupBy(col("maxs")["col"].alias("col")) .count() .agg(max(struct(col("count"), col("col")))) .first())df.select(c)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)