1. Preface: Project Environment
This post uses the GroupLens MovieLens movie-rating dataset to practice two-table joins with Spark, developed in IDEA 2020 on Windows.
Individual learners of big data usually build an HDFS cluster on Linux virtual machines. Spark, however, runs mostly in memory, and running it inside a VM's memory is less efficient than running it directly on Windows, so for learning Spark I suggest staying on Windows. If you later want to run on Linux, the finished program only needs small changes (mainly the SparkSession setup and the file paths) before it can run against an HDFS cluster on Linux VMs; see the sketch at the end of this section.
IDE: IDEA 2020 (on Windows)
JDK: Java 1.8.0_231
Python: 3.8.3
Spark: spark-3.2.1-bin-hadoop2.7.tgz
Dataset: http://files.grouplens.org/datasets/movielens/
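For reference, a minimal sketch of the changes usually needed to port the script to a cluster. The master value and the HDFS path below are assumptions, not part of the original project; substitute your own namenode address and directory (schemaRank is defined in section 3 below):

# Run against YARN instead of a local master (assumption: a YARN cluster exists)
spark = SparkSession.builder.appName("test"). \
    master("yarn"). \
    getOrCreate()
# Read from HDFS instead of the local filesystem ("namenode:8020" is a hypothetical address)
dfRank = spark.read.format("csv"). \
    option("header", True). \
    schema(schemaRank). \
    load("hdfs://namenode:8020/data/input/ratings.csv")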
2. CSV Data Files Used in the Test
2.1 Movie titles, movies.csv (sample):
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
2.2 Movie ratings, ratings.csv (sample):
userId,movieId,rating,timestamp
1,1,4.0,964982703
1,3,4.0,964981247
1,6,4.0,964982224
1,47,5.0,964983815
1,50,5.0,964982931
3. Writing the Python Code in IDEA
1. Import the libraries
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType
from pyspark.sql import functions as F
2. Create the SparkSession entry point
if __name__ == '__main__':
    # Build the SparkSession entry-point object
    spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
    # Get the SparkContext object from the SparkSession
    sc = spark.sparkContext
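One optional tweak, not in the original code: Spark's default of 200 shuffle partitions is more than a small local test needs, and it can be lowered when building the session. A sketch:

    # Assumption: purely local testing; a handful of shuffle partitions is plenty here
    spark = SparkSession.builder.appName("test"). \
        master("local[*]"). \
        config("spark.sql.shuffle.partitions", "4"). \
        getOrCreate()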
3. Read the rating dataset ratings.csv and create a DataFrame
    # todo 1: read the rating dataset ratings.csv
    # Define schema 1
    # userId,movieId,rating,timestamp
    schemaRank = StructType(). \
        add("userId", StringType(), nullable=True). \
        add("movieId", IntegerType(), nullable=True). \
        add("rating", DoubleType(), nullable=True). \
        add("timestamp", StringType(), nullable=True)
    # Read CSV file 1, comma-separated, with a header row
    dfRank = spark.read.format("csv"). \
        option("sep", ","). \
        option("header", True). \
        option("encoding", "utf-8"). \
        schema(schema=schemaRank). \
        load("../data/input/ratings.csv")
    # Sample file contents
    # userId,movieId,rating,timestamp
    # 1,1,4.0,964982703
    # 1,3,4.0,964981247
    # 1,6,4.0,964982224
    # 1,47,5.0,964983815
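A quick sanity check, not in the original code, confirms that the header option and the schema were applied as intended:

    # The printed schema should match schemaRank exactly
    dfRank.printSchema()
    dfRank.show(3)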
4. Read the movie-title dataset movies.csv and create a DataFrame
    # Define schema 2
    # movieId,title,genres
    schemaMovie = StructType(). \
        add("movieId", IntegerType(), nullable=True). \
        add("title", StringType(), nullable=True). \
        add("genres", StringType(), nullable=True)
    # Read CSV file 2, comma-separated, with a header row
    dfMovie = spark.read.format("csv"). \
        option("sep", ","). \
        option("header", True). \
        option("encoding", "utf-8"). \
        schema(schema=schemaMovie). \
        load("../data/input/movies.csv")
    # Sample file contents
    # movieId,title,genres
    # 1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
    # 2,Jumanji (1995),Adventure|Children|Fantasy
    # 3,Grumpier Old Men (1995),Comedy|Romance
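As an aside, the hand-written StructType can be skipped by letting Spark infer the column types; this costs an extra pass over the file, so the explicit schema above is usually preferable for a fixed format. A sketch (dfMovieInferred is an illustrative name, not from the original):

    # Assumption: header row present; column types are inferred from the data
    dfMovieInferred = spark.read.format("csv"). \
        option("sep", ","). \
        option("header", True). \
        option("inferSchema", True). \
        load("../data/input/movies.csv")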
5. Register the movie-title DataFrame as the temp view movies
    dfMovie.createOrReplaceTempView("movies")
    # Verify the contents of the movies view
    spark.sql("SELECT movieId,title,genres FROM movies").show()
    # Sample query result
    # +-------+--------------------+--------------------+
    # |movieId| title| genres|
    # +-------+--------------------+--------------------+
    # | 1| Toy Story (1995)|Adventure|Animati...|
    # | 2| Jumanji (1995)|Adventure|Childre...|
    # | 3|Grumpier Old Men ...| Comedy|Romance|
6. Query the Top 10 movies by average rating, among movies rated more than 100 times (DataFrame)
    # todo 3: Top 10 by average rating among movies with more than 100 ratings
    print("Top 10 by average rating, among movies rated more than 100 times")
    dfRank2 = dfRank.groupBy("movieId").agg(
        F.count("movieId").alias("cnt"),
        F.round(F.avg("rating"), 2).alias("avgRank")
    ).where("cnt > 100"). \
        orderBy("avgRank", ascending=False). \
        limit(10)
    dfRank2.show()
    # Sample query result
    # +-------+---+-------+
    # |movieId|cnt|avgRank|
    # +-------+---+-------+
    # | 318|317| 4.43|
    # | 858|192| 4.29|
    # | 2959|218| 4.27|
    # todo: register the rating Top-10 as the temp view ranks
    dfRank2.createOrReplaceTempView("ranks")
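For comparison, the same aggregation can be written in SQL style. This sketch assumes an extra temp view ratings built from dfRank, which is not part of the original code:

    dfRank.createOrReplaceTempView("ratings")
    spark.sql("SELECT movieId, COUNT(*) AS cnt, ROUND(AVG(rating), 2) AS avgRank "
              "FROM ratings GROUP BY movieId HAVING COUNT(*) > 100 "
              "ORDER BY avgRank DESC LIMIT 10").show()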
7. Join the two DataFrames to get the movie titles
    # todo 4: join the two DataFrames to get the movie titles
    dfRank2.join(dfMovie, "movieId", "inner").createOrReplaceTempView("movieRankTable")
    print("DataFrame style: titles of the Top 10 movies")
    # show(10, False) prints full column contents without truncation
    spark.sql("SELECT movieId,title,avgRank,genres "
              "FROM movieRankTable ").show(10, False)
    # Sample query result
    # +-------+--------------------------------+-------+---------------------------------------+
    # |movieId|title |avgRank|genres |
    # +-------+--------------------------------+-------+---------------------------------------+
    # |50 |Usual Suspects, The (1995) |4.24 |Crime|Mystery|Thriller |
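The same result can also be taken without a temp view at all, staying entirely in the DataFrame API. A sketch over the same data:

    # Select the columns directly from the joined DataFrame
    dfRank2.join(dfMovie, "movieId", "inner"). \
        select("movieId", "title", "avgRank", "genres"). \
        show(10, False)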
8. Join the two tables to get the movie titles (SQL style)
    print("SQL style: titles of the Top 10 movies")
    spark.sql("SELECT r.movieId,m.title,r.avgRank,m.genres "
              " FROM movies m,ranks r"
              " WHERE m.movieId = r.movieId").show(10, False)
    # Sample query result
    # +-------+--------------------------------+-------+---------------------------------------+
    # |movieId|title |avgRank|genres |
    # +-------+--------------------------------+-------+---------------------------------------+
    # |50 |Usual Suspects, The (1995) |4.24 |Crime|Mystery|Thriller |
    # |318 |Shawshank Redemption, The (1994)|4.43 |Crime|Drama |
    # |527 |Schindler's List (1993) |4.23 |Drama|War |
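The comma join above is the legacy implicit-join syntax; the explicit JOIN ... ON form below is equivalent and generally easier to read:

    spark.sql("SELECT r.movieId, m.title, r.avgRank, m.genres "
              "FROM ranks r JOIN movies m ON m.movieId = r.movieId").show(10, False)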
4. Run Results
5. A Problem Encountered
After joining the two DataFrames, the join key shows up as a duplicated column; selecting that column from the resulting view then fails with an ambiguous-reference error:
pyspark.sql.utils.AnalysisException: Reference 'movieId' is ambiguous, could be: movieranktable.movieId, movieranktable.movieId.; line 1 pos 7
Fix: join on the column name, join(dfMovie, "movieId", "inner"), so Spark keeps a single movieId column.
    # @Wrong: the expression join keeps both movieId columns
    # dfRank2.join(dfMovie, dfRank2.movieId == dfMovie.movieId).createOrReplaceTempView("movieRankTable")
    # @Right: joining on the column name deduplicates the key
    dfRank2.join(dfMovie, "movieId", "inner").createOrReplaceTempView("movieRankTable")
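An alternative fix, a sketch rather than the approach used above: keep the expression join but drop one copy of the duplicated column before registering the view:

    # drop() accepts a Column, so the copy coming from dfMovie can be removed explicitly
    dfRank2.join(dfMovie, dfRank2.movieId == dfMovie.movieId). \
        drop(dfMovie.movieId). \
        createOrReplaceTempView("movieRankTable")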