Parquet file download link: https://pan.baidu.com/s/1dmugj-ty47Hgi6WLAPaiGQ?pwd=yyds
Extraction code: yyds
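Before the conversion it helps to confirm the file's schema, since the SQL below assumes the columns name, favorite_color, and favorite_numbers. A minimal check, assuming the download is the standard users.parquet sample that ships with Spark and a local Windows path like the one used below:

import org.apache.spark.sql.SparkSession

object SchemaCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("SchemaCheck").getOrCreate()
    // expected fields: name (string), favorite_color (string), favorite_numbers (array<int>)
    spark.read.parquet("D:\\a\\users.parquet").printSchema()
    spark.stop()
  }
}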
======> After conversion
package com.sz.table_ddl.test

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{struct, to_json}

object col_json {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL")
    // 1. todo: establish the connection to the Spark framework
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    // disable broadcast joins
    spark.sql("set spark.sql.autoBroadcastJoinThreshold=-1")

    // 2. todo: read the file (backslashes in the Windows path must be escaped)
    val string_quet = spark.read.format("parquet").load("D:\\a\\users.parquet")
    // register a temporary view
    string_quet.createOrReplaceTempView("users")
    // show the first rows of the parquet data
    string_quet.show(5)

    // 3. todo: compute the columns
    val rs = spark.sql(
      """
        |select
        |s.name as name,
        |s.favorite_color as color,
        |s.favorite_numbers as numbers
        |from users s
        |""".stripMargin)
    rs.show()

    // requires: import org.apache.spark.sql.functions.{struct, to_json}
    val finalDF = rs.withColumn("Newcol", to_json(struct("color", "numbers")))
    // the column aliases must come from the SQL above;
    // selectExpr then picks the named columns
    finalDF.selectExpr("Newcol", "numbers").show(false)

    // the result can also be written to MySQL:
    // 1. todo: create the table in MySQL first
    //    (the idea is a `numbers` enum column; untested)
    // 2. todo: write the data to MySQL
    finalDF.selectExpr("Newcol", "numbers").write.format("jdbc")
      // todo: the four essential jdbc options: url, dbtable, user, password
      .option("url", "jdbc:mysql://localhost:3306/names?useUnicode=true&characterEncoding=utf8")
      .option("dbtable", "tab_json02")
      .option("user", "root")
      .option("password", "root")
      // todo: overwrite
      // .mode(SaveMode.Overwrite)
      // todo: append
      .mode(SaveMode.Append)
      .save()

    // todo: release resources
    spark.stop()
  }
}

3. Code result
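The original result screenshot is not reproduced here. As a rough illustration of what to_json(struct(...)) produces, here is a self-contained sketch on made-up rows mirroring the (color, numbers) shape above; the names and values are hypothetical, not the actual users.parquet contents:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{struct, to_json}

object ToJsonDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("ToJsonDemo").getOrCreate()
    import spark.implicits._

    // hypothetical rows; the second column is nullable on purpose
    val rs = Seq(
      ("Alyssa", null.asInstanceOf[String], Seq(3, 9, 15, 20)),
      ("Ben", "red", Seq.empty[Int])
    ).toDF("name", "color", "numbers")

    // same transform as in the main program
    val finalDF = rs.withColumn("Newcol", to_json(struct("color", "numbers")))
    finalDF.selectExpr("Newcol", "numbers").show(false)
    // Newcol comes out as JSON strings such as {"numbers":[3,9,15,20]}
    // and {"color":"red","numbers":[]}; by default to_json omits null fields
    spark.stop()
  }
}

Note that the JDBC write in the main program also needs the MySQL connector (e.g. the mysql-connector-java artifact) on the classpath; the database name, table name, and credentials in the options are whatever your environment uses.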
Reference:
Spark SQL指定特定的列转换为Json, 呼呼的小窝, CSDN博客