Action算子返回的结果为数值,不是RDD。
- countByKey
统计key出现的次数, 返回是一个字典
dataRDD = sc.parallelize([(3, 'SQL'), (1, 'hadoop'), (2, 'java'), (1, 'spark'), (2, 'scala'), (2, 'python')]) result = dataRDD.countByKey() print(result) # defaultdict(, {3: 1, 1: 2, 2: 3})
- countByValue
统计RDD中每个值出现的次数
dataRDD = sc.parallelize([(3, 'SQL'), (1, 'hadoop'), (2, 'java'), (1, 'spark'), (2, 'hadoop'), (2, 'hadoop'), (2, 'python')]) result = dataRDD.countByValue() print(result) # defaultdict(, {(3, 'SQL'): 1, (1, 'hadoop'): 1, (2, 'java'): 1, (1, 'spark'): 1, (2, 'hadoop'): 2, (2, 'python'): 1})
- collect
将RDD的每个分区的数据统一收集到Driver中,形成一个list对象。此算子在使用之前要保证结果数据不会太大,不然会导致Driver内存溢出。
- reduce
按照传入的逻辑对RDD数据进行聚合
dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9]) result = dataRDD.reduce(lambda a, b: a + b) print('result: ', result) # result: 45
- fold
与reduce功能一样,不同的是聚合是带有初始值的,此初始值聚合会作用在:分区内聚合和分区间聚合
dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3) result = dataRDD.fold(10, lambda a, b: a + b) print('result: ', result) # result: 85
先每个分区将10作为初始值聚合,然后3个分区再以10为初始值进行聚合。
- first
取出RDD中第一个元素
dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3) result = dataRDD.first() print('result: ', result) # result: 1
- take
取RDD的前n个元素并以list返回
dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3) result = dataRDD.take(4) print('result: ', result) # result: [1, 2, 3, 4]
- top
对RDD先进行降序排序再取前n个
dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3) result = dataRDD.top(5) print('result: ', result) # result: [9, 8, 7, 6, 5]
- count
统计RDD中数据的数量
dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3) result = dataRDD.count() print('result: ', result) # result: 9
- takeSample
takeSample(self, withReplacement, num, seed=None)
withReplacement:为True表示取出的数允许重复,Fasle则不重复
num:表示取样的个数
seed:表示随机数种子(一般不设)
dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3) result = dataRDD.takeSample(True, 6) print('result: ', result) # result: [6, 7, 4, 6, 4, 4]
- takeOrdered
按升序或可选键函数指定的 RDD 中获取 N 个元素。该算子不会改变RDD的值。仅当结果较小时才应使用此算子,因为所有数据都会加载到Driver的内存中。
takeOrdered(self, num, key=None)
dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3) # 按升序取4个 result = dataRDD.takeOrdered(4) print('result: ', result) # 按某种键函数先升序再取值 result = dataRDD.takeOrdered(4, lambda x: -x) print('result: ', result) # result: [1, 2, 3, 4] # result: [9, 8, 7, 6]
- foreach
按照提供的逻辑对RDD中的每一个元素进行 *** 作(类似map),但是该算子无返回值。此算子执行过程直接由所在分区的Executor执行,不经过Driver。
dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 2) def foreach_func(data): print(str(data * 10) + ',') dataRDD.foreach(foreach_func) 10,50, 20,60, 30,70, 80,40, 90,
- saveAsTextFile
将RDD的数据写入文本文件中,支持本地写出和hdfs等文件系统。
dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 2) dataRDD.saveAsTextFile('../data/outer')
该算子在保存文件时,是分布式执行的,执行过程不经过Driver,每个分区所在的Executor直接将数据写出到目标文件系统中,所以才会有多少个分区就会有多少个结果文件。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)