编程实现Spark的WordCount的单词统计

编程实现Spark的WordCount的单词统计,第1张

编程实现Spark的WordCount的单词统计 需求一:使用PyCharm编程实现SparkCore的WordCount单词统计,并保存在HDFS中
from pyspark import SparkConf,SparkContext

import os

os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python3.8"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

if __name__ == '__main__':
    # 1.创建上下文,指定应用的名字和用谁的资源来跑
    conf=SparkConf().setAppName("first_wordcount").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    # 2.加载words.txt文件形成一个RDD,RDD的每个元素是文本的每一行
    rdd1=sc.textFile('file://file:///export/servers/data/words.txt')
    # 过滤空行
    rdd1 = rdd1.filter(lambda line: len(line.strip()) > 0)
    # 3.进一步将文本内容打散成单词
    rdd2=rdd1.flatMap(lambda line:line.split(" "))
    # 4.为每个单词标记上1,形成一个元组,具有键值对数据结构,方便做bykey的 *** 作
    rdd3=rdd2.map(lambda word:(word,1))
    # 5.进一步做reduceByKey,得到wordcount结果
    rdd4=rdd3.reduceByKey(lambda x,y:x+y)
    # 6.结果打印到控制台
    arr=rdd4.collect()
    print('wordcount结果是:',arr)
    # 7.结果输出到本地文件
    rdd4.saveAsTextFile("hdfs://node1:8020/output/file1")
需求二:使用PyCharm编程实现SparkSQL的DSL和SQL方式WordCount单词统计

 

from pyspark.sql import SparkSession,Row
from pyspark.sql.types import *
import pyspark.sql.functions as F
import os

os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python3.8"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

if __name__ == '__main__':
    # 1.创建上下文对象
    spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate()
    file_df=spark.read.text('file:///export/pyworkspace/pyspark_sz26/pyspark-sparksql-3.1.2/data/words.txt')
    file_df.printSchema()
    file_df.show(truncate=False)
    # 3.注册成临时表
    file_df.createOrReplaceTempView('words_t')
    # 4.做wordcount
    print('SQL风格做wordcount')
    spark.sql('''
    select t.word,
           count(*) as cnt
    from
        (select explode(split(value,' ')) as word from words_t) t
    group by t.word
    order by cnt desc ''').show()

    print('DSL做wordcount')
    file_df.select(F.explode(F.split('value',' ')).alias('word')) 
        .groupBy('word') 
        .count() 
        .orderBy('count',ascending=False) 
        .show()

    spark.stop()
 需求三:使用PySpark读取json数据格式,多种方式查询字段并进行统计分析

 

from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
import os

os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python3.8"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

if __name__ == '__main__':
    # 1-创建上下文对象
    spark = SparkSession.builder.appName('test').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    df = spark.read.json('file:///export/pyworkspace/pyspark_sz26/pyspark-sparksql-3.1.2/data/employee.json')
    # (1)查询所有数据;
    df.show()
    # (2)查询所有数据,并去除重复的数据;
    df.distinct().show()
    # (3)查询所有数据,打印时去除id字段;
    df.drop('id').show()
    # (4)筛选出age>30的记录;
    df.where('age>30').show()
    # (5)将数据按age分组;
    df.groupBy('age').count().show()
    # (6)将数据按name升序排列;
    df.orderBy('name').show()
    # (7)取出前3行数据;
    print(df.take(3))
    # (8)查询所有记录的name列,并为其取别名为username;
    df.select(df.name.alias('username')).show()
    # (9)查询年龄age的平均值;
    from pyspark.sql.functions import avg
    df.agg(avg('age')).show()
    # (10)查询年龄age的最小值。
    from pyspark.sql.functions import min
    df.agg(min('age')).show()

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5721385.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存