在spark程序实际开发过程中遇到需要对文件内容做join *** 作,使用createOrReplaceTempView 方式将读取的文件创建临时表,然后通过 spark.sql()
方式利用sql语句做join *** 作,但是数据量稍微大点时候,就会导致join效率很慢。查询资料得知,这里有优化的空间,利用 cache() 或者 persist() 方法。
createOrReplaceTempView是 transformation 算子,而transformation是lazy模式的,也就是spark不会立即计算结果,而只是简单地记住所有对数据集的转换 *** 作逻辑,需要有action算子来触发spark应用程序,最简单的action算子:show()。例如:spark.sql("select *** from XXX ").createOrReplaceTempView(“tmp_test_table”).show()。
这样每次执行.show()算子时候,都需要先执行createOrReplaceTempView *** 作,导致效率很慢。肯有直接将读取出来的数据缓存起来,或者将createOrReplaceTempView之后的数据缓存到内存中。
缓存的方式有两种,具体使用不在此做赘述
1) cache()方法表示:使用非序列化的方式将RDD的数据全部尝试持久化到内存中,cache()只是一个transformtion,是lazy的,必须通过一个action触发,才能真正的将该RDD cache到内存中。
2)persist()方法表示:手动选择持久化级别,并使用指定的方式进行持久化。
数据量:math1.txt 5000行;sports1.txt 270行
机器:windows 16G内存
读取文件代码,参考:spark读写hdfs文件
不加缓存:耗时约8秒
//新建两个视图 println("创建视图 math_df_table") math_df.createOrReplaceTempView("math_df_table") println("创建视图 sports_df_table") sports_df.createOrReplaceTempView("sports_df_table") //作join *** 作 println("join结果如下: ") val startStamp = new Date().getTime val start = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(startStamp) sparkSession.sql("select a.name,a.age,a.skills,b.role,b.team from sports_df_table a left join math_df_table b on a.name=b.name").show(10) sparkSession.sql("select count(*) from sports_df_table a left join math_df_table b " + "on a.name=b.name").show() val endStamp = new Date().getTime val end = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(endStamp) println("开始时间:" + start + " ;结束时间:"+ end + " ;耗时约:" + (endStamp-startStamp)/1000 + "秒")
结果如下
+------+---+----------+----+------+ | name|age| skills|role| team| +------+---+----------+----+------+ | 科比| 40| FADEWAY| SG|LAKERS| | 韦德| 33|LIGHTENING|null| null| | 麦迪| 41| null|null| null| | 哈登| 29| | SG| NETS| | 罗斯| 30| SPEED| PG|KNICKS| | 威少| 30| DUNK| PG|LAKERS| |艾弗森| 41| CROSSOVER|null| null| |杜兰特| 30| SCORE|null| null| | | 30| |null| null| | 老詹| 33| KING| SF|LAKERS| +------+---+----------+----+------+ +--------+ |count(1)| +--------+ | 920| +--------+ 开始时间:2022-01-17 10:32:51 ;结束时间:2022-01-17 10:32:59 ;耗时约:8秒
加缓存:耗时约3秒
//新建两个视图 //方式一:建视图之前缓存。如果只是需要对视图做 *** 作的话,缓存dataframe效果不如直接缓存视图明显 println("创建临时表 math_df_table") math_df.cache() math_df.createOrReplaceTempView("math_df_table") ss.table("math_df_table").cache()//这里最好也缓存下 //方式二:建视图之后缓存 println("创建临时表 sports_df_table") sports_df.createOrReplaceTempView("sports_df_table") ss.table("sports_df_table").cache() //作join *** 作 println("join结果如下: ") val startStamp = new Date().getTime val start = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(startStamp) sparkSession.sql("select a.name,a.age,a.skills,b.role,b.team from sports_df_table a left join math_df_table b on a.name=b.name").show(10) sparkSession.sql("select count(*) from sports_df_table a left join math_df_table b " + "on a.name=b.name").show() val endStamp = new Date().getTime val end = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(endStamp) println("开始时间:" + start + " ;结束时间:"+ end + " ;耗时约:" + (endStamp-startStamp)/1000 + "秒")
+------+---+----------+----+------+ | name|age| skills|role| team| +------+---+----------+----+------+ | 科比| 40| FADEWAY| SG|LAKERS| | 韦德| 33|LIGHTENING|null| null| | 麦迪| 41| null|null| null| | 哈登| 29| | SG| NETS| | 罗斯| 30| SPEED| PG|KNICKS| | 威少| 30| DUNK| PG|LAKERS| |艾弗森| 41| CROSSOVER|null| null| |杜兰特| 30| SCORE|null| null| | | 30| |null| null| | 老詹| 33| KING| SF|LAKERS| +------+---+----------+----+------+ +--------+ |count(1)| +--------+ | 920| +--------+ 开始时间:2022-01-17 11:33:39 ; 结束时间:2022-01-17 11:33:43 ; 耗时约:3秒
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)