Spark 结构化API-DataFram、SparkSQL

Spark 结构化API-DataFram、SparkSQL,第1张

Spark 结构化API-DataFram、SparkSQL

前言:本文主要介绍Spark中结构化API的使用。

一、数据源

Read API的结构:

DataframeReader.format(文件类型).option(属性,值).schema(自定义的模式).load(文件路径)

format、schema、一系列option选项,每一步转换都会返回一个DataframeReader。

例如:

spark.read.format("csv")
            .option("headr",true)
            .option("mode",FAILFAST)
            .option("inferSchema",true)
            .load("D:/data/spark-data.csv")

读取模式:通过 option("mode",值)设置

permissive:当遇到错误格式的记录时,将所有字段设置为null并将所有错误格式的记录放在名为_corrupt_record字符串列中dropMalformed:删除包含错误格式记录的行failFast:遇到错误格式记录后立即返回失败

Write API:

DataframeWriter.format(文件格式)
                .option(属性,值)
                .partitionBy(字段)
                .bucketBy(字段)
                .sortBy(字段)
                .save(路径)

1.CSV:

文件

spark.read.format("csv")
            .option("headr",true)
            .option("mode",FAILFAST)
            .option("inferSchema",true)
            .load("D:/data/spark-data.csv")

写文件

df.write.repartition(1)
        .format("csv")
        .mode("overwrite")
        .option("sep","t")
        .save("D:/spark/data/spark-data.csv")

2.JSON

读文件

spark.read.format("json")
          .option("header",true")
          .option("inferSchmea",true)
          .load("D:/spark/data/spark-data.json")

写文件

df.write.repartition(1)
        .format("json")
        .mode("overwrite")
        .option("sep","t")
        .save("D:/spark/data/spark-data.json")

数据的读取格式基本都一样。

二、API *** 作

本文使用的数据为

航班数据

DEST_COUNTRY_NAMEORIGIN_COUNTRY_NAMEcountUnited States              Romania15..................United StatesCroatia

1

1.取列:col()    column()$"列名"   '列名

 2.selectExpr函数

可以利用selectExpr来构建复杂的表达式来创建Dataframe

df.selectExpr("*","(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry").show(2)

 3.添加列:withColumn(列名,列)

df.withColumn("numberOne",lit(1)).show(2)

4.重命名列 withColumnRename(原列名,新列名)

df.withColumnRenamed("DEST_COUNTRY_NAME","DEST").show(2)

 

5. 删除列:drop(列名)

df.drop("DEST_COUNTRY_NAME").show(2)

 

 6.过滤行:filter、where

df.filter(col("count")<2).show(2)
df.where("count<2").show(2)

 7.去重 distinct()

//scala
df.select("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME").distinct().count()

//sql
select count(distinct(DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME)) from dfTable

 8.连接连个Dataframe,要求具有相同的Schema

df.union(newDF)

9.行排序 sort和orderBy 默认是升序

df.sort("count").show(3)
df.orderBy("count","DEST_COUNTRY_NAME").show(3)

import org.apache.spark.sql.functions.{desc,asc}

df.sort(desc("count")).show(2)
df.orderBy(desc("count")).show(2)

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5715902.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存