前言:本文主要介绍Spark中结构化API的使用。
一、数据源Read API的结构:
DataframeReader.format(文件类型).option(属性,值).schema(自定义的模式).load(文件路径)
format、schema、一系列option选项,每一步转换都会返回一个DataframeReader。
例如:
spark.read.format("csv") .option("headr",true) .option("mode",FAILFAST) .option("inferSchema",true) .load("D:/data/spark-data.csv")
读取模式:通过 option("mode",值)设置
permissive:当遇到错误格式的记录时,将所有字段设置为null并将所有错误格式的记录放在名为_corrupt_record字符串列中dropMalformed:删除包含错误格式记录的行failFast:遇到错误格式记录后立即返回失败
Write API:
DataframeWriter.format(文件格式) .option(属性,值) .partitionBy(字段) .bucketBy(字段) .sortBy(字段) .save(路径)
1.CSV:
读文件
spark.read.format("csv") .option("headr",true) .option("mode",FAILFAST) .option("inferSchema",true) .load("D:/data/spark-data.csv")
写文件
df.write.repartition(1) .format("csv") .mode("overwrite") .option("sep","t") .save("D:/spark/data/spark-data.csv")
2.JSON
读文件
spark.read.format("json") .option("header",true") .option("inferSchmea",true) .load("D:/spark/data/spark-data.json")
写文件
df.write.repartition(1) .format("json") .mode("overwrite") .option("sep","t") .save("D:/spark/data/spark-data.json")
数据的读取格式基本都一样。
二、API *** 作本文使用的数据为
航班数据
1
1.取列:col() column()$"列名" '列名
2.selectExpr函数
可以利用selectExpr来构建复杂的表达式来创建Dataframe
df.selectExpr("*","(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry").show(2)
3.添加列:withColumn(列名,列)
df.withColumn("numberOne",lit(1)).show(2)
4.重命名列 withColumnRename(原列名,新列名)
df.withColumnRenamed("DEST_COUNTRY_NAME","DEST").show(2)
5. 删除列:drop(列名)
df.drop("DEST_COUNTRY_NAME").show(2)
6.过滤行:filter、where
df.filter(col("count")<2).show(2) df.where("count<2").show(2)
7.去重 distinct()
//scala df.select("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME").distinct().count() //sql select count(distinct(DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME)) from dfTable
8.连接连个Dataframe,要求具有相同的Schema
df.union(newDF)
9.行排序 sort和orderBy 默认是升序
df.sort("count").show(3) df.orderBy("count","DEST_COUNTRY_NAME").show(3) import org.apache.spark.sql.functions.{desc,asc} df.sort(desc("count")).show(2) df.orderBy(desc("count")).show(2)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)