- SparkSession
```python
# coding:utf8
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # Read a headerless CSV from HDFS, then rename the columns with toDF
    df = spark.read.csv('hdfs://101.133.232.96:8020/input/stu_score.txt', sep=',', header=False)
    df2 = df.toDF("id", "name", "score")
    df2.printSchema()
    df2.show()

    # Register a temporary view and query it with SQL
    df2.createTempView("score")
    spark.sql("""
        select * from score where name = '语文' limit 5
    """).show()
```
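For comparison, the same query can be written in DSL style without registering a view; a minimal equivalent sketch using the `df2` built above:

```python
# DSL equivalent of the SQL query above
df2.where(df2['name'] == '语文').limit(5).show()
```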
2. Building a DataFrame in code - from an RDD, method 1
```python
# coding:utf8
from pyspark.sql import SparkSession
# /opt/module/spark/bin/spark-submit /opt/Code/SparkSQL.py

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    rdd = sc.textFile('hdfs://101.133.232.96:8020/input/people.txt') \
        .map(lambda x: x.split(',')) \
        .map(lambda x: (x[0], int(x[1])))

    # Build the DataFrame object:
    # arg 1: the RDD to convert
    # arg 2: the column names, given as a list
    df = spark.createDataFrame(rdd, schema=['name', 'age'])
    df.printSchema()
    df.show()
```
From an RDD, method 2
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    rdd = sc.textFile('hdfs://101.133.232.96:8020/input/people.txt') \
        .map(lambda x: x.split(',')) \
        .map(lambda x: (x[0], int(x[1])))

    # Describe the schema explicitly with StructType
    schema = StructType() \
        .add('name', StringType(), nullable=True) \
        .add('age', IntegerType(), nullable=False)
    df = spark.createDataFrame(rdd, schema=schema)
    df.printSchema()
    df.show()
```
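The chained `add` calls are just one way to spell a schema; an equivalent form passes an explicit list of `StructField`s, which is convenient when the schema is built programmatically:

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Equivalent schema expressed as an explicit StructField list
schema = StructType([
    StructField('name', StringType(), nullable=True),
    StructField('age', IntegerType(), nullable=False),
])
```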
From an RDD, method 3
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    rdd = sc.textFile('hdfs://101.133.232.96:8020/input/people.txt') \
        .map(lambda x: x.split(',')) \
        .map(lambda x: (x[0], int(x[1])))

    # Build a DataFrame with toDF, passing just the column names
    df1 = rdd.toDF(["name", "age"])
    df1.printSchema()
    df1.show()

    # toDF variant 2: pass a full StructType schema
    schema = StructType() \
        .add('name', StringType(), nullable=True) \
        .add('age', IntegerType(), nullable=True)
    df2 = rdd.toDF(schema)
    df2.printSchema()
    df2.show()
```
Building a DataFrame in code - from a pandas DataFrame
```python
from pyspark.sql import SparkSession
import pandas as pd

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # Build a SparkSQL DataFrame from a pandas DataFrame
    pdf = pd.DataFrame({
        "id": [1, 2, 3],
        'name': ['张大仙', '王小小', '吕不韦'],
        'age': [11, 21, 32]
    })
    df = spark.createDataFrame(pdf)
    df.printSchema()
    df.show()
```
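Going the other way, `df.toPandas()` collects a Spark DataFrame back into pandas. For larger frames, Arrow-based transfer usually speeds up both directions; a sketch (the config key shown is the standard Spark 3.x name, and is optional):

```python
# Optional: Arrow-accelerated pandas <-> Spark conversion (Spark 3.x config key)
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')

pdf_back = df.toPandas()   # collects to the driver; fine for small results
print(pdf_back.head())
```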
Reading a text file - each line is loaded as a single string column:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # text format: the whole line goes into one string column
    schema = StructType().add('data', StringType(), nullable=True)
    df = spark.read.format('text') \
        .schema(schema=schema) \
        .load('hdfs://101.133.232.96/input/people.txt')
    df.printSchema()
    df.show()
```
Reading a JSON file - JSON carries its own field names, so no schema is required:

```python
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    df = spark.read.format('json').load('hdfs://101.133.232.96/input/people.json')
    df.printSchema()
    df.show()
```
Reading a CSV file - separator, header, and encoding are set through options:

```python
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    df = spark.read.format('csv') \
        .option('sep', ';') \
        .option('header', True) \
        .option('encoding', 'utf-8') \
        .schema('name STRING, age INT, job STRING') \
        .load('hdfs://101.133.232.96/input/people.csv')
    df.printSchema()
    df.show()
```
Reading a Parquet file - a column-oriented format that embeds its own schema:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
sc = spark.sparkContext

df = spark.read.format('parquet') \
    .load('hdfs://101.133.232.96/input/users.parquet')
df.printSchema()
df.show()
```
DSL style - select, filter/where, groupBy
```python
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    df = spark.read.format('csv') \
        .schema('id INT, subject STRING, score INT') \
        .load('hdfs://101.133.232.96/input/stu_score.txt')

    # DSL-style demo
    df.select(["id", "subject"]).show()
    df.select("id", "subject").show()

    # Selecting via Column objects
    id_column = df['id']
    subject_column = df['subject']
    df.select(id_column, subject_column).show()

    # filter / where
    df.filter("score < 99").show()
    df.filter(df['score'] < 99).show()

    # groupBy
    # After calling an aggregation, the return value is again a DataFrame
    df.groupBy("subject").count().show()
    df.groupBy(df['subject']).count().show()
```
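`groupBy` returns a GroupedData object; besides the `count()` shortcut, its `agg` method applies several aggregate functions in one pass. A minimal sketch continuing from the `df` above:

```python
from pyspark.sql import functions as F

# Several aggregates in one pass; alias() names the output columns
df.groupBy('subject').agg(
    F.count('id').alias('cnt'),
    F.round(F.avg('score'), 2).alias('avg_score')
).show()
```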
```python
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    df = spark.read.format('csv') \
        .schema('id INT, subject STRING, score INT') \
        .load('hdfs://101.133.232.96/input/stu_score.txt')

    # Register temporary views
    df.createTempView("score")
    df.createOrReplaceTempView("score_2")
    df.createGlobalTempView("score_3")

    # Query them through the SparkSession's sql API
    spark.sql("""
        select subject, count(*) as cnt from score group by subject
    """).show()
    spark.sql("""
        select subject, count(*) as cnt from score_2 group by subject
    """).show()
    # Global temp views live under the fixed 'global_temp' database
    spark.sql("""
        select subject, count(*) as cnt from global_temp.score_3 group by subject
    """).show()
```
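What distinguishes the three registrations: a plain temp view is visible only to the session that created it, while a global temp view is registered under the fixed `global_temp` database and shared by every SparkSession in the same application. A quick sketch:

```python
# A brand-new session in the same application can still query the global view;
# querying the session-local 'score' view from it would raise an error instead
spark.newSession().sql(
    "select count(*) as cnt from global_temp.score_3"
).show()
```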
Word-count exercise
```python
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # SQL approach: split lines into words on the RDD side, then register a view
    rdd = sc.textFile('file:///opt/Data/sql/words.txt') \
        .flatMap(lambda x: x.split(" ")) \
        .map(lambda x: [x])
    df = rdd.toDF(["word"])

    df.createTempView("words")
    spark.sql("select word, count(*) as cnt from words group by word order by cnt desc").show()
```
A DSL-based version, built up step by step (note that `.show()` returns None, so it must not be chained onto the assignment):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
df = spark.read.format('text').load('file:///opt/Data/sql/words.txt')
df.show()

# split() turns each line into an array of words...
df.select(F.split(df['value'], " ")).show()

# ...and explode() flattens that array into one row per word
df.select(F.explode(F.split(df['value'], " "))).show()
```
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()
    df = spark.read.format('text').load('file:///opt/Data/sql/words.txt')

    # withColumn replaces the column if the name already exists,
    # otherwise it appends a new column
    df2 = df.withColumn('value', F.explode(F.split(df['value'], " ")))

    df2.groupBy('value').count() \
        .withColumnRenamed("count", "cnt") \
        .orderBy('cnt', ascending=False) \
        .show()
```
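If words.txt contains blank lines or repeated spaces, `split` can emit empty strings that then show up as their own "word"; a one-line guard before grouping (a hedged addition, only needed if such tokens exist in the input):

```python
# Continuing from the block above: drop empty tokens before grouping
df2 = df2.where(F.col('value') != '')
```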
1. Average rating per user
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql import functions as F

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()

    schema = StructType() \
        .add('user_id', StringType(), nullable=True) \
        .add('movie_id', IntegerType(), nullable=True) \
        .add('rank', IntegerType(), nullable=True) \
        .add('ts', StringType(), nullable=True)
    # u.data from MovieLens ml-100k is tab-separated
    df = spark.read.format('csv') \
        .option('sep', '\t') \
        .option('header', False) \
        .option('encoding', 'utf-8') \
        .schema(schema=schema) \
        .load('file:///opt/Data/ml-100k/u.data')

    # Average rating per user
    df.groupBy('user_id').avg('rank') \
        .withColumnRenamed('avg(rank)', 'avg_rank') \
        .withColumn('avg_rank', F.round('avg_rank', 2)) \
        .orderBy('avg_rank', ascending=False) \
        .show()
```
2. Average rating per movie
```python
df.createTempView('movie')

# Average rating per movie
spark.sql("""
    select movie_id, round(avg(rank), 2) as avg_rank
    from movie
    group by movie_id
    order by avg_rank desc
""").show()
```
- Number of ratings above the overall average rating
```python
# df.select(F.avg(df['rank']))                      => DataFrame
# df.select(F.avg(df['rank'])).first()              => Row object
# df.select(F.avg(df['rank'])).first()['avg(rank)'] => plain number
print('Number of ratings above the overall average:')
print(df.select(F.avg(df['rank'])).first()['avg(rank)'])
print(df.where(df['rank'] > df.select(F.avg(df['rank'])).first()['avg(rank)']).count())
```
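The snippet above computes the same average twice (once to print it, once inside the filter); pulling it into a variable avoids the duplicate job. A minimal rewrite:

```python
# Compute the global average once and reuse it
avg_rank = df.select(F.avg('rank').alias('avg_rank')).first()['avg_rank']
print(avg_rank)
print(df.where(df['rank'] > avg_rank).count())
```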
4. Find the user who gave the most ratings above 3, and compute that user's average rating
```python
# User with the most ratings above 3, then that user's average rating
user_id = df.where(df['rank'] > 3) \
    .groupBy('user_id').count() \
    .withColumnRenamed("count", "cnt") \
    .orderBy("cnt", ascending=False) \
    .first()['user_id']
df.filter(df['user_id'] == user_id).select(F.round(F.avg('rank'), 2)).show()
```
5. Average, minimum, and maximum rating per user
```python
# Average, minimum, and maximum rating per user - SQL style
spark.sql("""
    select user_id, min(rank) as min_rank, max(rank) as max_rank, avg(rank) as avg_rank
    from movie group by user_id
""").show()

# The same query in DSL style
df.groupBy('user_id').agg(
    F.round(F.avg('rank'), 2).alias('avg_rank'),
    F.round(F.min('rank'), 2).alias('min_rank'),
    F.round(F.max('rank'), 2).alias('max_rank')
).show()
```
- Top 10 movies by average rating, among movies rated more than 100 times
```python
# Top 10 by average rating among movies with more than 100 ratings - SQL style
spark.sql("""
    select movie_id, count(movie_id) as cnt, avg(rank) as avg_rank
    from movie
    group by movie_id
    having cnt > 100
    order by avg_rank desc
    limit 10
""").show()

# The same query in DSL style
df.groupBy('movie_id').agg(
    F.count('movie_id').alias('cnt'),
    F.round(F.avg('rank'), 2).alias('avg_rank')
).where('cnt > 100').orderBy('avg_rank', ascending=False).limit(10).show()
```
Movie-rating analysis: complete script
```python
# coding:utf8
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql import functions as F
import pandas as pd

# /opt/module/spark/bin/spark-submit /opt/Code/SparkSQL.py

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]').getOrCreate()

    schema = StructType() \
        .add('user_id', StringType(), nullable=True) \
        .add('movie_id', IntegerType(), nullable=True) \
        .add('rank', IntegerType(), nullable=True) \
        .add('ts', StringType(), nullable=True)
    df = spark.read.format('csv') \
        .option('sep', '\t') \
        .option('header', False) \
        .option('encoding', 'utf-8') \
        .schema(schema=schema) \
        .load('file:///opt/Data/ml-100k/u.data')

    # # 1. Average rating per user
    # df.groupBy('user_id').avg('rank') \
    #     .withColumnRenamed('avg(rank)', 'avg_rank') \
    #     .withColumn('avg_rank', F.round('avg_rank', 2)) \
    #     .orderBy('avg_rank', ascending=False).show()

    df.createTempView('movie')

    # # 2. Average rating per movie
    # spark.sql("""
    #     select movie_id, round(avg(rank), 2) as avg_rank
    #     from movie
    #     group by movie_id
    #     order by avg_rank desc
    # """).show()

    # # 3. Number of ratings above the overall average
    # # df.select(F.avg(df['rank']))                      => DataFrame
    # # df.select(F.avg(df['rank'])).first()              => Row object
    # # df.select(F.avg(df['rank'])).first()['avg(rank)'] => plain number
    # print('Number of ratings above the overall average:')
    # print(df.select(F.avg(df['rank'])).first()['avg(rank)'])
    # print(df.where(df['rank'] > df.select(F.avg(df['rank'])).first()['avg(rank)']).count())

    # # 4. User with the most ratings above 3, and that user's average rating
    # user_id = df.where(df['rank'] > 3) \
    #     .groupBy('user_id').count() \
    #     .withColumnRenamed("count", "cnt") \
    #     .orderBy("cnt", ascending=False) \
    #     .first()['user_id']
    # df.filter(df['user_id'] == user_id).select(F.round(F.avg('rank'), 2)).show()

    # # 5. Average, minimum, and maximum rating per user
    # spark.sql("""
    #     select user_id, min(rank) as min_rank, max(rank) as max_rank, avg(rank) as avg_rank
    #     from movie group by user_id
    # """).show()
    # df.groupBy('user_id').agg(
    #     F.round(F.avg('rank'), 2).alias('avg_rank'),
    #     F.round(F.min('rank'), 2).alias('min_rank'),
    #     F.round(F.max('rank'), 2).alias('max_rank')
    # ).show()

    # 6. Top 10 movies by average rating among movies rated more than 100 times
    spark.sql("""
        select movie_id, count(movie_id) as cnt, avg(rank) as avg_rank
        from movie
        group by movie_id
        having cnt > 100
        order by avg_rank desc
        limit 10
    """).show()
    df.groupBy('movie_id').agg(
        F.count('movie_id').alias('cnt'),
        F.round(F.avg('rank'), 2).alias('avg_rank')
    ).where('cnt > 100').orderBy('avg_rank', ascending=False).limit(10).show()
```
Data cleaning - removing duplicate rows:

```python
from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]') \
        .config('spark.sql.shuffle.partitions', '2') \
        .getOrCreate()

    df = spark.read.format('csv').option('sep', ';').option('header', True) \
        .load('file:///opt/Data/sql/people.csv')

    # With no arguments, dropDuplicates removes rows identical in every column
    df.dropDuplicates().show()
    # With a subset, only the listed columns are compared
    df.dropDuplicates(['age', 'job']).show()
```
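As a side note, `df.distinct()` is equivalent to the no-argument `dropDuplicates()`:

```python
# Same result as df.dropDuplicates() with no arguments
df.distinct().show()
```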
Filling null values:

```python
# Fill every null with the same value
df.fillna('loss').show()
# Only fill nulls in the listed columns
df.fillna('loss', subset=['job']).show()
# Per-column fill values via a dict
df.fillna({"name": "unknown", "age": 1, "job": "worker"}).show()
```
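The complement of `fillna` is `dropna`, which discards incomplete rows instead of patching them; a short sketch of its common parameters:

```python
# Drop a row if any column is null (how='any' is the default)
df.dropna().show()
# Keep a row only if at least 2 of the listed columns are non-null
df.dropna(thresh=2, subset=['name', 'age', 'job']).show()
```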
Writing data out to text, CSV, and Parquet:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

if __name__ == '__main__':
    spark = SparkSession.builder.appName('create df').master('local[*]') \
        .config('spark.sql.shuffle.partitions', '2') \
        .getOrCreate()

    df = spark.read.format('csv').option('sep', ';').option('header', True) \
        .load('file:///opt/Data/sql/people.csv')

    # text output allows only a single string column, so concatenate first
    df.select(F.concat_ws('_', 'name', 'age', 'job')) \
        .write.mode('overwrite').format('text') \
        .save('file:///opt/Data/sql/output/text')

    df.write.mode('overwrite').format('csv') \
        .option('sep', ';').option('header', True) \
        .save('file:///opt/Data/sql/output/csv')

    df.write.mode('overwrite').format('parquet') \
        .save('file:///opt/Data/sql/output/parquet')
```
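Each save path names a directory, not a single file: Spark writes one part file per partition plus a _SUCCESS marker. Reading the output back is a quick sanity check:

```python
# Read the freshly written parquet directory back to verify the round trip
spark.read.format('parquet') \
    .load('file:///opt/Data/sql/output/parquet') \
    .show()
```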