A DataFrame is essentially an RDD with a schema (structure) added on top; a Dataset is essentially a DataFrame with compile-time type information added on top.
1 RDD -> DF, DS

// create the RDD
scala> val rdd = sc.makeRDD(List(("zhangsan",18)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[23] at makeRDD at <console>:28

scala> rdd.collect().foreach(println)
(zhangsan,18)

// rdd -> df: rdd.toDF(colName1, colName2, ...)
scala> val df = rdd.toDF("name","age")
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

// print the result
scala> df.show
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 18|
+--------+---+

// rdd -> ds: caseClassRDD.toDS()
// define a case class
scala> case class User(name: String, age: Int)

scala> val rdd1 = rdd.map(data => User(data._1, data._2))
rdd1: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[27] at map at <console>:31

scala> val ds = rdd1.toDS()
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

scala> ds.show
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 18|
+--------+---+
2 DF -> RDD, DS

// df -> rdd: df.rdd (note the element type is Row)
scala> val dfToRDD = df.rdd
dfToRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[35] at rdd at <console>:29

scala> dfToRDD.collect().foreach(println)
[zhangsan,18]

// df -> ds: df.as[CaseClass]
scala> val dfTods = df.as[User]
dfTods: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

scala> dfTods.show
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 18|
+--------+---+
3 DS -> RDD, DF

// ds -> rdd: ds.rdd (the element type stays User, not Row)
scala> val dsToRDD = ds.rdd
dsToRDD: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[44] at rdd at <console>:29

scala> dsToRDD.collect().foreach(println)
User(zhangsan,18)

// ds -> df: ds.toDF
scala> val dsTodf = ds.toDF
dsTodf: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> dsTodf.show
+--------+---+
|    name|age|
+--------+---+
|zhangsan| 18|
+--------+---+
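Outside the shell, the same conversions can be sketched as a self-contained script. This is only a sketch: it assumes a Spark 2.x+ dependency on the classpath, and the app name "rdd-df-ds-demo" is made up. Unlike in spark-shell, you must create the SparkSession yourself and import spark.implicits._ to bring toDF / toDS / as[T] into scope.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// The case class must be visible outside the method/expression that
// uses it, so Spark can derive an Encoder[User].
case class User(name: String, age: Int)

val spark = SparkSession.builder()
  .appName("rdd-df-ds-demo")   // hypothetical app name
  .master("local[*]")          // run locally for the demo
  .getOrCreate()
import spark.implicits._       // enables toDF / toDS / as[T]

val rdd = spark.sparkContext.makeRDD(List(("zhangsan", 18)))

// 1. RDD -> DF, DS
val df: DataFrame     = rdd.toDF("name", "age")
val ds: Dataset[User] = rdd.map(t => User(t._1, t._2)).toDS()

// 2. DF -> RDD (element type is Row), DF -> DS
val rowRdd  = df.rdd
val typedDs = df.as[User]

// 3. DS -> RDD (element type stays User), DS -> DF
val userRdd  = ds.rdd
val backToDf = ds.toDF()

// collect results before stopping the session
val collected = typedDs.collect()
val fields    = backToDf.schema.fieldNames

spark.stop()
```

The round trip is lossless here because the column names and types in the DataFrame line up exactly with the fields of User; as[User] would fail at runtime if a column name or type did not match.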