Ways to Create a DataFrame in SparkSQL


1. Creating a DataFrame by reading a JSON file

Notes:

  • A JSON file can be read in either of two ways (both are shown in the code below).
  • df.show() displays the first 20 rows by default.
  • The DataFrame's native API can also be used to operate on the DataFrame (see the sketch after this list).
  • When a DataFrame is registered as a temporary view, its columns are displayed in ASCII order by default.
    df.createTempView("mytable")
    df.createOrReplaceTempView("mytable")
    df.createGlobalTempView("mytable")
    df.createOrReplaceGlobalTempView("mytable")
    session.sql("select * from global_temp.mytable").show()
  • A DataFrame is essentially an RDD of Row objects.
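
The notes about the native API and temporary views can be illustrated with a short sketch. This is a minimal example, assuming the same data/jsondata file used below; the object and variable names are illustrative, not part of the original examples.

package sparkSql

import org.apache.spark.sql.SparkSession

object NativeApiSketch {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("nativeApi").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    val frame = session.read.json("data/jsondata")

    // Native DataFrame API: select, filter and sort without writing SQL.
    frame.select("name", "age")
      .where("age is not null")
      .orderBy("age")
      .show()                              // show() prints the first 20 rows by default

    // The same query through a temporary view.
    frame.createOrReplaceTempView("mytable")
    session.sql("select name, age from mytable where age is not null order by age").show()
  }
}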

JSON data source:

{"name":"科比","age":24}
{"name":"詹姆斯","age":23}
{"name":"杜兰特","age":25}
{"name":"保罗","age":26}
{"name":"库里","age":27}
{"name":"加索尔","age":28}
{"name":"朗多","age":29}
{"name":"皮尔斯"}
{"name":"雷阿伦"}
{"name":"奥多姆"}
{"name":"拜纳姆","age":24}
{"name":"科比","age":24}
{"name":"詹姆斯","age":23}
{"name":"杜兰特","age":25}
{"name":"保罗","age":26}
{"name":"库里","age":27}
{"name":"加索尔","age":28}
{"name":"朗多","age":29}
{"name":"皮尔斯"}
{"name":"雷阿伦"}
{"name":"奥多姆"}
{"name":"拜纳姆","age":24}
package sparkSql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}


object ReadJsonDataToDF2{
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("name").getOrCreate()
    session.sparkContext.setLogLevel("Error")
//    val frame = session.read.json("data/jsondata")
    val frame = session.read.format("json").load("data/jsondata")
    frame.createTempView("t")
    val rdd: RDD[Row] = session.sql("select name,age from t where age is not null").rdd
    rdd.foreach(row=>{
      val name = row.getAs[String]("name")
      val age = row.getAs[Long]("age")
      println(s"name:$name,age$age")
    })
//    rdd.foreach(println)
//    frame.show()
  }
}
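
A quick look at the inferred schema explains why the age column is read with getAs[Long]: when Spark infers a schema from JSON, integer values become bigint (Scala Long), and the columns are listed in ASCII order as noted above. A minimal sketch, reusing the same data/jsondata file; the object name is illustrative.

package sparkSql

import org.apache.spark.sql.SparkSession

object InspectJsonSchema {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("schemaCheck").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    val frame = session.read.json("data/jsondata")
    // Prints the inferred schema, e.g.:
    // root
    //  |-- age: long (nullable = true)
    //  |-- name: string (nullable = true)
    frame.printSchema()
  }
}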

2. Creating a DataFrame from an RDD of JSON strings
package sparkSql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}


object ReadJsonRDDToDF {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("name").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    val jsonArr = Array[String](
      "{"name":"科比","age":24}",
      "{"name":"詹姆斯","age":23}",
      "{"name":"杜兰特","age":35}",
      "{"name":"保罗","age":3}"
    )
    import session.implicits._
    val jsonDataSet: Dataset[String] = jsonArr.toList.toDS()
    val frame = session.read.json(jsonDataSet)
    frame.createTempView("t")
    session.sql("select name,age from t").show()
    
//    val context = session.sparkContext
//    val jsonRDD: RDD[String] = context.parallelize(jsonArr)
//    val frame: DataFrame = session.read.json(jsonRDD)
//    frame.show()
  }
}
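
If you do not want Spark to scan the JSON strings just to infer their schema, the reader can be given an explicit schema before parsing. A minimal sketch, assuming the same kind of Dataset[String] as above; the object name and schema are illustrative.

package sparkSql

import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object ReadJsonRDDWithSchema {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("jsonSchema").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    import session.implicits._
    val jsonDataSet: Dataset[String] = List(
      "{\"name\":\"科比\",\"age\":24}",
      "{\"name\":\"保罗\",\"age\":3}"
    ).toDS()
    // Explicit schema: skips the inference pass over the data.
    val schema = StructType(List(
      StructField("name", DataTypes.StringType, true),
      StructField("age", DataTypes.LongType, true)
    ))
    val frame: DataFrame = session.read.schema(schema).json(jsonDataSet)
    frame.printSchema()
    frame.show()
  }
}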

3. Creating a DataFrame from a non-JSON RDD
  1. Converting a non-JSON RDD to a DataFrame by reflection (not recommended)
    Data source:
1,科比,24,99
2,詹姆斯,6,100
3,杜兰特,35,100
4,哈登,13,80
5,乔丹,23,90

package sparkSql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}


case class PersonInfo(id:Int,name:String,num:Int,score:Int)
object ReadRDDToDF {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().appName("test").master("local").getOrCreate()
    val context: SparkContext = session.sparkContext
    context.setLogLevel("Error")
    val person: RDD[String] = context.textFile("data/person")
    val personRDD: RDD[PersonInfo] = person.map(line => {
      val arr: Array[String] = line.split(",")
      val id: Int = arr(0).toInt
      val name: String = arr(1)
      val num: Int = arr(2).toInt
      val score: Int = arr(3).toInt
      PersonInfo(id, name, num, score)
    })
    import session.implicits._
    val frame: DataFrame = personRDD.toDF()
    frame.createTempView("t")
    val frame1: DataFrame = session.sql("select id,name,num,score from t")
    frame1.show()
    frame1.printSchema()
  }
}
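
The reflection approach also works on a local Seq of case class instances, and toDF can be given explicit column names when the case class field names are not the ones you want. A minimal sketch reusing the PersonInfo case class defined above; the object name and the renamed columns are illustrative.

package sparkSql

import org.apache.spark.sql.{DataFrame, SparkSession}

object SeqToDF {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("seqToDF").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    import session.implicits._
    val people = Seq(PersonInfo(1, "科比", 24, 99), PersonInfo(2, "詹姆斯", 6, 100))
    // toDF() would keep the case class field names; here the columns are renamed explicitly.
    val frame: DataFrame = people.toDF("pid", "pname", "pnum", "pscore")
    frame.show()
    frame.printSchema()
  }
}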


  2. Converting a non-JSON RDD to a DataFrame by dynamically building a schema

package sparkSql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}


object ReadRDDtoDF2 {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().master("local").appName("test1").getOrCreate()
    val context: SparkContext = session.sparkContext
    context.setLogLevel("Error")
    val personRDD: RDD[String] = context.textFile("data/person")
    val rowRDD: RDD[Row] = personRDD.map(line => {
      val arr: Array[String] = line.split(",")
      val id: Int = arr(0).toInt
      val name: String = arr(1)
      val num: Int = arr(2).toInt
      val score: Double = arr(3).toDouble
      Row(id, name, num, score)
    })
    val struct = StructType(List[StructField](
      StructField("id",DataTypes.IntegerType,true),
      StructField("name",DataTypes.StringType,true),
      StructField("num",DataTypes.IntegerType,true),
      StructField("score",DataTypes.DoubleType,true)
    ))
    val frame: DataFrame = session.createDataFrame(rowRDD, struct)
    frame.show()
    frame.printSchema()
    frame.createTempView("t")
    val frame1: DataFrame = session.sql(
      """
        |select id,name,num,score from t
        |""".stripMargin)
    frame1.show()
  }
}
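
When the column list comes from configuration rather than code, the schema can also be built from a DDL string instead of hand-written StructFields. A minimal sketch, assuming Spark 2.3 or later where StructType.fromDDL is available; the object name is illustrative.

package sparkSql

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object ReadRDDtoDFWithDDL {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("ddlSchema").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    val rowRDD = session.sparkContext.textFile("data/person").map(line => {
      val arr = line.split(",")
      Row(arr(0).toInt, arr(1), arr(2).toInt, arr(3).toDouble)
    })
    // Same four columns as above, expressed as a single DDL string (assumes Spark 2.3+).
    val struct: StructType = StructType.fromDDL("id int, name string, num int, score double")
    val frame: DataFrame = session.createDataFrame(rowRDD, struct)
    frame.printSchema()
    frame.show()
  }
}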

4. Creating a DataFrame by reading a Parquet file
package sparkSql

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}


object ReadParquetFileToDF {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().appName("ttt").master("local").getOrCreate()
    val context: SparkContext = session.sparkContext
    context.setLogLevel("Error")
    val frame: DataFrame = session.read.json("data/jsondata")

    frame.write.mode(SaveMode.Overwrite).parquet("data/parquet")
    val frame1: DataFrame = session.read.parquet("data/parquet")
    frame1.show(22)
    println(frame1.count())
//    frame1.write.json("data/json")
  }
}
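
Parquet is a columnar format, so reading back only the columns you need lets Spark skip the rest of the file. A minimal sketch using the generic format()/load() API on the data/parquet directory written above; the object name is illustrative.

package sparkSql

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadParquetSelectedColumns {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("parquetPrune").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    // parquet("...") and format("parquet").load("...") are equivalent.
    val frame: DataFrame = session.read.format("parquet").load("data/parquet")
    frame.select("name").show()
    frame.select("name").explain()   // the plan's ReadSchema should list only the name column
  }
}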

5. Creating a DataFrame by reading CSV data

CSV data source:

id,name,age,score
1,科比,40,100
2,詹姆斯,37,100
3,乔丹,55,100
4,杜兰特,33,99
5,库里,34,99
package sparkSql

import org.apache.spark.sql.{DataFrame, SparkSession}


object ReadCSVDataToDF {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder().master("local").appName("eee").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    val frame: DataFrame = session.read.option("header",true).csv("data/data.csv")
    frame.show()
  }
}
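
By default every CSV column is read as a string. A minimal sketch of two commonly used reader options, inferSchema and sep, on the same data/data.csv file; the object name is illustrative and the column types come from sampling the data.

package sparkSql

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadCSVWithOptions {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("csvOptions").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    val frame: DataFrame = session.read
      .option("header", "true")      // first line holds the column names
      .option("inferSchema", "true") // infer int/double/string types instead of all strings
      .option("sep", ",")            // explicit field delimiter
      .csv("data/data.csv")
    frame.printSchema()              // id/age/score should now be numeric, not string
    frame.show()
  }
}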

6. Creating a DataFrame from a Dataset of tuples

The data source is large, so it is not pasted here.

package sparkSql

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}


object ReadTupleDatasetToDF {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("rrr").master("local").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    val dt: Dataset[String] = session.read.textFile("data/pvuvdata")
    import session.implicits._
    val value: Dataset[(String, String, String, String, String, String, String)] = dt.map(line => {
      val arr: Array[String] = line.split("\t")
      val ip = arr(0)
      val local = arr(1)
      val date = arr(2)
      val ts = arr(3)
      val uid = arr(4)
      val site = arr(5)
      val operator = arr(6)
      (ip, local, date, ts, uid, site, operator)
    })
    val frame: DataFrame = value.toDF("ip","local","date","ts","uid","site","operator")
    frame.createTempView("t")
    session.sql(
      """
        |select site,count(*) as site_count from t group by site order by site_count
        |""".stripMargin).show()
    session.sql(
      """
        |select site,count(*) uv from (select distinct ip,site from t) t1 group by site order by uv
        |""".stripMargin).show()
  }
}
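
The two queries above compute PV (total records per site) and UV (distinct IPs per site). The same aggregations can be written with the DataFrame API; this is a minimal sketch assuming the same data/pvuvdata layout, with only the ip and site columns kept and an illustrative object name.

package sparkSql

import org.apache.spark.sql.functions.countDistinct
import org.apache.spark.sql.{DataFrame, SparkSession}

object PvUvWithDataFrameAPI {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("pvuvApi").getOrCreate()
    session.sparkContext.setLogLevel("Error")
    import session.implicits._
    val frame: DataFrame = session.read.textFile("data/pvuvdata")
      .map(line => {
        val arr = line.split("\t")
        (arr(0), arr(5))                     // keep only ip and site
      })
      .toDF("ip", "site")

    // PV: number of records per site.
    frame.groupBy("site").count().withColumnRenamed("count", "pv").orderBy("pv").show()

    // UV: number of distinct IPs per site.
    frame.groupBy("site").agg(countDistinct("ip").as("uv")).orderBy("uv").show()
  }
}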
