原表中的数据分为7个类别,类别字段名称为:vehicle_type;
每个类别下有若干的元素,每个类别下元素个数相同,这些元素存于一列中,列名为name_en,列内的值为元素的名称;
每个元素有相同的名称的属性若干,每个属性名称作为一列;
现在想要将一个类别下的元素、属性收集到一个JSON中
结构如下:
{ "元素名称1":{ "属性名称":"属性值" }, "元素名称1":{ "属性名称":"属性值" }, "元素名称1":{ "属性名称":"属性值" }, .... }
val df = spark.read.format("jdbc").options(Map("url" -> MYSQL_URL, "dbtable" -> "factor_algo_config")).load() df.show() df.cache() df.checkpoint() implicit val formats = Serialization.formats(NoTypeHints) val vehicle_type = df.select("vehicle_type").distinct().collect().map(_.get(0)) vehicle_type.foreach(x => { val vehicle_df = df.select(df.schema.names.filter(!Array("id", "vehicle_type").contains(_)).map(col(_).cast("string")): _*) .where(df("vehicle_type") === s"$x") .na.fill("") val vehivle_col_names = vehicle_df.schema.names val vehicle_array_json = vehicle_df.collect().map(x => { val dic = mutable.Map[String, String]() (1 until vehivle_col_names.length).map(i => { dic += (vehivle_col_names.apply(i) -> x.get(i).toString) }) val res = Map(x.get(0).toString -> dic) Json(DefaultFormats).write(res) }) val json_string=vehicle_array_json.mkString(",").replace("},{",",") val ds = spark.createDataset(Seq(json_string)) val json_df = spark.read.json(ds) json_df.show() })
在将Map转为JSON时发现,用scala自带的JSONObject.toString方法时,Map中嵌套的Map仍被打印成"{“a”:Map(“b”:“c”,“d”:“e”)}"
于是使用
import org.json4s._
import org.json4s.jackson.{Json, Serialization}
Json(DefaultFormats).write(res)
顺利的将JSON转为了JSON字符串,
后面将所有的JSON字符串收集到数组中后使用mkString对其进行合并
将合并后的JSON字符串中的},{替换为,达到将多条JSON合并为一条的效果
后面用createDataset解析JSON字符串,将其转为DataSet后再将其转为Dataframe
参考资料Scala-scalaMap转JSON字符串和javaMap
https://blog.csdn.net/zsyoung/article/details/88844702
Spark中json字符串和Dataframe相互转换
https://blog.csdn.net/xuejianbest/article/details/80694073
【scala】Json与Scala类型的相互转换处理
https://blog.csdn.net/lbf_ml/article/details/100534942
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)