SparkSql 相比较 HiveSql 具有更快的运行速度和更高的灵活性,平常使用中经常需要进行数据转换,常见的有 RDD[T] -> Dataframe,Dataframe -> RDD[T] 还有 RDD[row] -> sql.dataframe,下面简单介绍下常用用法。
初始化 SparkSession :
// 1.配置Spark val conf = { if (local) new SparkConf().setAppName("spark_sql").setMaster("local[*]") else new SparkConf().setAppName("spark_sql") } val spark = SparkSession .builder .config(conf) .getOrCreate() val sc = spark.sparkContext val sqlContext = new SQLContext(sc) import sqlContext.implicits._二.RDD[T] 转 DataFrmae
这里 parallelize 模拟一个用户 RDD 包含 name 和 age 属性,通常情况下 map 返回 pairRdd,这里通过隐式转换 sqlContext.implicits._,使得 RDD 转换为 Dataframe,注意这里列名 cols 需要与 pariRdd 元祖的大小匹配。后续通过 createOrReplcaeTempView 可以构建临时表,随后通过 .sql 方法按照 hiveSql 语句进行任务执行命令。
// 2.RDD[T] 转 Dataframe val userInfo = sc.parallelize(Array[(String, String)]( ("A", "90s"), ("B", "00s"), ("C", "10s"), ("D", "20s"))) val userDf = userInfo.map(info => { val name = info._1 val age = info._2 (name, age) }).toDF("name", "age") userDf.createOrReplaceTempView("tmp_user_info") sqlContext.sql("select * from tmp_user_info where name != 'A'").show(10)
+----+---+ |name|age| +----+---+ | B|00s| | C|10s| | D|20s| +----+---+三.Dataframe 转 RDD[T]
spark 提供 read csv 方法可以将 Text 文本文件 or CSV 文件直接读取为 Dataframe,dataframe 每一行数据为 row,有些同学需要转换为 Array 或者其他类执行后续代码,可以通过下述方法实现:
// 3.Dataframe 转 RDD[T] val userRdd = spark.read.option("header", "false") .option("delimiter", "t") .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ") .csv(".user.gz").rdd.map(row => { val info = row.toSeq.toArray.map(f => if (f == null) "NULL" else f.toString) info }) userRdd.take(5).foreach(arr => println(arr.mkString(",")))
这里 header 代表是否保留列名,适用于 csv 文件,delimiter 代表文件的分隔符,这里需要注意 timestampFormat 参数,如果这个参数不设置,会报如下错误:
Exception in thread "main" java.lang.IllegalArgumentException: Illegal pattern component: XXX
四.RDD[row] 转 Dataframe除了上述场景,还有对 dataframe 过滤的场景,一方面可以采用 sqlContext 作为临时表进行过滤,也可以采取转 RDD 解析字符串进行解析,这时 Dataframe 就会转变为 RDD[row],如果需要转回 Dataframe,可以执行以下 *** 作:
// 4.RDD[row] 转 sql.dataframe val nullable: Boolean = true val schema = StructType( List( StructField("name", StringType, nullable), StructField("age", StringType, nullable) ) ) val filterUserRdd = userDf.rdd.filter(line => { val info = line.toSeq.map(_.toString) val age = info(1) age.equals("90s") }) val userSqlDf = sqlContext.createDataframe(filterUserRdd, schema) userSqlDf.createOrReplaceTempView("new_df") sqlContext.sql("select * from new_df").show(5)
StructField 标识字段类型,除了 StringType 外,还有其他常用的类型比如 DoubleType,BooleanType 等等,有需要进 rg.apache.spark.sql.types 类寻找即可,这里通过定义列名与列属性结合原始 DF,可以实现 row -> dataframe 的转变,后续可以继续 HiveSql 相关 *** 作。
+----+---+ |name|age| +----+---+ | A|90s| +----+---+
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)