pyspark学习笔记

pyspark学习笔记,第1张

pyspark学习笔记

文章目录
  • spark框架:MapReduce
  • 搭建sc环境:
  • spark dataframe
  • spark rdd
    • 创建rdd
    • rdd基本 *** 作
  • 向spark传递函数
    • python
    • Scala
    • java

spark框架:MapReduce


【注:reduce的个数不一定和key的个数相等,可能n个key对应m个reduce】
shuffle:处于一个宽依赖,可以实现类似混洗的功能,将相同的 Key 分发至同一个 Reducer上进行处理。

搭建sc环境:

from pyspark.sql import SparkSession
from pyspark.sql import Row

spark dataframe

spark dataframe与pandas的dataframe不同,是两种不同的数据类型,具有不同的函数和使用方法。

  1. 建立spark dataframe: df=spark_session.sql(‘sql’)
  2. 将spark dataframe转化为二维列表: df.collect()
    外层数组是每一行数据(Row),里层数组是一行中每一列(Column)的数据
spark rdd

rdd是spark中的基础数据单元,每个rdd被分为多个分区,可以包含Python、Java、Scala中任意类型的对象。

创建rdd
  1. 读取外部数据集
    lines = sc.textFile(“README.md”)
  2. 分发驱动器程序中的对象集合(如list、set)
    lst=[(‘Alice’,1),(‘Bob’,2)]
    rdd = sc.parallelize( lst )
    3. 从spark dataframe转化
    rdd = spark.createDataframe( lst ,[‘name’,‘age’] ).rdd
rdd基本 *** 作
  1. 转化 *** 作transformation
    转化 *** 作不进行实际计算和存储,只是记录计算的步骤(即惰性计算):
    ① map:将函数作用在rdd的每个元素中,函数返回结果作为结果rdd的值。
    nums = sc.parallelize([1,2,3,4])
    squared = nums.map(lambda x: x*x).collect()
    ② filter:将函数作用在每个rdd元素,返回符合函数条件的rdd
    pythonlines = lines.filter(lambda line: “Python” in line)
    ③ flatMap:将函数作用在迭代器rdd上,将所有迭代器的返回值塞到同一个迭代器中返回
    lines = sc.parallelize([”hello world“,“hi”])
    words = lines.flatMap(lambda line: line.split(" "))
    words.first() #返回“hello”

  2. 行动 *** 作action
    行动 *** 作进行实际计算,得出结果返回到驱动器程序中或者并存储到外部存储(如HDFS)中:
    ①reduce:接收两个同类型元素并将计算结果返回
    sum = rdd.reduce(lambda x,y: x+y)
    ② pythonlines.first()
    ③ rdd.take(2) #取rdd的2个值,以列表的形式返回[(‘Alice’,1),(‘Bob’,2)]
    ④pythonlines.collect() #取全部值,会把整个rdd拉取到driver上,慎用,最好用.first(), take(n)等取代
    ⑤ pythonlines.count() #计数

  3. 持久化
    Spark rdd惰性求值,每次调用行动 *** 作时都会将前面的依赖重新计算一边,为了避免重复计算,可以将rdd持久化。
    result = nums.map(lambda x: x*x)
    result.persist() # 可以选择持久化级别:MEMORY_ONLY,MEMORY_ONLY_SER,MEMORY_AND_DISK,MEMORY_AND_DISK_SER,DISK_ONLY
    print(result.count())
    print(result.first())
    result.unpersist()

  4. 遍历dataframe形式的rdd
    rdd是不可iterable的类型,不可以用for循环遍历
    ①rdd_result = rdd.map(function)
    ②rdd_result = rdd.mapPartitions(function)

def function(iter):
	from multiprocessing.pool import ThreadPool*
	results = []
	def f (r ):
		nonlocal results	
		results += [ something ]
		# 如果r是row类型,即Row(id=' ',set=[]),可以取r['id'],r['set']等
		# 如果r是tuple类型,即(Row(id=' ',set=[ ]),Row(id=' ',set=[ ])),可以取r[0],r[1],r[0]['id'],r[0]['set']等
	with ThreadPool(8) as p:
		p.map(f, [r for r in iter])
		# r是rdd的一行
	return results

map和mapPartition的区别:
对于有3个元素,1个分区的rdd:
map是对rdd中的每一个元素进行 *** 作, *** 作一次的结果作为一个Row, *** 作3次;
mapPartitions则是对rdd中的每个分区的迭代器进行 *** 作, *** 作一次的结果为3个Row, *** 作1次。
上例中,map的返回结果为[Row(_1=324, _2=[‘a’, ‘b’, ‘c’]), Row(_1=100, _2=[‘e’, ‘g’]), Row(_1=555, _2=[‘a’, ‘e’, ‘m’])]
mapPartitions的返回结果为[Row(id=324, set=[‘a’, ‘b’, ‘c’]), Row(id=100, set=[‘e’, ‘g’]), Row(id=555, set=[‘a’, ‘e’, ‘m’])]

向spark传递函数 python
  1. lambda函数
    word = rdd.filter(lambda s : “error” in s)
  2. 定义局部函数
#自定义query函数,rdd作为参数传入
def function(iter):
	pass
#将返回结果转化为df
df = rdd.mapPartitions(query).toDF()

注意1:若在rdd函数中传递了某个对象的成员,spark会把成员所在整个对象都序列化发到工作节点上,传递的东西会比想象中大得多,如:

rdd.filter(lambda x: self.query in x) #会把整个self都保存到局部变量
query=self.query()
rdd.filter(lambda x: query in x)

注意2:在function中定义的print语句不会输出到主机终端,会在其他工作节点输出,因此不要在调用函数内部输出调试。

Scala java

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4677248.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-07
下一篇 2022-11-06

发表评论

登录后才能评论

评论列表(0条)

保存