Spark - RDDROWsql.DataFrame 互转

Spark - RDDROWsql.DataFrame 互转,第1张

Spark - RDD / ROW / sql.DataFrame 互转 一.引言

SparkSql 相比较 HiveSql 具有更快的运行速度和更高的灵活性,平常使用中经常需要进行数据转换,常见的有 RDD[T] -> Dataframe,Dataframe -> RDD[T] 还有 RDD[row] -> sql.dataframe,下面简单介绍下常用用法。

初始化 SparkSession :

    // 1.配置Spark
    val conf = {
      if (local)
        new SparkConf().setAppName("spark_sql").setMaster("local[*]")
      else
        new SparkConf().setAppName("spark_sql")
    }
    val spark = SparkSession
      .builder
      .config(conf)
      .getOrCreate()

    val sc = spark.sparkContext
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

二.RDD[T] 转 DataFrmae

这里 parallelize 模拟一个用户 RDD 包含 name 和 age 属性,通常情况下 map 返回 pairRdd,这里通过隐式转换 sqlContext.implicits._,使得 RDD 转换为 Dataframe,注意这里列名 cols 需要与 pariRdd 元祖的大小匹配。后续通过 createOrReplcaeTempView 可以构建临时表,随后通过 .sql 方法按照 hiveSql 语句进行任务执行命令。

    // 2.RDD[T] 转 Dataframe
    val userInfo = sc.parallelize(Array[(String, String)](
    ("A", "90s"), 
    ("B", "00s"),
    ("C", "10s"), 
    ("D", "20s")))
    val userDf = userInfo.map(info => {
      val name = info._1
      val age = info._2
      (name, age)
    }).toDF("name", "age")
    userDf.createOrReplaceTempView("tmp_user_info")
    sqlContext.sql("select * from tmp_user_info where name != 'A'").show(10)
+----+---+
|name|age|
+----+---+
|   B|00s|
|   C|10s|
|   D|20s|
+----+---+

三.Dataframe 转 RDD[T]

spark 提供 read csv 方法可以将 Text 文本文件 or CSV 文件直接读取为 Dataframe,dataframe 每一行数据为 row,有些同学需要转换为 Array 或者其他类执行后续代码,可以通过下述方法实现:

    // 3.Dataframe 转 RDD[T]
    val userRdd = spark.read.option("header", "false")
      .option("delimiter", "t")
      .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
      .csv(".user.gz").rdd.map(row => {
      val info = row.toSeq.toArray.map(f => if (f == null) "NULL" else f.toString)
      info
    })
    userRdd.take(5).foreach(arr => println(arr.mkString(",")))

这里 header 代表是否保留列名,适用于 csv 文件,delimiter 代表文件的分隔符,这里需要注意 timestampFormat 参数,如果这个参数不设置,会报如下错误:

Exception in thread "main" java.lang.IllegalArgumentException: Illegal pattern component: XXX

四.RDD[row] 转 Dataframe

除了上述场景,还有对 dataframe 过滤的场景,一方面可以采用 sqlContext 作为临时表进行过滤,也可以采取转 RDD 解析字符串进行解析,这时 Dataframe 就会转变为 RDD[row],如果需要转回 Dataframe,可以执行以下 *** 作:

   // 4.RDD[row] 转 sql.dataframe
    val nullable: Boolean = true
    val schema = StructType(
      List(
        StructField("name", StringType, nullable),
        StructField("age", StringType, nullable)
      )
    )
    val filterUserRdd = userDf.rdd.filter(line => {
      val info = line.toSeq.map(_.toString)
      val age = info(1)
      age.equals("90s")
    })
    val userSqlDf = sqlContext.createDataframe(filterUserRdd, schema)
    userSqlDf.createOrReplaceTempView("new_df")
    sqlContext.sql("select * from new_df").show(5)

StructField 标识字段类型,除了 StringType 外,还有其他常用的类型比如 DoubleType,BooleanType 等等,有需要进 rg.apache.spark.sql.types 类寻找即可,这里通过定义列名与列属性结合原始 DF,可以实现 row -> dataframe 的转变,后续可以继续 HiveSql 相关 *** 作。

+----+---+
|name|age|
+----+---+
|   A|90s|
+----+---+

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5664808.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存