- spark框架:MapReduce
- 搭建sc环境:
- spark dataframe
- spark rdd
- 创建rdd
- rdd基本 *** 作
- 向spark传递函数
- python
- Scala
- java
【注:reduce的个数不一定和key的个数相等,可能n个key对应m个reduce】
shuffle:处于一个宽依赖,可以实现类似混洗的功能,将相同的 Key 分发至同一个 Reducer上进行处理。
from pyspark.sql import SparkSession
from pyspark.sql import Row
spark dataframe与pandas的dataframe不同,是两种不同的数据类型,具有不同的函数和使用方法。
- 建立spark dataframe: df=spark_session.sql(‘sql’)
- 将spark dataframe转化为二维列表: df.collect()
外层数组是每一行数据(Row),里层数组是一行中每一列(Column)的数据
rdd是spark中的基础数据单元,每个rdd被分为多个分区,可以包含Python、Java、Scala中任意类型的对象。
创建rdd- 读取外部数据集
lines = sc.textFile(“README.md”) - 分发驱动器程序中的对象集合(如list、set)
lst=[(‘Alice’,1),(‘Bob’,2)]
rdd = sc.parallelize( lst )
3. 从spark dataframe转化
rdd = spark.createDataframe( lst ,[‘name’,‘age’] ).rdd
-
转化 *** 作transformation
转化 *** 作不进行实际计算和存储,只是记录计算的步骤(即惰性计算):
① map:将函数作用在rdd的每个元素中,函数返回结果作为结果rdd的值。
nums = sc.parallelize([1,2,3,4])
squared = nums.map(lambda x: x*x).collect()
② filter:将函数作用在每个rdd元素,返回符合函数条件的rdd
pythonlines = lines.filter(lambda line: “Python” in line)
③ flatMap:将函数作用在迭代器rdd上,将所有迭代器的返回值塞到同一个迭代器中返回
lines = sc.parallelize([”hello world“,“hi”])
words = lines.flatMap(lambda line: line.split(" "))
words.first() #返回“hello” -
行动 *** 作action
行动 *** 作进行实际计算,得出结果返回到驱动器程序中或者并存储到外部存储(如HDFS)中:
①reduce:接收两个同类型元素并将计算结果返回
sum = rdd.reduce(lambda x,y: x+y)
② pythonlines.first()
③ rdd.take(2) #取rdd的2个值,以列表的形式返回[(‘Alice’,1),(‘Bob’,2)]
④pythonlines.collect() #取全部值,会把整个rdd拉取到driver上,慎用,最好用.first(), take(n)等取代
⑤ pythonlines.count() #计数 -
持久化
Spark rdd惰性求值,每次调用行动 *** 作时都会将前面的依赖重新计算一边,为了避免重复计算,可以将rdd持久化。
result = nums.map(lambda x: x*x)
result.persist() # 可以选择持久化级别:MEMORY_ONLY,MEMORY_ONLY_SER,MEMORY_AND_DISK,MEMORY_AND_DISK_SER,DISK_ONLY
print(result.count())
print(result.first())
result.unpersist() -
遍历dataframe形式的rdd
rdd是不可iterable的类型,不可以用for循环遍历
①rdd_result = rdd.map(function)
②rdd_result = rdd.mapPartitions(function)
def function(iter): from multiprocessing.pool import ThreadPool* results = [] def f (r ): nonlocal results results += [ something ] # 如果r是row类型,即Row(id=' ',set=[]),可以取r['id'],r['set']等 # 如果r是tuple类型,即(Row(id=' ',set=[ ]),Row(id=' ',set=[ ])),可以取r[0],r[1],r[0]['id'],r[0]['set']等 with ThreadPool(8) as p: p.map(f, [r for r in iter]) # r是rdd的一行 return results
map和mapPartition的区别:
对于有3个元素,1个分区的rdd:
map是对rdd中的每一个元素进行 *** 作, *** 作一次的结果作为一个Row, *** 作3次;
mapPartitions则是对rdd中的每个分区的迭代器进行 *** 作, *** 作一次的结果为3个Row, *** 作1次。
上例中,map的返回结果为[Row(_1=324, _2=[‘a’, ‘b’, ‘c’]), Row(_1=100, _2=[‘e’, ‘g’]), Row(_1=555, _2=[‘a’, ‘e’, ‘m’])]
mapPartitions的返回结果为[Row(id=324, set=[‘a’, ‘b’, ‘c’]), Row(id=100, set=[‘e’, ‘g’]), Row(id=555, set=[‘a’, ‘e’, ‘m’])]
- lambda函数
word = rdd.filter(lambda s : “error” in s) - 定义局部函数
#自定义query函数,rdd作为参数传入 def function(iter): pass #将返回结果转化为df df = rdd.mapPartitions(query).toDF()
注意1:若在rdd函数中传递了某个对象的成员,spark会把成员所在整个对象都序列化发到工作节点上,传递的东西会比想象中大得多,如:
rdd.filter(lambda x: self.query in x) #会把整个self都保存到局部变量 query=self.query() rdd.filter(lambda x: query in x)
注意2:在function中定义的print语句不会输出到主机终端,会在其他工作节点输出,因此不要在调用函数内部输出调试。
Scala java欢迎分享,转载请注明来源:内存溢出
评论列表(0条)