Let's start with some example data:
df = sqlContext.createDataFrame([
    ("a", 1, "m1"), ("a", 1, "m2"), ("a", 2, "m3"), ("a", 3, "m4"),
    ("b", 4, "m1"), ("b", 1, "m2"), ("b", 2, "m3"),
    ("c", 3, "m1"), ("c", 4, "m3"), ("c", 5, "m4"),
    ("d", 6, "m1"), ("d", 1, "m2"), ("d", 2, "m3"), ("d", 3, "m4"), ("d", 4, "m5"),
    ("e", 4, "m1"), ("e", 5, "m2"), ("e", 1, "m3"), ("e", 1, "m4"), ("e", 1, "m5")],
    ("a", "cnt", "major"))
Note that I've changed count to cnt. count is a reserved keyword in most SQL dialects, so it is not a good choice for a column name.
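To see the collision concretely, here is a minimal sketch (hypothetical, since our DataFrame uses cnt) of what attribute access would do if the column had been left as count:

# Hypothetical df with a column literally named "count":
# df.count      resolves to the bound DataFrame.count() method, not the column
# df["count"]   bracket access would still reach the column
# col("count")  as would pyspark.sql.functions.col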
There are at least two ways to reshape this data:
Aggregation on the DataFrame
from pyspark.sql.functions import col, when, max

# Collect the distinct majors to use as output column names
# (on Spark 2+, insert .rdd before .map)
majors = sorted(df.select("major")
    .distinct()
    .map(lambda row: row[0])
    .collect())

# For each major, keep cnt when the row matches it, otherwise NULL
cols = [when(col("major") == m, col("cnt")).otherwise(None).alias(m)
    for m in majors]
maxs = [max(col(m)).alias(m) for m in majors]

reshaped1 = (df
    .select(col("a"), *cols)
    .groupBy("a")
    .agg(*maxs)
    .na.fill(0))

reshaped1.show()
+---+---+---+---+---+---+
|  a| m1| m2| m3| m4| m5|
+---+---+---+---+---+---+
|  a|  1|  1|  2|  3|  0|
|  b|  4|  1|  2|  0|  0|
|  c|  3|  0|  4|  5|  0|
|  d|  6|  1|  2|  3|  4|
|  e|  4|  5|  1|  1|  1|
+---+---+---+---+---+---+
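As an aside: if you are on Spark 1.6 or later (an assumption, since the snippets here use the older sqlContext API), the built-in pivot() expresses the same reshaping in one step; a minimal sketch:

reshaped1_alt = (df
    .groupBy("a")
    .pivot("major", majors)  # passing the known values skips an extra distinct() job
    .max("cnt")
    .na.fill(0))

reshaped1_alt.show()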
groupBy over the RDD

from pyspark.sql import Row
grouped = (df
    .map(lambda row: (row.a, (row.major, row.cnt)))
    .groupByKey())

def make_row(kv):
    k, vs = kv
    # Merge the (major, cnt) pairs with the key; missing majors default to 0
    tmp = dict(list(vs) + [("a", k)])
    return Row(**{k: tmp.get(k, 0) for k in ["a"] + majors})

reshaped2 = sqlContext.createDataFrame(grouped.map(make_row))

reshaped2.show()
+---+---+---+---+---+---+
|  a| m1| m2| m3| m4| m5|
+---+---+---+---+---+---+
|  a|  1|  1|  2|  3|  0|
|  e|  4|  5|  1|  1|  1|
|  c|  3|  0|  4|  5|  0|
|  b|  4|  1|  2|  0|  0|
|  d|  6|  1|  2|  3|  4|
+---+---+---+---+---+---+
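Note that groupByKey gives no ordering guarantee, which is why the rows above come back shuffled. If a stable order matters, sort explicitly:

reshaped2.orderBy("a").show()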