spark[2]: Partition-related operations (to help understand RDDs)

Setup

# PySpark is the Spark API for Python; we use it to initialize the Spark session.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Creating a Spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark IBM Cloud Example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# The examples below use the SparkContext, available from the session
sc = spark.sparkContext

Creating an RDD

nums = [i for i in range(10)]

rdd = sc.parallelize(nums)
    
print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))

Use getNumPartitions() to check how many partitions an RDD has, and glom() to inspect the contents of each partition (glom() collects the elements of each partition into a list).

output: the default number of partitions equals sc.defaultParallelism (typically the number of available cores), so the exact structure depends on your machine.


Specifying the partition count with parallelize()

With 2 partitions:
rdd = sc.parallelize(nums, 2)
    
print("Default parallelism: {}".format(sc.defaultParallelism))
print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))

output: the 10 elements are split into two contiguous slices of 5 each, and the partitioner is still None.

With 15 partitions:
rdd = sc.parallelize(nums, 15)

print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))

output: with 15 requested partitions for only 10 elements, several of the partitions come back empty.
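With more partitions than elements, many partitions stay empty. A minimal sketch (using the 15-partition rdd above) of shrinking the partition count with coalesce(), which merges existing partitions without a full shuffle:

# coalesce() only merges existing partitions, so it is cheaper than a
# full repartition() when you are reducing the partition count.
rdd_small = rdd.coalesce(4)
print("Partitions after coalesce: {}".format(rdd_small.getNumPartitions()))
print("Partitions structure: {}".format(rdd_small.glom().collect()))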

Using partitionBy()

The dataset must consist of key/value tuples, because the default partitioner assigns each element to a partition using the hash of its key.

rdd = sc.parallelize(nums) \
    .map(lambda el: (el, el)) \
    .partitionBy(2) \
    .persist()
    
print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))

for i, part in enumerate(rdd.glom().collect()):
    print("partition: " + str(i + 1) + " " + str(part))

output:
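Once an RDD has an explicit partitioner, key-based operations can exploit it. A sketch (assuming the pair RDD rdd above): joining with another RDD that uses the same partitioner lets Spark match keys partition-by-partition instead of reshuffling the already-partitioned side.

# Both sides are hash-partitioned into 2 partitions, so the join can
# proceed without reshuffling the left side.
other = sc.parallelize([(i, i * 10) for i in range(10)]).partitionBy(2)
joined = rdd.join(other)
print("Join result partitions: {}".format(joined.getNumPartitions()))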

Explicit partitioning

Prepare the data:

transactions = [
    {'name': 'Bob', 'amount': 100, 'country': 'United Kingdom'},
    {'name': 'James', 'amount': 15, 'country': 'United Kingdom'},
    {'name': 'Marek', 'amount': 51, 'country': 'Poland'},
    {'name': 'Johannes', 'amount': 200, 'country': 'Germany'},
    {'name': 'Thomas', 'amount': 30, 'country': 'Germany'},
    {'name': 'Paul', 'amount': 75, 'country': 'Poland'},
    {'name': 'Pierre', 'amount': 120, 'country': 'France'},
    {'name': 'Frank', 'amount': 180, 'country': 'France'}
]

To ensure each country's data ends up in a single partition (and therefore on one node), we use a custom partitioner:

# Dummy implementation assuring that data for each country is in one partition.
# Note: Python's built-in hash() of strings is salted per interpreter process,
# so on a real cluster prefer portable_hash (from pyspark.rdd), which is
# stable across executors.
def country_partitioner(country):
    return hash(country) % (10**7 + 1)
    # return portable_hash(country)

rdd = sc.parallelize(transactions) \
    .map(lambda el: (el['country'], el)) \
    .partitionBy(3, country_partitioner)
    
print("Number of partitions: {}".format(rdd.getNumPartitions()))
print("Partitioner: {}".format(rdd.partitioner))
print("Partitions structure: {}".format(rdd.glom().collect()))

print("n--n")
for i, j in enumerate(rdd.glom().collect()):
    print("npartition: " + str(i+1) + "n"+ str(j))

output:
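A quick sanity check (a sketch): partitionBy() applies the partition function to each key and takes the result modulo the number of partitions, so we can compute each country's partition index directly.

# Reproduce the partition index that partitionBy(3, country_partitioner)
# assigns to each country key.
num_partitions = 3
for country in set(t['country'] for t in transactions):
    print(country, country_partitioner(country) % num_partitions)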


Computing with partitions

The mapPartitions() function lets you run a computation on each partition independently; the function receives the partition's elements as an iterator.

def sum_sales(iterator):
    # Each element is a (country, transaction) tuple; total the
    # amounts within this single partition.
    yield sum(transaction[1]['amount'] for transaction in iterator)

by_country = sc.parallelize(transactions) \
    .map(lambda el: (el['country'], el)) \
    .partitionBy(5, country_partitioner)

# Sum sales in each partition
sum_amounts = by_country \
    .mapPartitions(sum_sales) \
    .collect()

print("Total sales for each partition: {}".format(sum_amounts))

output:

These are the total amounts per country. Note that mapPartitions() really sums per partition, so the numbers match per-country totals only because each country's records landed in a partition of their own.
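For per-country totals that do not depend on how the data happens to be partitioned, the conventional approach is to aggregate by key. A minimal sketch:

# reduceByKey() combines values within each partition first, then
# merges the partial sums across partitions.
totals = sc.parallelize(transactions) \
    .map(lambda el: (el['country'], el['amount'])) \
    .reduceByKey(lambda a, b: a + b) \
    .collect()
print(totals)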


Inspecting partitions with a DataFrame

df = spark.createDataFrame(transactions)

for i, j in enumerate(df.rdd.glom().collect()):
    print("partition: " + str(i+1) + "n"+ str(j))

output:

As the output shows, the data is distributed across two partitions.

Using repartition()

You can specify a column name directly:

# Repartition by column
df2 = df.repartition(10,"country")

print("nAfter 'repartition()'")
print("Number of partitions: {}".format(df2.rdd.getNumPartitions()))
print("Partitioner: {}".format(df2.rdd.partitioner))
print("Partitions structure: {}".format(df2.rdd.glom().collect()))

output:


Inspect the data in each partition:

for i, j in enumerate(df2.rdd.glom().collect()):
    print("partition: " + str(i+1) + "n"+ str(j))
