用RDD
一个快速但不是特别有效的解决方案是跟随
sortByKey使用
zipWithIndex和
filter:
n = 3rdd = sc.parallelize([(4, 'a'), (12, 'e'), (2, 'u'), (49, 'y'), (6, 'p')])rdd.sortByKey().zipWithIndex().filter(lambda xi: xi[1] < n).keys()
如果n与RDD大小相比相对较小,则更有效的方法是避免完全排序:
import heapqdef key(kv): return kv[0]top_per_partition = rdd.mapPartitions(lambda iter: heapq.nlargest(n, iter, key))top_per_partition.sortByKey().zipWithIndex().filter(lambda xi: xi[1] < n).keys()
如果键远小于值,并且最终输出的顺序无关紧要,则
filter方法可以正常工作:
keys = rdd.keys()identity = lambda x: xoffset = (keys .mapPartitions(lambda iter: heapq.nlargest(n, iter)) .sortBy(identity) .zipWithIndex() .filter(lambda xi: xi[1] < n) .keys() .max())rdd.filter(lambda kv: kv[0] <= offset)
同样,如果出现平局,它将不会保留确切的n值。
用Dataframes
您可以
orderBy和
limit:
from pyspark.sql.functions import colrdd.toDF().orderBy(col("_1").desc()).limit(n)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)