SparkSQL Cases (Chapter 3)

1. SparkSession
#coding:utf8
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    df = spark.read.csv('hdfs://101.133.232.96:8020/input/stu_score.txt',sep=',',header=False)
    df2 = df.toDF("id","name","score")
    df2.printSchema()
    df2.show()


    df2.createTempView("score")
    spark.sql("""
        select * from score where name = '语文' limit 5
    """
    ).show()
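
Because the CSV is read with header=False and no explicit schema, all three columns come back as strings. A minimal sketch, assuming the same stu_score.txt file, of letting Spark infer the column types instead:

    # Hedged sketch: inferSchema makes Spark scan the file and guess column types,
    # so the score column is read as an integer rather than a string.
    df_inferred = spark.read.csv('hdfs://101.133.232.96:8020/input/stu_score.txt',
                                 sep=',', header=False, inferSchema=True).toDF("id", "name", "score")
    df_inferred.printSchema()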


2. Building a DataFrame from an RDD (Method 1)

#coding:utf8
from pyspark.sql import SparkSession

# /opt/module/spark/bin/spark-submit /opt/Code/SparkSQL.py
if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    
    rdd = sc.textFile('hdfs://101.133.232.96:8020/input/people.txt') \
        .map(lambda x: x.split(',')) \
        .map(lambda x: (x[0], int(x[1])))
    # Build the DataFrame object
    # Argument 1: the RDD to convert
    # Argument 2: the column names, given as a list
    df = spark.createDataFrame(rdd, schema=['name', 'age'])


    df.printSchema()
    df.show()

Building a DataFrame from an RDD (Method 2)

from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    
    rdd = sc.textFile('hdfs://101.133.232.96:8020/input/people.txt') \
        .map(lambda x: x.split(',')) \
        .map(lambda x: (x[0], int(x[1])))

    schema = StructType().add('name', StringType(), nullable=True) \
        .add('age', IntegerType(), nullable=False)
    df = spark.createDataFrame(rdd, schema=schema)


    df.printSchema()
    df.show()

Building a DataFrame from an RDD (Method 3)

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    
    rdd = sc.textFile('hdfs://101.133.232.96:8020/input/people.txt') \
        .map(lambda x: x.split(',')) \
        .map(lambda x: (x[0], int(x[1])))

    # Build the DataFrame with toDF, passing only the column names
    df1 = rdd.toDF(["name", "age"])
    df1.printSchema()
    df1.show()

    # Second toDF variant: pass a StructType schema instead of a name list
    schema = StructType().add('name', StringType(), nullable=True) \
            .add('age', IntegerType(), nullable=True)
    df2 = rdd.toDF(schema)
    df2.printSchema()
    df2.show()

Building a DataFrame from a pandas DataFrame

import pandas as pd

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    
    # Build a Spark SQL DataFrame from a pandas DataFrame

    pdf = pd.DataFrame({
        "id":[1,2,3],
        'name':['张大仙','王小小','吕不韦'],
        'age':[11,21,32]
    })

    df = spark.createDataFrame(pdf)
    df.printSchema()
    df.show()
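
The conversion also works in the opposite direction: toPandas() collects the Spark DataFrame back to the driver as a pandas DataFrame, so it is only suitable for small results. A minimal sketch:

    # Hedged sketch: pull all rows to the driver as a pandas DataFrame.
    pdf_back = df.toPandas()
    print(pdf_back)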

Building a DataFrame by reading external data (text, JSON, CSV, Parquet)

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    
    schema = StructType().add('data', StringType(), nullable=True)
    df = spark.read.format('text').schema(schema=schema) \
        .load('hdfs://101.133.232.96/input/people.txt')
    
    df.printSchema()
    df.show()

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    df = spark.read.format('json').load('hdfs://101.133.232.96/input/people.json')
    
    df.printSchema()
    df.show()

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    df = spark.read.format('csv') \
        .option('sep', ';') \
        .option('header', True) \
        .option('encoding', 'utf-8') \
        .schema('name STRING, age INT, job STRING') \
        .load('hdfs://101.133.232.96/input/people.csv')
    
    df.printSchema()
    df.show()

    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    df = spark.read.format('parquet') \
        .load('hdfs://101.133.232.96/input/users.parquet')
    
    df.printSchema()
    df.show()


DSL style: select, filter/where, groupBy

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    df = spark.read.format('csv') \
        .schema('id INT, subject STRING, score INT') \
        .load('hdfs://101.133.232.96/input/stu_score.txt')
    
    # DSL style demo
    df.select(["id","subject"]).show()
    df.select("id","subject").show()

    # Getting Column objects
    id_column = df['id']
    subject_column = df['subject']

    df.select(id_column,subject_column).show()    

    #filter
    df.filter("score < 99").show()
    df.filter(df['score']<99).show()

    # groupBy
    # after calling an aggregation such as count(), the result is again a DataFrame
    df.groupBy("subject").count().show()
    df.groupBy(df['subject']).count().show()

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    df = spark.read.format('csv') \
        .schema('id INT, subject STRING, score INT') \
        .load('hdfs://101.133.232.96/input/stu_score.txt')
    
    # Register as temporary views
    df.createTempView("score")
    df.createOrReplaceTempView("score_2")
    df.createGlobalTempView("score_3")

    # The views can then be queried through the SparkSession sql API

    spark.sql("""
        select subject,count(*) as cnt from score group by subject
    """).show()

    spark.sql("""
        select subject,count(*) as cnt from score_2 group by subject
    """).show()

    spark.sql("""
        select subject,count(*) as cnt from global_temp.score_3 group by subject
    """).show()

Word-count exercise

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # Approach 1: SQL style on a DataFrame built from an RDD
    rdd = sc.textFile('file:///opt/Data/sql/words.txt') \
        .flatMap(lambda x: x.split(" ")) \
        .map(lambda x: [x])
    df = rdd.toDF(["word"])

    df.createTempView("words")
    spark.sql("select word, count(*) as cnt from words group by word order by cnt desc").show()

    # Approach 2: DSL style, reading the file as plain text (one row per line, column name 'value')
    df = spark.read.format('text').load('file:///opt/Data/sql/words.txt')
    df.show()

    df.select(F.split(df['value'], " ")).show()

    df.select(F.explode(F.split(df['value'], " "))).show()

if __name__ == '__main__':
        spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
        df = spark.read.format('text').load('file:///opt/Data/sql/words.txt')
        
        # withColumn replaces the column if the name already exists, otherwise it adds a new column
        df2 = df.withColumn('value', F.explode(F.split(df['value'], " ")))
        # groupBy(...).count() returns a DataFrame again
        df2.groupBy('value').count() \
                .withColumnRenamed("count", "cnt") \
                .orderBy('cnt', ascending=False) \
                .show()
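
To make the replace-versus-add behaviour concrete, a minimal sketch (the column name word_len is made up for illustration):

        # Hedged sketch: 'value' already exists so it is replaced in place,
        # while 'word_len' does not exist yet so withColumn appends it.
        df3 = df2.withColumn('value', F.upper(df2['value'])) \
                 .withColumn('word_len', F.length(df2['value']))
        df3.printSchema()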



Movie-rating analysis (ml-100k dataset)

1. Average rating per user

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql import functions as F

if __name__ == '__main__':
        spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()

        schema = StructType().add('user_id', StringType(), nullable=True) \
                .add('movie_id', IntegerType(), nullable=True) \
                .add('rank', IntegerType(), nullable=True) \
                .add('ts', StringType(), nullable=True)
        df = spark.read.format('csv') \
                .option('sep', '\t') \
                .option('header', False) \
                .option('encoding', 'utf-8') \
                .schema(schema=schema) \
                .load('file:///opt/Data/ml-100k/u.data')
        # Average rating per user
        df.groupBy('user_id').avg('rank') \
                .withColumnRenamed('avg(rank)', 'avg_rank') \
                .withColumn('avg_rank', F.round('avg_rank', 2)) \
                .orderBy('avg_rank', ascending=False).show()
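
The same result can be written more compactly with agg, which assigns the alias directly and avoids the withColumnRenamed step; a minimal sketch:

        # Hedged sketch: aggregate and alias in a single step.
        df.groupBy('user_id') \
                .agg(F.round(F.avg('rank'), 2).alias('avg_rank')) \
                .orderBy('avg_rank', ascending=False).show()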

2. Average rating per movie

    df.createTempView('movie')
    # Average rating per movie

    spark.sql("""
            select movie_id,round(avg(rank),2) as avg_rank
            from movie
            group by movie_id
            order by avg_rank desc
    """).show()

3. Number of ratings above the overall average

        print('Number of ratings above the overall average')
        print(df.select(F.avg(df['rank'])).first()['avg(rank)'])
        print(df.where(df['rank'] > df.select(F.avg(df['rank'])).first()['avg(rank)']).count())
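
Here the overall average is computed twice; a minimal sketch that materialises it once (the variable name avg_rank_value is just for illustration):

        # Hedged sketch: select() returns a DataFrame, first() returns a Row,
        # and indexing the Row by column name yields the plain Python value.
        avg_rank_value = df.select(F.avg(df['rank'])).first()['avg(rank)']
        print(df.where(df['rank'] > avg_rank_value).count())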

4. Find the user who rated above 3 most often, and compute that user's average rating

        # Find the user who rated above 3 most often, then compute that user's average rating

        user_id = df.where(df['rank'] > 3).groupBy('user_id').count() \
                .withColumnRenamed("count", "cnt") \
                .orderBy("cnt", ascending=False) \
                .first()['user_id']
        
        df.filter(df['user_id'] == user_id).select(F.round(F.avg('rank'),2)).show()

5. Each user's average, minimum and maximum rating

        # Each user's average, minimum and maximum rating

        spark.sql("""
                select user_id,min(rank) as min_rank,max(rank) as max_rank,avg(rank) as avg_rank
                        from movie group by user_id
        """).show()

        df.groupby('user_id').agg(
                F.round(F.avg('rank'),2).alias('avg_rank'),
                F.round(F.min('rank'),2).alias('min_rank'),
                F.round(F.max('rank'),2).alias('max_rank')
        ).show()
6. Top 10 movies by average rating, among movies rated more than 100 times

        spark.sql("""
                select movie_id,count(movie_id) as cnt,avg(rank) as avg_rank
                from movie
                group by movie_id
                having cnt > 100
                order by avg_rank desc
                limit 10
        """).show()

        df.groupby('movie_id').agg(
                F.count('movie_id').alias('cnt'),
                F.round(F.avg('rank'),2).alias('avg_rank'),
        ).where('cnt>100').orderBy('avg_rank',ascending=False).limit(10).show()

Complete code for the movie-rating analysis

#coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType
import pandas as pd
from pyspark.sql import functions as F

# /opt/module/spark/bin/spark-submit /opt/Code/SparkSQL.py
if __name__ == '__main__':
        spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
        
        schema = StructType().add('user_id', StringType(), nullable=True) \
                .add('movie_id', IntegerType(), nullable=True) \
                .add('rank', IntegerType(), nullable=True) \
                .add('ts', StringType(), nullable=True)
        df = spark.read.format('csv') \
                .option('sep', '\t') \
                .option('header', False) \
                .option('encoding', 'utf-8') \
                .schema(schema=schema) \
                .load('file:///opt/Data/ml-100k/u.data')
        # # Average rating per user
        # df.groupBy('user_id').avg('rank').withColumnRenamed('avg(rank)', 'avg_rank') \
        #         .withColumn('avg_rank', F.round('avg_rank', 2)) \
        #         .orderBy('avg_rank', ascending=False).show()

        df.createTempView('movie')

        # # Average rating per movie

        # spark.sql("""
        #         select movie_id,round(avg(rank),2) as avg_rank
        #         from movie
        #         group by movie_id
        #         order by avg_rank desc
        # """).show()

        # # Number of ratings above the overall average
        # # df.select(F.avg(df['rank'])) ==> DataFrame object
        # # df.select(F.avg(df['rank'])).first() ==> Row object
        # # df.select(F.avg(df['rank'])).first()['avg(rank)'] ==> plain numeric value
        # print('Number of ratings above the overall average')
        # print(df.select(F.avg(df['rank'])).first()['avg(rank)'])
        # print(df.where(df['rank'] > df.select(F.avg(df['rank'])).first()['avg(rank)']).count())

        # # Find the user who rated above 3 most often, then compute that user's average rating

        # user_id = df.where(df['rank'] > 3).groupBy('user_id').count() \
        #         .withColumnRenamed("count", "cnt").orderBy("cnt", ascending=False) \
        #         .first()['user_id']
        
        # df.filter(df['user_id'] == user_id).select(F.round(F.avg('rank'),2)).show()

        # Each user's average, minimum and maximum rating

        # spark.sql("""
        #         select user_id,min(rank) as min_rank,max(rank) as max_rank,avg(rank) as avg_rank
        #                 from movie group by user_id
        # """).show()

        # df.groupby('user_id').agg(
        #         F.round(F.avg('rank'),2).alias('avg_rank'),
        #         F.round(F.min('rank'),2).alias('min_rank'),
        #         F.round(F.max('rank'),2).alias('max_rank')
        # ).show()


        spark.sql("""
                select movie_id,count(movie_id) as cnt,avg(rank) as avg_rank
                from movie
                group by movie_id
                having cnt > 100
                order by avg_rank desc
                limit 10
        """).show()

        df.groupby('movie_id').agg(
                F.count('movie_id').alias('cnt'),
                F.round(F.avg('rank'),2).alias('avg_rank'),
        ).where('cnt>100').orderBy('avg_rank',ascending=False).limit(10).show()

if __name__ == '__main__':
        spark = SparkSession.builder.appName('create df').master('local[*]') \
                .config('spark.sql.shuffle.partitions', '2') \
                .getOrCreate()

        df = spark.read.format('csv').option('sep', ';').option('header', True) \
                .load('file:///opt/Data/sql/people.csv')

        # Deduplication: with no arguments, rows must match on every column to be dropped
        df.dropDuplicates().show()
        # Deduplicate on a subset of columns
        df.dropDuplicates(['age', 'job']).show()

        # Fill missing values: one value for all columns, for a subset, or per column
        df.fillna('loss').show()
        df.fillna('loss', subset=['job']).show()
        df.fillna({"name": "未知姓名", "age": 1, "job": "worker"}).show()

if __name__ == '__main__':
        spark = SparkSession.builder.appName('create df').master('local[*]') \
                .config('spark.sql.shuffle.partitions', '2') \
                .getOrCreate()

        df = spark.read.format('csv').option('sep', ';').option('header', True) \
                .load('file:///opt/Data/sql/people.csv')

        df.select(F.concat_ws('_', 'name', 'age', 'job')) \
                .write.mode('overwrite').format('text').save('file:///opt/Data/sql/output/text')

        df.write.mode('overwrite').format('csv').option('sep', ";").option("header", True) \
                .save('file:///opt/Data/sql/output/csv')
        
        df.write.mode('overwrite').format('parquet').save('file:///opt/Data/sql/output/parquet')
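
To verify the output, the saved Parquet directory can simply be read back; a minimal sketch:

        # Hedged sketch: read the Parquet output back and confirm the schema survived the round trip.
        spark.read.format('parquet').load('file:///opt/Data/sql/output/parquet').printSchema()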
