Spark RDD常用算子-Action(Python版)

Spark RDD常用算子-Action(Python版),第1张

Spark RDD常用算子-Action(Python版)

Action算子返回的结果为数值,不是RDD。

  1. countByKey
    统计key出现的次数, 返回是一个字典
dataRDD = sc.parallelize([(3, 'SQL'), (1, 'hadoop'), (2, 'java'), (1, 
'spark'), (2, 'scala'), (2, 'python')])
result = dataRDD.countByKey()
print(result)

# defaultdict(, {3: 1, 1: 2, 2: 3})

  1. countByValue
    统计RDD中每个值出现的次数
dataRDD = sc.parallelize([(3, 'SQL'), (1, 'hadoop'), (2, 'java'), (1, 'spark'), (2, 'hadoop'), (2, 'hadoop'), (2, 'python')])
result = dataRDD.countByValue()
print(result)

# defaultdict(, {(3, 'SQL'): 1, (1, 'hadoop'): 1, (2, 'java'): 1, (1, 'spark'): 1, (2, 'hadoop'): 2, (2, 'python'): 1})

  1. collect
    将RDD的每个分区的数据统一收集到Driver中,形成一个list对象。此算子在使用之前要保证结果数据不会太大,不然会导致Driver内存溢出。

  1. reduce
    按照传入的逻辑对RDD数据进行聚合
dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])
result = dataRDD.reduce(lambda a, b: a + b)
print('result: ', result)

# result:  45

  1. fold
    与reduce功能一样,不同的是聚合是带有初始值的,此初始值聚合会作用在:分区内聚合和分区间聚合
dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)
result = dataRDD.fold(10, lambda a, b: a + b)
print('result: ', result)

# result:  85

先每个分区将10作为初始值聚合,然后3个分区再以10为初始值进行聚合。


  1. first
    取出RDD中第一个元素
dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)
result = dataRDD.first()
print('result: ', result)

 # result:  1

  1. take
    取RDD的前n个元素并以list返回
dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)
result = dataRDD.take(4)
print('result: ', result)

# result:  [1, 2, 3, 4]

  1. top
    对RDD先进行降序排序再取前n个
dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)
result = dataRDD.top(5)
print('result: ', result)

# result:  [9, 8, 7, 6, 5]

  1. count
    统计RDD中数据的数量
dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)
result = dataRDD.count()
print('result: ', result)

# result:  9

  1. takeSample
    takeSample(self, withReplacement, num, seed=None)
    withReplacement:为True表示取出的数允许重复,Fasle则不重复
    num:表示取样的个数
    seed:表示随机数种子(一般不设)
dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)
result = dataRDD.takeSample(True, 6)
print('result: ', result)

# result:  [6, 7, 4, 6, 4, 4]

  1. takeOrdered
    按升序或可选键函数指定的 RDD 中获取 N 个元素。该算子不会改变RDD的值。仅当结果较小时才应使用此算子,因为所有数据都会加载到Driver的内存中。
    takeOrdered(self, num, key=None)
dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)

# 按升序取4个
result = dataRDD.takeOrdered(4)
print('result: ', result)

# 按某种键函数先升序再取值
result = dataRDD.takeOrdered(4, lambda x: -x)
print('result: ', result)

# result:  [1, 2, 3, 4]
# result:  [9, 8, 7, 6]

  1. foreach
    按照提供的逻辑对RDD中的每一个元素进行 *** 作(类似map),但是该算子无返回值。此算子执行过程直接由所在分区的Executor执行,不经过Driver。
dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 2)

def foreach_func(data):
    print(str(data * 10) + ',')

dataRDD.foreach(foreach_func)

10,50,
20,60,
30,70,
80,40,
90,

  1. saveAsTextFile
    将RDD的数据写入文本文件中,支持本地写出和hdfs等文件系统。
dataRDD = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 2)

dataRDD.saveAsTextFile('../data/outer')


该算子在保存文件时,是分布式执行的,执行过程不经过Driver,每个分区所在的Executor直接将数据写出到目标文件系统中,所以才会有多少个分区就会有多少个结果文件。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5668973.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存