Spark高级 *** 作之JSON文件实 *** 练习

Spark高级 *** 作之JSON文件实 *** 练习,第1张

Spark高级 *** 作之JSON文件实 *** 练习

演示的日志文件为 op.log,内容

 按照日志格式进行切割

 val conf = new SparkConf().set("spark.testing.memory","2147480000").setMaster("local[*]").setAppName("sparkDemo1")

    val sc = SparkContext.getOrCreate(conf)

    val spark = SparkSession.builder().appName("SparkJson").master("local[*]").getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions._
    val rdd: RDD[String] = sc.textFile("in/op.log")
    val df: Dataframe = rdd.map(x=>x.split('|')).map(x=>(x(0),x(1))).toDF()
//    df.printSchema()
//    df.show()
    //修改字段名
    val df2: Dataframe = df.withColumnRenamed("_1","id").withColumnRenamed("_2","value")
//    df2.printSchema()
//    df2.show()

 

接下来查看每个字段里面的每一个字段代表的内容

这是json解析后的内容,先查询value下面的ap,cm,et

 

 //查询value下第一层字段内容,
    val df3: Dataframe = df2.select($"id", get_json_object($"value", "$.ap").as("ap"),
      get_json_object($"value", "$.cm").as("cm"),
      get_json_object($"value", "$.et").as("et"))
    df3.printSchema()
    df3.show(2,false)

接着查询cm下面的所有内容,因为ap里面就一个,就无需查询

 

    //查询df3下,cm字段里面的所有内容
    val df4: Dataframe = df3.select($"id", $"ap", get_json_object($"cm", "$.ln").as("ln"), get_json_object($"cm", "$.sv").as("sv"),
      get_json_object($"cm", "$.os").as("os"), get_json_object($"cm", "$.g").as("g"), get_json_object($"cm", "$.mid").as("mid"),
      get_json_object($"cm", "$.nw").as("nw"), get_json_object($"cm", "$.l").as("l"), get_json_object($"cm", "$.vc").as("vc"),
      get_json_object($"cm", "$.hw").as("hw"), get_json_object($"cm", "$.ar").as("ar"), get_json_object($"cm", "$.uid").as("uid"),
      get_json_object($"cm", "$.t").as("t"), get_json_object($"cm", "$.la").as("la"), get_json_object($"cm", "$.md").as("md"),
      get_json_object($"cm", "$.vn").as("vn"), get_json_object($"cm", "$.ba").as("ba"), get_json_object($"cm", "$.sr").as("sr"), $"et")
    df4.printSchema()
    df4.show()

接下来处理et,et是个数组,最好进行单独处理

 

    //因为et类型比较复杂,这里可以用复杂类型进行schema应用,这里用from_json方法取字段
    val schema: ArrayType = ArrayType(StructType(StructField("ett",StringType)::StructField("en",StringType)::StructField("kv",StringType)::Nil))

    val df6: Dataset[Row] = df5.select(from_json($"et",schema).as("et"))

    df6.printSchema()
    df6.show(2,false)
    //此时et输出来是一个二维数组,下面进行降维
    val df7: Dataframe = df6.withColumn("et",explode(col("et")))

    df7.printSchema()
    df7.show(2,false)
    df7.select($"et.ett",$"et.en",$"et.kv").show()

最后综合处理

    //综合处理

    val df8: Dataframe = df4.select($"id", $"ap", $"ln", $"sv", $"os",
      $"g", $"mid", $"nw", $"l",
      $"vc", $"hw", $"ar", $"uid",
      $"t", $"la", $"md", $"vn",
      $"ba", $"sr", from_json($"et", schema).as("et"))
      .withColumn("et", explode(col("et")))
      .select($"id", $"ap", $"ln", $"sv", $"os",
        $"g", $"mid", $"nw", $"l",
        $"vc", $"hw", $"ar", $"uid",
        $"t", $"la", $"md", $"vn",
        $"ba", $"sr", $"et.ett", $"et.en", $"et.kv")
    df8.printSchema()
    df8.show(2)

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5665104.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存