- 前提:
- 使用:
在spark集成数仓开发中,一般会使用spark-core和spark-sql一起协同开发,当有些场景为了处理方便,就会提前将DF转化为内存表,但是有时候可能并不需要转化,就直接使用了DF进行一些内置算子也可以实现SQL的功能,这就是spark中org.apache.spark.sql中所包含的功能。
使用:在使用两个DF关联的时候,可以指定多条件,如下:
⚠️⚠️⚠️注意:这种方式是错误的,”===” 方式连接也只适用于单条件的关联
// 01) 加载数据分别为两个DF var videoList01 = spark.sql(querySql01) var videoList02 = spark.sql(querySql02) // 02)关联DF获取相关信息 val videoIdfTrkWghtDF: Dataframe = videoList01.join(videoList02, videoList01.col("a_id") === videoList02.col("b_id") .and(videoList01.col("a_keys") === videoList02.col("b_keys")), "left" ) .select("a_id", "a_title", "a_fc", "a_keys", "a_weight", "b_weight") .na.fill(1, cols = Array("b_weight")) .withColumn("wght", col("a_weight") * col("b_weight")
如果要实现多条件连接,需要在使用and的时候,同时引入equalTo 来实现。
注意:下面是正确的实现:
// 01) 加载数据分别为两个DF var videoList01 = spark.sql(querySql01) var videoList02 = spark.sql(querySql02) // 02)关联DF获取相关信息 val videoIdfTrkWghtDF: Dataframe = videoList01.join(videoList02, (videoList01.col("a_id").equalTo(videoList02.col("b_id"))) .and(videoList01.col("a_keys").equalTo(videoList02.col("b_keys"))), "left" ) .select("a_id", "a_title", "a_fc", "a_keys", "a_weight", "b_weight") .na.fill(1, cols = Array("b_weight")) .withColumn("wght", col("a_weight") * col("b_weight")
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)