This is relatively simple to do with basic Spark SQL functions.
Python:
from pyspark.sql.functions import array, col, explode, struct, lit

# Assumes an active SparkContext `sc`, as in the pyspark shell
df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"])

def to_long(df, by):
    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))

    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
        struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

to_long(df, ["A"])
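Applied to the sample df above, to_long(df, ["A"]) produces one (key, val) row per original value column; calling .show() on the result should print something like the following (row order may vary):

+---+-----+---+
|  A|  key|val|
+---+-----+---+
|  1|col_1|0.0|
|  1|col_2|0.6|
|  1|col_1|0.6|
|  1|col_2|0.7|
+---+-----+---+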
Scala:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}

// Run in spark-shell, or with spark.implicits._ in scope, for toDF and the $ syntax
val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2")

def toLong(df: DataFrame, by: Seq[String]): DataFrame = {
  // Split the non-id columns into names and type descriptions
  val (cols, types) = df.dtypes.filter { case (c, _) => !by.contains(c) }.unzip

  // Spark SQL arrays require homogeneous element types
  require(types.distinct.size == 1, s"${types.distinct.toString}.length != 1")

  // Create and explode an array of (column_name, column_value) structs
  val kvs = explode(array(
    cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _*
  ))

  val byExprs = by.map(col(_))

  df
    .select(byExprs :+ kvs.alias("_kvs"): _*)
    .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*)
}

toLong(df, Seq("A"))
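As an aside, newer Spark releases ship this operation natively: assuming Spark 3.4 or later, DataFrame.melt (an alias of unpivot) produces the same long format without a helper function. A minimal Python sketch:

# Built-in alternative, assuming Spark >= 3.4 where DataFrame.melt is available
df.melt(
    ids=["A"],                  # identifier columns kept as-is
    values=["col_1", "col_2"],  # columns to unpivot into rows
    variableColumnName="key",   # column holding the former column names
    valueColumnName="val",      # column holding the values
)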