PySpark code notes


These examples run faster in an environment activated with conda activate pyspark.
1. WordCount

from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("WordCount").setMaster("local")
    sc = SparkContext(conf=conf)

    # read the input file from HDFS
    file_rdd = sc.textFile("hdfs://47.100.210.24:8020/input1/input.txt")
    print(file_rdd)
    # split each line into words
    words_rdd = file_rdd.flatMap(lambda line: line.split(" "))
    # pair each word with a count of 1
    words_with_one_rdd = words_rdd.map(lambda x: (x, 1))
    # sum the counts per word
    result_rdd = words_with_one_rdd.reduceByKey(lambda a, b: a + b)
    print(result_rdd.collect())

2. glom()

View how an RDD is partitioned.

    rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
    # glom() turns each partition into a list so the partitioning is visible
    print(rdd.glom().collect())

Result: [[1, 2], [3, 4], [5, 6]]

3. map()

    rdd = sc.parallelize([1,2,3,4,5,6],3)
    print(rdd.map(lambda x: x*10).collect())

Result: [10, 20, 30, 40, 50, 60]

4. flatMap()

    # map keeps one output element per input element, so this gives a list of lists
    rdd = sc.parallelize(['a b c', 'a c e', 'a c a'])
    print(rdd.map(lambda x: x.split(" ")).collect())
    # output: [['a', 'b', 'c'], ['a', 'c', 'e'], ['a', 'c', 'a']]

    # flatMap flattens the results into one list of words
    rdd = sc.parallelize(['a b c', 'a c e', 'a c a'])
    print(rdd.flatMap(lambda x: x.split(" ")).collect())
    # output: ['a', 'b', 'c', 'a', 'c', 'e', 'a', 'c', 'a']


5. reduceByKey()
Pre-aggregation (map-side combine), then grouping, then aggregation.
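
A minimal sketch of that behavior, assuming the same sc as above and made-up key/value data (not from the original notes):

    # hypothetical sample data
    rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1), ('b', 1), ('a', 1)])
    # values of the same key are combined pairwise with the given function
    print(rdd.reduceByKey(lambda a, b: a + b).collect())
    # expected output: [('a', 3), ('b', 2)] (key order may vary)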


6. mapValues()
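
No example was given here; a minimal sketch with made-up data, assuming the same sc. mapValues applies a function to the value of each (key, value) pair and leaves the key untouched:

    # hypothetical sample data
    rdd = sc.parallelize([('a', 1), ('b', 2), ('c', 3)])
    # multiply only the values by 10; the keys stay the same
    print(rdd.mapValues(lambda v: v * 10).collect())
    # expected output: [('a', 10), ('b', 20), ('c', 30)]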

7. groupBy

    rdd = sc.parallelize([('a', 1), ('c', 1), ('a', 1), ('b', 1), ('c', 1), ('a', 1), ('b', 1)])
    # group whole elements by their first field
    rdd2 = rdd.groupBy(lambda t: t[0])
    # the grouped values are ResultIterable objects, so this prints opaquely
    print(rdd2.collect())

    # convert each group to a list to see the grouped tuples
    print(rdd2.map(lambda x: (x[0], list(x[1]))).collect())

8. filter

Keep only the elements you want.

    rdd = sc.parallelize([1, 2, 3, 4, 5])
    # keep only the odd numbers
    print(rdd.filter(lambda x: x % 2 == 1).collect())


9. distinct
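
No example was given here; a minimal sketch with made-up data, assuming the same sc:

    # hypothetical sample data with duplicates
    rdd = sc.parallelize([1, 1, 2, 2, 3, 3])
    # distinct() removes duplicate elements (involves a shuffle)
    print(rdd.distinct().collect())
    # expected output: [1, 2, 3] (order may vary)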

10. union


Does not de-duplicate; RDDs with different element types can still be merged.
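
A minimal sketch of both points with made-up data, assuming the same sc:

    # hypothetical sample data
    rdd1 = sc.parallelize([1, 1, 2, 3])
    rdd2 = sc.parallelize(['a', 'b', 1])
    # union simply concatenates the two RDDs; nothing is de-duplicated
    print(rdd1.union(rdd2).collect())
    # expected output: [1, 1, 2, 3, 'a', 'b', 1]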

11. join
    x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhaoliu")])
    y = sc.parallelize([(1001, "sales"), (1002, "tech")])
    # inner join: only keys present in both RDDs are kept
    print(x.join(y).collect())
    # left outer join: keys missing from y get None as the right-hand value
    print(x.leftOuterJoin(y).collect())
12. intersection
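
No example was given here; a minimal sketch with made-up data, assuming the same sc:

    # hypothetical sample data
    rdd1 = sc.parallelize([('a', 1), ('b', 1)])
    rdd2 = sc.parallelize([('a', 1), ('c', 1)])
    # intersection keeps only the elements present in both RDDs
    print(rdd1.intersection(rdd2).collect())
    # expected output: [('a', 1)]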
13. groupByKey
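
No example was given here; a minimal sketch with made-up data, assuming the same sc. Unlike groupBy, it groups by the key of each (key, value) pair and keeps only the values in each group:

    # hypothetical sample data
    rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)])
    grouped_rdd = rdd.groupByKey()
    # materialize the ResultIterable values so they print readably
    print(grouped_rdd.map(lambda x: (x[0], list(x[1]))).collect())
    # expected output: [('a', [1, 1]), ('b', [1])] (key order may vary)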
14. sortBy

sortBy only guarantees ordering within each partition (locally ordered); for a globally ordered result, set the number of partitions to 1. sortByKey sorts by the key of each pair.
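
A minimal sketch of that note with made-up data, assuming the same sc:

    # hypothetical sample data
    rdd = sc.parallelize([('c', 3), ('a', 1), ('b', 2)])
    # sort by the value, ascending; numPartitions=1 gives a global order
    print(rdd.sortBy(lambda t: t[1], ascending=True, numPartitions=1).collect())
    # expected output: [('a', 1), ('b', 2), ('c', 3)]
    # sortByKey sorts by the key instead
    print(rdd.sortByKey(ascending=True, numPartitions=1).collect())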
Case 1: extract the Beijing data from the order file
    import json

    file_rdd = sc.textFile("hdfs://47.100.210.24:8020/input1/order.txt")
    # each line holds several JSON strings separated by "|"
    jsons_rdd = file_rdd.flatMap(lambda line: line.split("|"))
    dict_rdd = jsons_rdd.map(lambda json_str: json.loads(json_str))
    # keep only the Beijing orders
    beijing_rdd = dict_rdd.filter(lambda d: d['areaName'] == '北京')
    # build "areaName_category" strings and de-duplicate them
    category_rdd = beijing_rdd.map(lambda x: x['areaName'] + "_" + x['category'])
    result_rdd = category_rdd.distinct()
    print(result_rdd.collect())
