These examples run faster inside the environment activated with conda activate pyspark.
1. WordCount
from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    # Local Spark context for the word-count job
    conf = SparkConf().setAppName("WordCount").setMaster("local")
    sc = SparkContext(conf=conf)
    # Read the input file from HDFS
    file_rdd = sc.textFile("hdfs://47.100.210.24:8020/input1/input.txt")
    print(file_rdd)
    # Split each line into words, map each word to (word, 1), then sum the counts per word
    words_rdd = file_rdd.flatMap(lambda line: line.split(" "))
    words_with_one_rdd = words_rdd.map(lambda x: (x, 1))
    result_rdd = words_with_one_rdd.reduceByKey(lambda a, b: a + b)
    print(result_rdd.collect())
2. glom(): view the data in each partition
rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
print(rdd.glom().collect())
Result: [[1, 2], [3, 4], [5, 6]]
3. map()
rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
print(rdd.map(lambda x: x * 10).collect())
Result: [10, 20, 30, 40, 50, 60]
4. flatMap()
# map keeps the nested structure: [['a', 'b', 'c'], ['a', 'c', 'e'], ['a', 'c', 'a']]
rdd = sc.parallelize(['a b c', 'a c e', 'a c a'])
print(rdd.map(lambda x: x.split(" ")).collect())

# flatMap flattens into a single list: ['a', 'b', 'c', 'a', 'c', 'e', 'a', 'c', 'a']
rdd = sc.parallelize(['a b c', 'a c e', 'a c a'])
print(rdd.flatMap(lambda x: x.split(" ")).collect())
5. reduceByKey()
Pre-aggregation (a map-side combine inside each partition) ==> grouping by key ==> final aggregation; see the sketch below.
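A minimal sketch of reduceByKey (the sample pairs and the pairs_rdd name are illustrative, not from the original post; it reuses the sc context created in the WordCount example):

# Sum the values for each key. Spark pre-aggregates inside each partition
# (map-side combine) before shuffling, so less data crosses the network.
pairs_rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1), ('b', 1), ('a', 1)])
print(pairs_rdd.reduceByKey(lambda a, b: a + b).collect())
# e.g. [('a', 3), ('b', 2)] (order may vary)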
6. mapValues()
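The original has no snippet here; a minimal sketch of mapValues, which applies a function to the value of each (key, value) pair and leaves the key unchanged (sample data is illustrative, reusing sc from above):

kv_rdd = sc.parallelize([('a', 1), ('b', 2), ('c', 3)])
# Only the values are transformed; the keys pass through untouched
print(kv_rdd.mapValues(lambda v: v * 10).collect())
# e.g. [('a', 10), ('b', 20), ('c', 30)]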
7. groupBy
rdd = sc.parallelize([('a', 1), ('c', 1), ('a', 1), ('b', 1), ('c', 1), ('a', 1), ('b', 1)])
# Group the tuples by their first element (the key)
rdd2 = rdd.groupBy(lambda t: t[0])
print(rdd2.collect())
# The grouped values are iterables; convert them to lists to make them readable
print(rdd2.map(lambda x: (x[0], list(x[1]))).collect())
8. filter: keep only the data you want
rdd = sc.parallelize([1, 2, 3, 4, 5])
# Keep only the odd numbers
print(rdd.filter(lambda x: x % 2 == 1).collect())
9. distinct
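No snippet is given for distinct in the original; a minimal sketch (sample data is illustrative, reusing sc from above):

# Remove duplicate elements from the RDD
rdd = sc.parallelize([1, 1, 2, 2, 3])
print(rdd.distinct().collect())
# e.g. [1, 2, 3] (order may vary)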
10. union
Does not deduplicate the result;
RDDs with different element types can still be unioned (see the sketch below).
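A minimal union sketch illustrating both points above (sample data is illustrative, reusing sc from above):

rdd1 = sc.parallelize([1, 1, 2, 3])
rdd2 = sc.parallelize(['a', 'b', 1])
# union simply concatenates the two RDDs: duplicates are kept and mixed element types are allowed
print(rdd1.union(rdd2).collect())
# e.g. [1, 1, 2, 3, 'a', 'b', 1]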
11. join
x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhaoliu")])
y = sc.parallelize([(1001, "sales"), (1002, "tech")])
# Inner join: keeps only the keys present in both RDDs
print(x.join(y).collect())
# Left outer join: keeps every key from x, with None where y has no match
print(x.leftOuterJoin(y).collect())
# Combined example: parse '|'-separated JSON records, keep the Beijing orders,
# and collect the distinct areaName_category pairs
import json

file_rdd = sc.textFile("hdfs://47.100.210.24:8020/input1/order.txt")
jsons_rdd = file_rdd.flatMap(lambda line: line.split("|"))
dict_rdd = jsons_rdd.map(lambda json_str: json.loads(json_str))
beijing_rdd = dict_rdd.filter(lambda d: d['areaName'] == '北京')
category_rdd = beijing_rdd.map(lambda x: x['areaName'] + "_" + x['category'])
result_rdd = category_rdd.distinct()
print(result_rdd.collect())