package com.huc.sparkSql import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataframe, Dataset, SparkSession} object Test03_RDDAndDataSet { def main(args: Array[String]): Unit = { // 1. 创建sparkSession配置对象 val conf: SparkConf = new SparkConf().setAppName("sparkSql").setMaster("local[*]") // 2. 创建一个sparkSession val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() val sc: SparkContext = spark.sparkContext // 3. 使用sparkSession val userRdd: RDD[User] = sc.makeRDD(List(User("zhangsan", 10), User("lisi", 20))) // 相互转换的时候,需要导入隐式转换 import spark.implicits._ val ds: Dataset[User] = userRdd.toDS() ds.show() // 将样例类的ds转换回Rdd // 不会丢失类型 val rdd: RDD[User] = ds.rdd for (elem <- rdd.collect()) { println(elem.name) println(elem.age) } // 如果是普通类型的rdd转换为ds // 一般用于样例类之间的相互转化,不然意义不大 val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5), 2) val ds1: Dataset[Int] = rdd1.toDS() ds1.show() // 4. 关闭sparkSession spark.close() } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)