IDEA中PYSPARK的两表关联(字段名相同)

IDEA中PYSPARK的两表关联(字段名相同),第1张

前言

       使用GROUPLENS的电影评价的大数据集,Windows中IDEA2020环境中SPARK做两表关联测试学习

       个人用户学习大数据,一般会搭建的基于Linux虚拟机的HDFS集群。而SPARK主要运行在内存中,若在虚拟机的内存中运行没有在Windows中直接运行有效率吧。所以建议SPARK的学习就在Windows中了。若想在Linux运行,写好的程序也可以修改(主要是SparkSession和读取的文件的路径)后再在Linux虚拟机的HDFS集群上运行。

一、项目环境

    Windows: IDEA2020

    JDK: java version 1.8.0_231

    Python: 3.8.3

    Spark:spark-3.2.1-bin-hadoop2.7.tgz

二、电影评价 大数据集下载 1.下载地址

    http://files.grouplens.org/datasets/movielens/

 

2.测试中用到的CSV数据文件
 2.1 电影名称 movies.csv(示例):
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
 2.2 电影评价 ratings.csv(示例):
userId,movieId,rating,timestamp
1,1,4.0,964982703
1,3,4.0,964981247
1,6,4.0,964982224
1,47,5.0,964983815
1,50,5.0,964982931
 
三、在IDEA中用Python编写 
1.引入库 
# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType,DoubleType
from pyspark.sql import functions as F
2.创建SparkSession的执行环境入口
if __name__ == '__main__':
    # 构建SparkSession的执行环境入口对象
    spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()

    # 通过SparkSession对象获取SparkContext 对象
    sc = spark.sparkContext
 3.读取电影评价ratings.csv数据集,创建DataFrame
    # todo 1:读取电影评价ratings.csv数据集
    # 定义结构1
    # userId,movieId,rating,timestamp
    schemaRank = StructType().add("userId", StringType(), nullable=True).\
        add("movieId",IntegerType(),nullable=True). \
        add("rating",DoubleType(),nullable=True). \
        add("timestamp",StringType(),nullable=True)

    # 用,分割读取CSV文件1
    dfRank = spark.read.format("csv").option("sep", ",").\
        option("header",True).\
        option("encoding","utf-8").\
        schema(schema=schemaRank).\
        load("../data/input/ratings.csv")
            # 文件内容样例
            # userId,movieId,rating,timestamp
            # 1,1,4.0,964982703
            # 1,3,4.0,964981247
            # 1,6,4.0,964982224
            # 1,47,5.0,964983815
 4.读取电影名称movies.csv数据集,创建DataFrame
    # 定义结构2
    # movieId,title,genres
    schemaMovie = StructType().add("movieId",IntegerType(),nullable=True). \
        add("title", StringType(), nullable=True). \
        add("genres", StringType(), nullable=True)

    # 用,分割读取CSV文件2
    dfMovie = spark.read.format("csv").option("sep", ","). \
        option("header",True). \
        option("encoding","utf-8"). \
        schema(schema=schemaMovie). \
        load("../data/input/movies.csv")
        # 文件内容样例
        # movieId,title,genres
        # 1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
        # 2,Jumanji (1995),Adventure|Children|Fantasy
        # 3,Grumpier Old Men (1995),Comedy|Romance
 5.用电影名称DataFrame,建表 movies表
    dfMovie.createOrReplaceTempView("movies")
    # 验证movies表内容
    spark.sql("SELECT movieId,title,genres FROM movies").show()
        # 查询结果样例
        # +-------+--------------------+--------------------+
        # |movieId|               title|              genres|
        # +-------+--------------------+--------------------+
        # |      1|    Toy Story (1995)|Adventure|Animati...|
        # |      2|      Jumanji (1995)|Adventure|Childre...|
        # |      3|Grumpier Old Men ...|      Comedy|Romance|
  6.查询 评价次数超100次的电影,平均分排名 Top10的DataFrame
    # todo 3 查询 评价次数超100次的电影,平均分排名 Top10
    print("查询 评价次数超100次的电影,平均分排名 Top10 ")
    dfRank2 = dfRank.groupBy("movieId").agg(
        F.count("movieId").alias("cnt"),
        F.round(F.avg("rating"),2).alias("avgRank")
    ).where("cnt >100").\
        orderBy("avgRank",ascending=False).\
        limit(10)
    dfRank2.show()
        # 查询结果样例
        # +-------+---+-------+
        # |movieId|cnt|avgRank|
        # +-------+---+-------+
        # |    318|317|   4.43|
        # |    858|192|   4.29|
        # |   2959|218|   4.27|
    # todo 用电影评价,建表 ranks
    dfRank2.createOrReplaceTempView("ranks")
  7.两个dataframe关联,取电影名称
    # todo 4 两个dataframe关联,取电影名称

    dfRank2.join(dfMovie, "movieId", "inner").createOrReplaceTempView("movieRankTable")
    print("DataFrame风格 前10名电影名字 ")
    # todo 字段内容全部显示 show(10,False)
    spark.sql("SELECT movieId,title,avgRank,genres "
              "FROM movieRankTable ").show(10,False)

        # 查询结果样例
        # +-------+--------------------------------+-------+---------------------------------------+
        # |movieId|title                           |avgRank|genres                                 |
        # +-------+--------------------------------+-------+---------------------------------------+
        # |50     |Usual Suspects, The (1995)      |4.24   |Crime|Mystery|Thriller                 |

 8.两张表关联,取电影名称
    print("SQL风格 前10名电影名字  ")
    spark.sql("SELECT r.movieId,m.title,r.avgRank,m.genres "
              " FROM movies m,ranks r"
              " WHERE m.movieId =r.movieId").show(10,False)

        # 查询结果样例
        # +-------+--------------------------------+-------+---------------------------------------+
        # |movieId|title                           |avgRank|genres                                 |
        # +-------+--------------------------------+-------+---------------------------------------+
        # |50     |Usual Suspects, The (1995)      |4.24   |Crime|Mystery|Thriller                 |
        # |318    |Shawshank Redemption, The (1994)|4.43   |Crime|Drama                            |
        # |527    |Schindler's List (1993)         |4.23   |Drama|War                              |

四、运行结果

五.遇到的问题    两个dataframe关联后, 出现重复列,字段相同时,抽出时报错

pyspark.sql.utils.AnalysisException: Reference 'movieId' is ambiguous, # could be: movieranktable.movieId, movieranktable.movieId.; line 1 pos 7

pyspark.sql.utils.AnalysisException: Reference 'movieId' is ambiguous,could be: movieranktable.movieId, movieranktable.movieId.; line 1 pos 7

 对策 :join(dfMovie,"movieId","inner")的方式

# @错误的写法: dfRank2.join(dfMovie, dfRank2.movieId == dfMovie.movieId).createOrReplaceTempView("movieRankTable")
# @正确的写法: dfRank2.join(dfMovie, "movieId", "inner").createOrReplaceTempView("movieRankTable")

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/921781.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)

保存