RDD(Resilient Distributed Datasets)
- 是一个容错的,并行的数据结构,可以让用户显式的将数据存储到磁盘和内存中,并能控制数据的分区
- 提供了一组丰富的 *** 作来 *** 作数据
- 本质是一个只读的分区记录集合,一个RDD可以包含多个分区,每个分区是一个DataSet片段
- RDD之间可以相互依赖(窄依赖,宽依赖)
- 通过不同的分区,将数据实际映射到不同的Spark节点上
- 只读不能修改:只能通过转换 *** 作生成一个新的RDD
- 分布式存储:一个RDD通过分区可以分布在多台机器上进行并行数据处理
- 内存计算:可以将全部或部分数据缓存在内存中,且可在多次计算过程中重用
- 具有d性:在计算过程中,当内存不足时,可以将一部分数据落到磁盘上处理
import findspark findspark.init() from pyspark import SparkConf, SparkContext #local[*]表示在本地运行Spark,其工作线程数与逻辑线程数相同 conf = SparkConf().setMaster("local[*]").setAppName('RDD_create_demo') sc = SparkContext(conf)parallelize
list1 = [1, 2, 3, 4, 5] #sc.parallelize将一个list转成一个RDD对象 #numSlices表示分几个区 rdd1 = sc.parallelize(list1, numSlices = 3) #collect()将RDD对象转成一个list rdd1.collect() #glom()展示每个分区 rdd1.glom().collect() result : [[1], [2, 3], [4, 5]]range
#第一个参数表示开始值,第二个参数表示结束值(不包含),第三个参数表示步长 rdd2 = sc.range(1, 20, 2, numSlieces = 3)textFile
#textFile() #支持访问文件夹,如sc.textFile("hdfs:///dataset") #支持访问压缩文件,如sc.textFile("hdfs:///dataset/words.gz") #支持通过通配符访问,如sc.textFile("hdfs:///dataset/*.txt") #读取的数据每一行是一个元素 #第二个参数是指定的最小分区数 rdd3 = sc.textFile('./wordcount.txt', 2) rdd3.collect() result : ['hadoop spark flume', 'spark hadoop', 'flume hadoop']通过RDD衍生
wordsRDD = rdd3.flatMap(lambda line:line.split(" ")) wordsRDD.collect() result : ['hadoop', 'spark', 'flume', 'spark', 'hadoop', 'flume', 'hadoop'] sc.stop()RDD算子 RDD算子分类
- Transformation(转换) *** 作:在一个已经存在的RDD上创建一个新的RDD,将旧的RDD数据转换为另外一种形式后放入新的RDD。如:map, flatMap, filter
- Action(动作) *** 作:执行各个分区的计算任务,将得到的结果返回到driver中。如reduce, collect,show
- 惰性求值:Spark中所有的Transformation是Lazy的,它们不会立即执行获得结果。它们只会记录在数据集上要应用的 *** 作,只有当需要返回结果给Driver时才会执行这些 *** 作,通过DAGScheduler和TaskScheduler分发到集群中运行
- 默认情况下,每一个Action运行的时候,其所关联的所有Transformation RDD都会重新计算,但是也可以使用缓存将RDD持久化到磁盘或内存中,这个是为了下次可以更快的访问,会把数据保存到集群上
# 创建SparkContext conf = SparkConf().setMaster("local[*]") sc = SparkContext(conf=conf)Transformation *** 作 Map算子
#rdd.map(func, preservesPartitioning=False) #对RDD每个元素按照func定义的逻辑进行一对一处理 rdd1 = sc.range(5) rdd2 = rdd1.map(lambda x:x * 2) rdd2.collect() result : [0, 2, 4, 6, 8]flatMap算子
#rdd.flatMap(func, preservesPartitioning=False) #对RDD中每个元素按照func函数定义的处理逻辑进行 *** 作,并将结果进行扁平化处理 list1 = ['Hello Lily', 'Hello Lucy', 'Hello Tim'] rdd1 = sc.parallelize(list1) rdd2 = rdd1.map(lambda x:x.split(' ')) rdd3 = rdd1.flatMap(lambda x:x.split(' ')) print(rdd2.collect()) result : [['Hello', 'Lily'], ['Hello', 'Lucy'], ['Hello', 'Tim']] print(rdd3.collect()) result : ['Hello', 'Lily', 'Hello', 'Lucy', 'Hello', 'Tim']reduceByKey算子
#rdd.reduceByKey(func, numPartitions=None, partitionFunc=filter算子) #func: 执行数据处理的函数, 传入两个参数, 一个是当前值, 一个是局部汇总, 最终就是按照Key的汇总结果 #按照函数func的逻辑对元素格式为KV的RDD中的数据进行运算,以减少元素个数 list1 = ['Hello Lily', 'Hello Lucy', 'Hello Tim'] rdd1 = sc.parallelize(list1) rdd1 = rdd1.flatMap(lambda x:x.split(' ')) rdd1 = rdd1.map(lambda x:(x, 1)) rdd1.collect() result : [('Hello', 1), ('Lily', 1), ('Hello', 1), ('Lucy', 1), ('Hello', 1), ('Tim', 1)] rdd2 = rdd1.reduceByKey(lambda x, y:x+y) rdd2.collect() result : [('Lily', 1), ('Hello', 3), ('Lucy', 1), ('Tim', 1)]
#rdd.filter(f) #根据过滤函数func的逻辑定义来对原RDD中的元素进行过滤,并返回一个新的RDD,新RDD是由满足过滤函数为True的元素构成。 rdd = sc.parallelize([1, 2, 3, 4, 5, 6]) # 保留func返回值为True的元素 rdd2 = rdd.filter(lambda x: x%2 == 0) print(rdd2.collect()) result : [2, 4, 6]groupBy算子
#rdd.groupBy(f, numPartitions=None, partitionFunc=) #接收一个函数func,这个函数返回的值作为Key,然后通过这个Key来对其中的元素进行分组,并返回一个新的RDD对象。 rdd = sc.parallelize([1, 2, 3, 4, 5, 10]) rdd2 = rdd.groupBy(lambda x: x%2) print(rdd2.collect()) result : [(0, ), (1, )] for k, v in rdd2.collect(): print(k) print(sorted(v)) result : 0 [2, 4, 10] 1 [1, 3, 5]
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)