Note:
- A JSON file can be read in two ways: session.read.json(path) or session.read.format("json").load(path).
- df.show() displays the first 20 rows by default.
- The native DataFrame API can also be used to operate on a DataFrame (see the sketch after the first example below).
- When a DataFrame is registered as a temporary view, its columns are listed in ASCII order by default. The view-registration methods are (see the scope sketch after this list):
  df.createTempView("mytable")
  df.createOrReplaceTempView("mytable")
  df.createGlobalTempView("mytable")
  df.createOrReplaceGlobalTempView("mytable")
  session.sql("select * from global_temp.mytable").show()
- A DataFrame is essentially an RDD of Row objects.
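The scope difference between the two kinds of view can be seen by opening a second session: a temporary view is tied to the SparkSession that created it, while a global temporary view lives in the reserved global_temp database and is visible from other sessions of the same application. A minimal sketch, assuming the data/jsondata file used in the examples below (the object name is hypothetical):

package sparkSql

import org.apache.spark.sql.SparkSession

// Sketch: temp-view vs global-temp-view scope.
object TempViewScope {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("viewScope").getOrCreate()
    session.sparkContext.setLogLevel("Error")

    val frame = session.read.json("data/jsondata")   // path assumed from the examples below
    frame.createOrReplaceTempView("mytable")         // visible only in `session`
    frame.createOrReplaceGlobalTempView("mytable")   // visible to all sessions via global_temp

    val other = session.newSession()
    session.sql("select * from mytable").show()            // works: same session
    other.sql("select * from global_temp.mytable").show()  // works: global view
    // other.sql("select * from mytable")                  // would fail: temp view is session-scoped
  }
}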
JSON data source:
{"name":"科比","age":24} {"name":"詹姆斯","age":23} {"name":"杜兰特","age":25} {"name":"保罗","age":26} {"name":"库里","age":27} {"name":"加索尔","age":28} {"name":"朗多","age":29} {"name":"皮尔斯"} {"name":"雷阿伦"} {"name":"奥多姆"} {"name":"拜纳姆","age":24} {"name":"科比","age":24} {"name":"詹姆斯","age":23} {"name":"杜兰特","age":25} {"name":"保罗","age":26} {"name":"库里","age":27} {"name":"加索尔","age":28} {"name":"朗多","age":29} {"name":"皮尔斯"} {"name":"雷阿伦"} {"name":"奥多姆"} {"name":"拜纳姆","age":24}
package sparkSql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

object ReadJsonDataToDF2 {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("name").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    // val frame = session.read.json("data/jsondata")
    val frame = session.read.format("json").load("data/jsondata")
    frame.createTempView("t")
    val rdd: RDD[Row] = session.sql("select name,age from t where age is not null").rdd
    rdd.foreach(row => {
      val name = row.getAs[String]("name")
      val age = row.getAs[Long]("age")
      println(s"name:$name,age:$age")
    })
    // rdd.foreach(println)
    // frame.show()
  }
}
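The same filter can be written with the native DataFrame API mentioned in the notes above instead of SQL. A minimal sketch (the object name is hypothetical; col comes from org.apache.spark.sql.functions):

package sparkSql

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Sketch: the DataFrame-native equivalent of
// "select name,age from t where age is not null".
object ReadJsonDataNativeAPI {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("nativeApi").getOrCreate()
    session.sparkContext.setLogLevel("Error")

    val frame = session.read.format("json").load("data/jsondata")
    frame.select("name", "age")
      .filter(col("age").isNotNull)
      .show()
  }
}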
2. Creating a DataFrame from an RDD of JSON strings

package sparkSql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object ReadJsonRDDToDF {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("name").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    val jsonArr = Array[String](
      """{"name":"科比","age":24}""",
      """{"name":"詹姆斯","age":23}""",
      """{"name":"杜兰特","age":35}""",
      """{"name":"保罗","age":3}"""
    )
    import session.implicits._
    val jsonDataSet: Dataset[String] = jsonArr.toList.toDS()
    val frame = session.read.json(jsonDataSet)
    frame.createTempView("t")
    session.sql("select name,age from t").show()
    // Alternative: build an RDD[String] and use the (deprecated) RDD overload.
    // val context = session.sparkContext
    // val jsonRDD: RDD[String] = context.parallelize(jsonArr)
    // val frame: DataFrame = session.read.json(jsonRDD)
    // frame.show()
  }
}
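When the JSON structure is known up front, schema inference can be skipped by passing an explicit schema. A minimal sketch, assuming Spark 2.3+ where DataFrameReader.schema also accepts a DDL-style string (the object name is hypothetical):

package sparkSql

import org.apache.spark.sql.{Dataset, SparkSession}

// Sketch: reading JSON strings with an explicit schema instead of inference.
object ReadJsonRDDWithSchema {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("jsonSchema").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    import session.implicits._

    val jsonDataSet: Dataset[String] = List(
      """{"name":"科比","age":24}""",
      """{"name":"詹姆斯","age":23}"""
    ).toDS()

    // "name STRING, age LONG" is a DDL-style schema string (Spark 2.3+).
    val frame = session.read.schema("name STRING, age LONG").json(jsonDataSet)
    frame.printSchema()
    frame.show()
  }
}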
3. Creating a DataFrame from a non-JSON RDD

1. Via reflection, mapping each line onto a case class (not recommended)
Data source:
1,科比,24,99
2,詹姆斯,6,100
3,杜兰特,35,100
4,哈登,13,80
5,乔丹,23,90
package sparkSql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

case class PersonInfo(id: Int, name: String, num: Int, score: Int)

object ReadRDDToDF {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().appName("test").master("local").getOrCreate()
    val context: SparkContext = session.sparkContext
    context.setLogLevel("Error")
    val person: RDD[String] = context.textFile("data/person")
    val personRDD: RDD[PersonInfo] = person.map(line => {
      val arr: Array[String] = line.split(",")
      val id: Int = arr(0).toInt
      val name: String = arr(1)
      val num: Int = arr(2).toInt
      val score: Int = arr(3).toInt
      PersonInfo(id, name, num, score)
    })
    import session.implicits._
    val frame: DataFrame = personRDD.toDF()
    frame.createTempView("t")
    val frame1: DataFrame = session.sql("select id,name,num,score from t")
    frame1.show()
    frame1.printSchema()
  }
}
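Because PersonInfo is a case class with an implicit encoder, the conversion also works in the other direction: an untyped DataFrame can be turned back into a typed Dataset[PersonInfo] with .as[PersonInfo]. A minimal sketch, reusing the case class above and assuming Spark 2.3+ for the DDL-style schema string (the object name is hypothetical):

package sparkSql

import org.apache.spark.sql.{Dataset, SparkSession}

// Sketch: round-tripping between an untyped DataFrame and a typed Dataset,
// reusing the PersonInfo case class defined above.
object DataFrameToTypedDataset {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("typed").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    import session.implicits._

    // data/person is comma-separated, so the csv reader can load it directly.
    val frame = session.read
      .schema("id INT, name STRING, num INT, score INT")
      .csv("data/person")

    val ds: Dataset[PersonInfo] = frame.as[PersonInfo]
    ds.filter(p => p.score >= 90).show()   // typed, compile-checked field access
  }
}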
2. Dynamically creating a schema to convert a non-JSON RDD into a DataFrame
package sparkSql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object ReadRDDtoDF2 {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().master("local").appName("test1").getOrCreate()
    val context: SparkContext = session.sparkContext
    context.setLogLevel("Error")
    val personRDD: RDD[String] = context.textFile("data/person")
    val rowRDD: RDD[Row] = personRDD.map(line => {
      val arr: Array[String] = line.split(",")
      val id: Int = arr(0).toInt
      val name: String = arr(1)
      val num: Int = arr(2).toInt
      val score: Double = arr(3).toDouble
      Row(id, name, num, score)
    })
    val struct = StructType(List[StructField](
      StructField("id", DataTypes.IntegerType, true),
      StructField("name", DataTypes.StringType, true),
      StructField("num", DataTypes.IntegerType, true),
      StructField("score", DataTypes.DoubleType, true)
    ))
    val frame: DataFrame = session.createDataFrame(rowRDD, struct)
    frame.show()
    frame.printSchema()
    frame.createTempView("t")
    val frame1: DataFrame = session.sql(
      """
        |select id,name,num,score from t
        |""".stripMargin)
    frame1.show()
  }
}
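Listing StructFields by hand is verbose; on Spark 2.3+ the same schema can be built from a DDL string with StructType.fromDDL. A minimal sketch (the object name is hypothetical):

package sparkSql

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SparkSession}

// Sketch: the same dynamic schema expressed as a DDL string (Spark 2.3+).
object ReadRDDtoDFWithDDL {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("ddl").getOrCreate()
    session.sparkContext.setLogLevel("Error")

    val struct: StructType = StructType.fromDDL("id INT, name STRING, num INT, score DOUBLE")

    val rowRDD = session.sparkContext.textFile("data/person").map { line =>
      val arr = line.split(",")
      Row(arr(0).toInt, arr(1), arr(2).toInt, arr(3).toDouble)
    }
    session.createDataFrame(rowRDD, struct).show()
  }
}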
4. Reading a parquet file to create a DataFrame

package sparkSql

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ReadParquetFileToDF {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().appName("ttt").master("local").getOrCreate()
    val context: SparkContext = session.sparkContext
    context.setLogLevel("Error")
    val frame: DataFrame = session.read.json("data/jsondata")
    frame.write.mode(SaveMode.Overwrite).parquet("data/parquet")
    val frame1: DataFrame = session.read.parquet("data/parquet")
    frame1.show(22)
    println(frame1.count())
    // frame1.write.json("data/json")
  }
}
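Since parquet stores the schema alongside the data, the read side needs no header or schema options, and only the columns actually selected are scanned. A minimal sketch of a partitioned write using the standard partitionBy call (the output path and object name are hypothetical):

package sparkSql

import org.apache.spark.sql.{SaveMode, SparkSession}

// Sketch: writing parquet partitioned by a column, then reading one partition.
object WritePartitionedParquet {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("parquetPart").getOrCreate()
    session.sparkContext.setLogLevel("Error")

    val frame = session.read.json("data/jsondata")
    frame.write.mode(SaveMode.Overwrite).partitionBy("age").parquet("data/parquet_by_age")

    // Only the name column and the age=24 partition are actually read.
    session.read.parquet("data/parquet_by_age")
      .where("age = 24")
      .select("name")
      .show()
  }
}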
5. Reading CSV data into a DataFrame

CSV data source:
id,name,age,score
1,科比,40,100
2,詹姆斯,37,100
3,乔丹,55,100
4,杜兰特,33,99
5,库里,34,99
package sparkSql

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadCSVDataToDF {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().master("local").appName("eee").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    val frame: DataFrame = session.read.option("header", true).csv("data/data.csv")
    frame.show()
  }
}
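Note that without further options every CSV column is read as a string. A minimal sketch of the two usual fixes, automatic inference or an explicit schema (the object name is hypothetical; the DDL-style schema string assumes Spark 2.3+):

package sparkSql

import org.apache.spark.sql.SparkSession

// Sketch: controlling CSV column types instead of defaulting to strings.
object ReadCSVWithTypes {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("csvTypes").getOrCreate()
    session.sparkContext.setLogLevel("Error")

    // Option 1: let Spark infer types (costs an extra pass over the data).
    val inferred = session.read
      .option("header", true)
      .option("inferSchema", true)
      .csv("data/data.csv")
    inferred.printSchema()

    // Option 2: state the types up front, no extra pass.
    val typed = session.read
      .option("header", true)
      .schema("id INT, name STRING, age INT, score INT")
      .csv("data/data.csv")
    typed.printSchema()
  }
}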
6. Loading a DataFrame from a Dataset of tuples

The data source is large, so it is not pasted here.
package sparkSql

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object ReadTupleDatasetToDF {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("rrr").master("local").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    val dt: Dataset[String] = session.read.textFile("data/pvuvdata")
    import session.implicits._
    val value: Dataset[(String, String, String, String, String, String, String)] = dt.map(line => {
      val arr: Array[String] = line.split("\t")
      val ip = arr(0)
      val local = arr(1)
      val date = arr(2)
      val ts = arr(3)
      val uid = arr(4)
      val site = arr(5)
      val operator = arr(6)
      (ip, local, date, ts, uid, site, operator)
    })
    val frame: DataFrame = value.toDF("ip", "local", "date", "ts", "uid", "site", "operator")
    frame.createTempView("t")
    // PV: total requests per site.
    session.sql(
      """
        |select site,count(*) as site_count from t group by site order by site_count
        |""".stripMargin).show()
    // UV: distinct visitors (by ip) per site.
    session.sql(
      """
        |select site,count(*) uv from (select distinct ip,site from t) t1 group by site order by uv
        |""".stripMargin).show()
  }
}
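The same PV (page views per site) and UV (unique visitors per site) queries can also be expressed with the native DataFrame API; a minimal sketch, keeping only the two columns the metrics need (the object name is hypothetical):

package sparkSql

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, countDistinct}

// Sketch: PV and UV with the DataFrame API instead of SQL.
object PvUvNativeAPI {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("pvuv").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    import session.implicits._

    val frame = session.read.textFile("data/pvuvdata")
      .map(_.split("\t"))
      .map(a => (a(0), a(5)))   // keep ip and site only
      .toDF("ip", "site")

    // PV: one row per request.
    frame.groupBy("site").agg(count("*").as("pv")).orderBy("pv").show()
    // UV: distinct ip per site.
    frame.groupBy("site").agg(countDistinct("ip").as("uv")).orderBy("uv").show()
  }
}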