package com.aisainfo

import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

object Test {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("test").master("local[2]").getOrCreate()
    // backslashes in the Windows path must be escaped inside a Scala string literal
    val linesDF: DataFrame = spark.read.json("E:\\work_space\\work_space\\work_asiainfo\\java\\scala\\kafka_spark_hive_json\\src\\main\\scala\\com\\aisainfo\\json")

    val columns1: List[Column] = getAllColumns(linesDF).toList

    linesDF.createOrReplaceTempView("test")
    // the flattened column list is spliced into the SQL text; field names that are not
    // valid SQL identifiers (e.g. starting with a digit) would need backticks here
    spark.sql(s"select ${columns1.mkString(", ")} from test").show()

    println(columns1.mkString(", "))
    println(linesDF.schema.fields.mkString(", "))

    // sample input record:
    // {"asd":{"device_id": 0, "device_type": "sensor-ipad", "ip": {"fgh":123, "456":456}},"qwe":"zxc"}
    //linesDF.select("asd.ip.fgh").show()
  }

  /** Flatten the DataFrame schema into one Column per leaf field. */
  def getAllColumns(df: DataFrame): Array[Column] = {
    df.schema.fields.flatMap { data: StructField =>
      recursiveSolution(data)
    }
  }

  private def recursiveSolution(data: StructField, prefix: String = ""): List[Column] = {
    data match {
      // struct field: recurse into its children, extending the dotted prefix
      case column_info if column_info.dataType.isInstanceOf[StructType] =>
        column_info.dataType.asInstanceOf[StructType].fields.flatMap { field =>
          if (prefix != "") recursiveSolution(field, s"$prefix.${column_info.name}")
          else recursiveSolution(field, column_info.name)
        }.toList
      // leaf field: build the column from the accumulated JSON path
      case column_info =>
        if (prefix != "") List(col(s"$prefix.${column_info.name}"))
        // alternative: alias the column with the full JSON path joined by underscores
        //if (prefix != "") List(col(s"$prefix.${column_info.name}").alias(s"${prefix.replace(".", "_")}_${column_info.name}"))
        else List(col(column_info.name))
    }
  }
}
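Because the hard-coded Windows path makes the example awkward to reproduce, here is a minimal sketch of trying the same flattening without touching the file system. It is an assumption-laden illustration, not part of the original code: the object name FlattenSample is hypothetical, it assumes Spark 2.2+ (for spark.read.json on a Dataset[String]), and it reuses the sample record from the comment in Test.

package com.aisainfo

import org.apache.spark.sql.SparkSession

object FlattenSample {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("flatten-sample").master("local[2]").getOrCreate()
    import spark.implicits._

    // the sample record from the comment in Test above
    val sample = Seq(
      """{"asd":{"device_id": 0, "device_type": "sensor-ipad", "ip": {"fgh":123, "456":456}},"qwe":"zxc"}"""
    ).toDS()

    // spark.read.json(Dataset[String]) infers the nested schema from the in-memory JSON
    val df = spark.read.json(sample)

    // reuse the flattening logic from Test; selecting through the DataFrame API
    // avoids splicing column names into SQL text
    val flattened = Test.getAllColumns(df).toList
    df.select(flattened: _*).show(false)
    // the flattened leaf paths should be: asd.device_id, asd.device_type, asd.ip.456, asd.ip.fgh, qwe

    spark.stop()
  }
}

Selecting through the DataFrame API rather than the SQL string also sidesteps the identifier problem noted above: a field named "456" resolves as a nested path in col(), but would need backticks in raw SQL.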