两种方式:
1.从文件系统中加载数据创建RDD
Spark采用textFile()方法来从文件系统中加载数据创建RDD,该方法把文件的URI作为参数,这个URI可以是:
- 本地文件系统的地址
- 或者是分布式文件系统HDFS的地址
- 或者是Amazon S3的地址等等
2. 通过并行集合(列表)创建RDD
可以调用SparkContext的parallelize方法,在Driver中一个已经存在的集合 (列表)上创建。
举个栗子:
二、转换函数 1.filter()第一种:
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") or sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
>>> lines.foreach(print)
Hadoop is good
Spark is fast
Spark is bette第二种:
>>> array = [1,2,3,4,5]
>>> rdd = sc.parallelize(array)
>>> rdd.foreach(print)
2.map().filter(func):筛选出满足函数func的元素,并返回一个新的数据集
>>>lines = sc.textFile("file:/l/usr/local/spark/mycode/rdd/word.txt")
>>>linesWithSpark = lines.filter(lambda line: "Spark" in line)
>>> linesWithSpark.foreach(print)
Spark is fast
Spark is better
3.flatMap().map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集,一个纯粹的转换 *** 作
第一个栗子:
>>>data=[1,2,3,4,5]
>>> rdd1=sc.parallelize(data)
>>> rdd2=rdd1.map(lambda x:x+1)
>>> rdd2.foreach(print)第二个栗子:
>>>lines =sc.textFile("file:lllusr/local/spark/mycode/rdd/word.txt"')
>>>words = lines.map(lambda line:line.split(" "))
>>>words.foreach(print)
['Hadoop', ' is' , 'good']['Spark', 'is', ' fast]['Spark', 'is', ' better']
4.reduceByKey().flatMap(func):与map相似,但每个输入元素都可以映射到0或多个输出结果;先执行Map在执行flat拍扁其中的每个元素
第一个栗子:
>>>lines = sc.textFile("file:/llusr/localspark/mycode/rdd/word.txt")
>>>words = lines.flatMap(lambda line:line.split(" "))第二个栗子:
>>>words = sc.parallelize([("Hadoop",1),("is",1).("good",1),.... ("Spark",1).("is",1),("fast",1),("Spark",1),("is",1),("better",1)])
>>> words1 = words.groupByKey
>>> words1.foreach(print)
('Hadoop',)
('better',)
('fast',)
('good',)
('Spark',)
('is',)
.reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K,V)形式的数据集,其中每个值是将每个key传递到函数func中进行聚合后的结果
>>>words = sc.parallelize([("Hadoop",1),("is",1),("good",1),("Spark",1), !.... ("is",1),("fast"",1),("Spark",1),("is",1),("better",1)])
>>> words1 = words.reduceByKey(lambda a,b:a+b)
>>> words1.foreach(print)
('good', 1)
('Hadoop', 1)('better', 1)('Spark', 2)('fast', 1)('is',3)
tips:
- groupByKey也是对每个key进行 *** 作,但只生成一个sequence, groupByKey本身不能自定义函数,需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数 *** 作.
- reduceByKey用于对每个key对应的多个value进行merge *** 作,最重要的是它能够在本地先进行merge *** 作,并且merge *** 作可以通过函数自定义.
6.values().keys():返回键值
>>> list=[("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
>>>pairRDD= sc.parallelize(list)
>>> pairRDD.keys().foreach(print)
Hadoop
Spark
Hive
Spark
7.sortByKey().values():返回值
>>> list =[("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
>>> pairRDD = sc.parallelize(list)
>>>pairRDD.values(.foreach(print)
1111
8.sortBy().sortByKey():返回一个根据键排序的RDD,默认升序排序,降序sortByKey(False)
第一个栗子:
>>>list=[("Hadoop",1).("Spark",1),("Hive",1),(""Spark",1)]
>>>pairRDD= sc.parallelize(list)
>>> pairRDD.foreach(print)
('Hadoop', 1)
('Spark', 1)
('Hive', 1)
('Spark', 1)第二个栗子:
>>>pairRDD.sortByKey.foreach(print)
('Hadoop', 1)
('Hive', 1)('Spark', 1)('Spark', 1)
>>>d1.reduceByKey(lambda a,b:a+b).sortByKey(False).collect()
[('g',21), ('f,29), ('e',17), ('d', 9), ('c',27),('b',38), ('a',42)]
9.mapValues().sortBy(func):按func自定义排序
>>>d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x: x[0],False).collect(('g',21), ('f,29), ('e',17), ('d', 9), ('c',27), ('b',38), ('a', 42))
>>>d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x: x[1],False).collect()
[('a',42), ('b',38), ('f,29), ('c',27),('g', 21), ('e',17), ('d', 9)]
10.join().mapValues(func):对键值对RDD中的每个value都应用—个函数,key不会发生变化
>>> list=[("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
>>>pairRDD= sc.parallelize(list)
>>>pairRDD1 = pairRDD.mapValues(lambda x;x+1)
>>>pairRDD1.foreach(print)
('Hadoop',2)
('Spark',2)('Hive',2)('Spark', 2)
11.distinct().join(): join就表示内连接。对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个:(K,(V1,V2))类型的数据集。
>>> pairRDD1 = sc. parallelize([("spark",1),("spark",2),("hadoop",3),("hadoop",5)])
>>>pairRDD2= sc.parallelize([("spark","fast")))
>>>pairRDD3 = pairRDD1.join(pairRDD2)
>>>pairRDD3.foreach(print)
('spark', (1, 'fast'))
('spark', (2, 'fast'))
.distinct():去除重复值,一般用于在数据读入时执行该 *** 作
>>> RDD = sc. parallelize([("spark",1),("spark",1),("spark",2),("hadoop",3),("hadoop",5)]).map(lambda x:s.strip()).distinct()
>>>RDD.foreach(print)
("spark",1)
("spark",2)
("hadoop",3)
("hadoop",5)
二、常见的行动 *** 作:
- count(返回数据集中的元素个数
- collect0以数组的形式返回数据集中的所有元素
- first()返回数据集中的第一个元素
- take(n)以数组的形式返回数据集中的前n个元素
- reduce(func)通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
- foreach(func)将数据集中的每个元素传递到函数func中运行
三、题外话: A.持久化>>>rdd = sc.parallelize([1,2,3,4,5)
>>> rdd.countO
5
>>> rdd.first()
1
>>>rdd.take(3)
[1,2,3]
>>>rdd.reduce(lambda a,b:a+b)
15
>>> rdd.collect()
[1,2,3,4,5]
>>>rdd.foreach(lambda elem:print(elem))
12345
在Spark中,RDD采用惰性求值的机制,每次遇到行动 *** 作,都会从头开始执 行计算。每次调用行动 *** 作,都会触发一次从头开始的计算。这对于迭代计 算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据,通过持久化(缓存)机制避免这种重复计算的开销。
B.分区.persist():标记为持久化,在第一次行动 *** 作时执行----->.unpersist():手动地把持久化的RDD从缓存中移除
>>> list =["Hadoop","Spark","Hive"]
>>> rdd = sc.parallelize(list)
>>>rdd.cache) #会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,因为这时rdd还没有被计算生成
>>> print(rdd.count() #第一次行动 *** 作,触发一次真正从头到尾的计算,这时上面的rdd.cache()才会被执行,把这个rdd放到缓存中
3
>>>print(','.join(rdd.collectO)) #第二次行动,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
Hadoop,Spark,Hive
RDD是d性分布式数据集,通常RDD很大,会被分成很多 个分区,分别保存在不同的节点上。分区的作用主要是:增加并行度;减少通信开销。
分区原则:RDD分区的一个原则是使得分区的个数尽量等于集群中的CPU核心 (core)数目
分区个数:
(1)创建RDD时手动指定分区个数
sc.textFile(path, partitionNum) 其中,path参数用于指定要加载的文件的地址,partitionNum参数用于 指定分区个数。例如:
(2)使用reparititon方法重新设置分区个数
通过转换 *** 作得到新 RDD 时,直接调用 repartition 方法即可。例如:
四、如何开始编写一个RDD启动Hadoop,打开pycharm新建文件就不说了,部分同学不知道sc是啥,我理解的是sparkContext相当于一个指挥官负责统筹调度计算RDD之间的依赖关系构建DAG(有向无环图),再有DAGScheduler负责将DAG图分解成多个阶段,每个阶段包含多个任务,每个任务又会被TaskScheduler分发给各个WorkerNode上的Executor去执行,再逐层返回最后得到结果,基本的思想还是MapReduce只是基于内存速度更快,不像Hadoop频繁的IO读写会有很大延迟,举个栗子:
from pyspark import SparkContext,SparkConf def fun1(x): arr = x.split() id = arr[0] name = arr[1:] return (id,name) def fun2(x): if x[0] == '2019110401': return False else: return True def fun3(x): key = x[0] value = int(int(x[1][1])/10) return (value,key) conf = SparkConf().setAppName('class 1').setMaster('local') sc = SparkContext(conf=conf) # alist=[1,2,3,4,5] # rdd0 = sc.parallelize(alist)#创建第一个RDD # print(rdd0) #path = 'hdfs://master:9000/test.txt' path = 'file:///home/mls/abc/test.txt' rdd0 = sc.textFile(path) print(rdd0) # rdd1 = rdd0.map(lambda x:x.strip()).distinct()#去空格后去重 print(rdd1.collect())#此时得到的是字符串 rdd2 = rdd1.map(lambda x:fun1(x))#map转换 print(rdd2.count()) print(rdd2.collect()) print(rdd2.take(4)) rddttest = rdd2.map(lambda x: fun3(x)) print(rddttest.collect()) rdd3 = rdd2.filter(fun2)#过滤 rdd2.filter(lambda x: fun2(x)) print(rdd3.count()) print(rdd3.collect()) print(rdd3.take(4)) print(rdd3.collect()[0][1][1])#打印第一个元素的第二维里的第二个元素 # rdd4 = rdd3.map(lambda x: fun3(x)) # print(rdd4.collect()) rdd5 = rdd3.groupByKey()#按照学号分组 print(rdd5.collect()) rddttest5 = rddttest.groupByKey()#按照成绩分组 print(rddttest5.collect()) rdd6 = rdd5.mapValues(lambda x:list(x))#len(list(x))只针对值 print(rdd6.collect()) rddttest6 = rddttest5.mapValues(lambda x:list(x)) print(rddttest6.collect()) print(rdd4.collect()) sc.stop()
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)