演示的日志文件为 op.log,内容为
按照日志格式进行切割
val conf = new SparkConf().set("spark.testing.memory","2147480000").setMaster("local[*]").setAppName("sparkDemo1") val sc = SparkContext.getOrCreate(conf) val spark = SparkSession.builder().appName("SparkJson").master("local[*]").getOrCreate() import spark.implicits._ import org.apache.spark.sql.functions._ val rdd: RDD[String] = sc.textFile("in/op.log") val df: Dataframe = rdd.map(x=>x.split('|')).map(x=>(x(0),x(1))).toDF() // df.printSchema() // df.show() //修改字段名 val df2: Dataframe = df.withColumnRenamed("_1","id").withColumnRenamed("_2","value") // df2.printSchema() // df2.show()
接下来查看每个字段里面的每一个字段代表的内容
这是json解析后的内容,先查询value下面的ap,cm,et
//查询value下第一层字段内容, val df3: Dataframe = df2.select($"id", get_json_object($"value", "$.ap").as("ap"), get_json_object($"value", "$.cm").as("cm"), get_json_object($"value", "$.et").as("et")) df3.printSchema() df3.show(2,false)
接着查询cm下面的所有内容,因为ap里面就一个,就无需查询
//查询df3下,cm字段里面的所有内容 val df4: Dataframe = df3.select($"id", $"ap", get_json_object($"cm", "$.ln").as("ln"), get_json_object($"cm", "$.sv").as("sv"), get_json_object($"cm", "$.os").as("os"), get_json_object($"cm", "$.g").as("g"), get_json_object($"cm", "$.mid").as("mid"), get_json_object($"cm", "$.nw").as("nw"), get_json_object($"cm", "$.l").as("l"), get_json_object($"cm", "$.vc").as("vc"), get_json_object($"cm", "$.hw").as("hw"), get_json_object($"cm", "$.ar").as("ar"), get_json_object($"cm", "$.uid").as("uid"), get_json_object($"cm", "$.t").as("t"), get_json_object($"cm", "$.la").as("la"), get_json_object($"cm", "$.md").as("md"), get_json_object($"cm", "$.vn").as("vn"), get_json_object($"cm", "$.ba").as("ba"), get_json_object($"cm", "$.sr").as("sr"), $"et") df4.printSchema() df4.show()
接下来处理et,et是个数组,最好进行单独处理
//因为et类型比较复杂,这里可以用复杂类型进行schema应用,这里用from_json方法取字段 val schema: ArrayType = ArrayType(StructType(StructField("ett",StringType)::StructField("en",StringType)::StructField("kv",StringType)::Nil)) val df6: Dataset[Row] = df5.select(from_json($"et",schema).as("et")) df6.printSchema() df6.show(2,false) //此时et输出来是一个二维数组,下面进行降维 val df7: Dataframe = df6.withColumn("et",explode(col("et"))) df7.printSchema() df7.show(2,false) df7.select($"et.ett",$"et.en",$"et.kv").show()
最后综合处理
//综合处理 val df8: Dataframe = df4.select($"id", $"ap", $"ln", $"sv", $"os", $"g", $"mid", $"nw", $"l", $"vc", $"hw", $"ar", $"uid", $"t", $"la", $"md", $"vn", $"ba", $"sr", from_json($"et", schema).as("et")) .withColumn("et", explode(col("et"))) .select($"id", $"ap", $"ln", $"sv", $"os", $"g", $"mid", $"nw", $"l", $"vc", $"hw", $"ar", $"uid", $"t", $"la", $"md", $"vn", $"ba", $"sr", $"et.ett", $"et.en", $"et.kv") df8.printSchema() df8.show(2)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)