大数据Spark之详细解读Spark-Sql

大数据Spark之详细解读Spark-Sql,第1张

数据Spark之详细解读Spark-Sql Spark Sql 简介 Shark
  • Shark 是基于 Spark 计算框架之上且兼容 Hive 语法的 SQL 执行引擎,由于底层的计算采用了
    Spark ,性能比 MapReduce 的 Hive 普遍快2倍以上,当数据全部加载在内存的话,将快10倍以上,因此 Shark 可以作为交互式查询应用服务来使用。
  • 除了基于 Spark 的特性外, Shark 是完全兼容 Hive 的语法,表结构以及 UDF 函数等,已有的
    Hive Sql 可以直接进行迁移至 Shark 上 Shark 底层依赖于 Hive 的解析器,查询优化器,但正是由于 shark 的整体设计架构对 Hive 的依赖性太强,难以支持其长远发展,比如不能和 Spark 的其他组件进行很好的集成,无法满足 Spark 的一站式解决大数据处理的需求。
Spark on Hive和Hive on Spark
  • Spark on Hive :Hive只作为储存角色,Spark负责sql解析优化,执行。
  • Hive on Spark :Hive即作为存储又负责sql的解析优化,Spark负责执行。
DataSet和Dataframe Dataframe
  • 可以理解为RDD+schema(元信息)

  • 特征

    • 在spark中,Dataframe是一种以RDD为基础的分布式数据集,类似传统数据库的二维表格
    • Dataframe带有schema元信息,即Dataframe所表示的二维表数据集的每一列都带有名称(如下图的 name age height)和类型(如下图的 string int double)
    • Dataframe可以从很多数据源构建对象 ,如已经存在的RDD、结构化文件、外部数据库、Hive表。
    • RDD可以把它的内部元素看成是一个java对象,Dataframe内部是一个个Row对象,它表示一行一行的数据
  • 优点

    • 提升执行效率
    • 减少数据读取
    • 执行优化
  • 创建方式

    • case class

    • object HelloSparkCaseClass {
          def main(args: Array[String]): Unit = {
        //创建对象
        val sparkSession: SparkSession =
      SparkSession.builder().master("local").appName("Hello02SparkSql").ge
      tOrCreate()
        //日志级别
        val sparkContext: SparkContext = sparkSession.sparkContext
        sparkContext.setLogLevel("ERROR")
        //读取数据
        val lines: RDD[String] =
      sparkContext.textFile("src/main/resources/dept.sql")
        val depts: RDD[Dept] = lines.map(_.split(",")).map(ele => new
      Dept(ele(0).toInt, ele(1), ele(2)))
        //开始隐式转换
        import sparkSession.implicits._
        val dataframe: Dataframe = depts.toDF()
        //打印信息-------DSL风格
        dataframe.show()
        dataframe.printSchema()
        println(dataframe.count())
        dataframe.columns.foreach(println)
        dataframe.select("deptno", "dname").show()
        dataframe.select("deptno", "dname").filter("deptno = 20").show()
        dataframe.groupBy("dname").count().show()
        //打印信息-------SQL风格
        dataframe.createOrReplaceTempView("t_dept")
        sparkSession.sql("select * from t_dept").show()
      }
      }
      case class Dept(deptno: Int, dname: String, loc: String)
      
    • StructType

    • object HelloSparkStruct {
       def main(args: Array[String]): Unit = {
        //创建对象
        val sparkSession: SparkSession =
      SparkSession.builder().master("local").appName("Hello02SparkSql").ge
      tOrCreate()
        //日志级别
        val sparkContext: SparkContext = sparkSession.sparkContext
        sparkContext.setLogLevel("ERROR")
        //读取数据
        val lines: RDD[String] =
      sparkContext.textFile("src/main/resources/dept.sql")
        val depts = lines.map(_.split(",")).map(ele => Row(ele(0).toInt,
      ele(1), ele(2)))
        //创建Dataframe
           val dataframe: Dataframe = sparkSession.createDataframe(depts,
      structType)
        //打印数据
        dataframe.show()
      }
       //创建表格的类型
       val structType: StructType = StructType(
        List(
         StructField("deptno", DataTypes.IntegerType),
         StructField("dname", DataTypes.StringType),
         StructField("loc", DataTypes.StringType)
       )
      )
      }
      
DataSet
  • DataSet是分布式的数据集合,DataSet提供了强类型支持,在RDD的每行数据加了类型约束

  • Datset是在spark1.6中新添加的接口。它集中了RDD的优点(强类型和可以使用强大的lambda函数)以及使用了sparkSQL优化的执行引擎。

  • Dataframe(在2.X之后)实际上是DataSet的一个特例,即对Dataset的元素为Row时起了一个别名

  • import org.apache.spark.sql.{Dataset, SparkSession}
    
    object HelloDataSetJson {
     def main(args: Array[String]): Unit = {
      //创建SQL环境
      val sparkSession =
    SparkSession.builder().master("local").appName("Hello02DataframeAvg").ge
    tOrCreate()
      import sparkSession.implicits._
      val dataSet: Dataset[DeptJson] =
    sparkSession.read.json("src/main/data/dept.json").as[DeptJson]
      //打印数据
      dataSet.show()
    }
    }
    
DSL数据 *** 作 action
  • show

    • show 只显示前20条记录。
    • show(numRows: Int) 显示 numRows 条
    • show(truncate: Boolean) 是否最多只显示20个字符,默认为 true 。
    • show(numRows: Int, truncate: Boolean) 综合前面的显示记录条数,以及对过长字符串的显示格式。
  • collect 方法会将 jdbcDF 中的所有数据都获取到,并返回一个 Array 对象。

  • collectAsList:获取所有数据到List

  • describe(cols: String*):获取指定字段的统计信息

  • first, head, take, takeAsList:获取若干行记录

    • (1) first 获取第一行记录
    • (2) head 获取第一行记录, head(n: Int) 获取前n行记录
    • (3) take(n: Int) 获取前n行数据
    • (4) takeAsList(n: Int) 获取前n行数据,并以 List 的形式展现
查询
  • where(conditionExpr: String) :SQL语言中where关键字后的条件

    • 可以用 and 和 or 。得到Dataframe类型的返回结果
  • filter :根据字段进行筛选

    • 得到Dataframe类型的返回结果。和 where 使用条件相同
  • select :获取指定字段值

    • 根据传入的 String 类型字段名,获取指定字段的值,以Dataframe类型返回
  • selectExpr :可以对指定字段进行特殊处理

    • 可以直接对指定字段调用UDF函数,或者指定别名等。传入 String 类型参数,得到Dataframe对象。
  • col :获取指定字段

    • 只能获取一个字段,返回对象为Column类型。
  • apply :获取指定字段

    • 只能获取一个字段,返回对象为Column类型
  • drop :去除指定字段,保留其他字段

    • 返回一个新的Dataframe对象,其中不包含去除的字段,一次只能去除一个字段。
Limit
  • limit 方法获取指定Dataframe的前n行记录,得到一个新的Dataframe对象。
排序
  • orderBy 和 sort :按指定字段排序,默认为升序

    • 按指定字段排序。加个 - 表示降序排序。 sort 和 orderBy 使用方法相同
    • jdbcDF.orderBy(- jdbcDF(“c4”)).show(false)
    • jdbcDF.orderBy(jdbcDF(“c4”).desc).show(false)
  • sortWithinPartitions

    • 和上面的 sort 方法功能类似,区别在于 sortWithinPartitions 方法返回的是按Partition排
    • 好序的Dataframe对象。
组函数
  • groupBy
  • cube 和 rollup
  • GroupedData对象
去重
  • distinct :返回一个不包含重复记录的Dataframe

    • 返回当前Dataframe中不重复的Row记录。该方法和接下来的 dropDuplicates() 方法不传入指定字段时的结果相同。
  • dropDuplicates :根据指定字段去重

    • 根据指定字段去重。类似于 select distinct a, b *** 作
聚合
  • 聚合 *** 作调用的是 agg 方法,该方法有多种调用方式。一般与 groupBy 方法配合使用。
  • 以下示例其中最简单直观的一种用法,对 id 字段求最大值,对 c4 字段求和。
Union
  • unionAll 方法:对两个Dataframe进行组合 ,类似于 SQL 中的 UNIOn ALL *** 作
Join
  • 笛卡尔积

    • joinDF1.join(joinDF2)
  • using 一个字段形式

    • 下面这种join类似于 a join b using column1 的形式,需要两个Dataframe中有相同的一个列名,
    • joinDF1.join(joinDF2, “id”)
  • using 多个字段形式

    • 上面这种 using 一个字段的情况外,还可以 using 多个字段
save
  • save可以将data数据保存到指定的区域
SparkSQL的数据源 SparkSQL 的数据源可以是 JSON 类型的字符串, JDBC , Parquet , Hive , HDFS 等。 谓词下推
  • Predicate Pushdown简称谓词下推,简而言之,就是在不影响结果的情况下,尽量将过滤条件提前执行。谓词下推后,过滤条件在map端执行,减少了map端的输出,降低了数据在集群上传输的量,节约了集群的资源,也提升了任务的性能。
创建Dataset
  • parquet文件
  • JDBC
Hive
  • 拷贝配置文件

    • hdfs-site.xml
    • core-site.xml
    • hive-site.xml
  • 写代码

SparkSql on Hive自定义函数 UDF
  • one to one,进来一个出去一个,row mapping。是row级别 *** 作,如:upper、substr函数

    package com.yjxxt
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.DataTypes
    object HelloHiveUDF {
     def main(args: Array[String]): Unit = {
      //搭建环境
      val sparkSession =
    SparkSession.builder().master("local").appName("HelloHiveUDF").enableHiveSup
    port().getOrCreate()
      //定义UDF
      sparkSession.udf.register("strLen", (x: String) => {
       x.size
     }, DataTypes.IntegerType)
      //SQL
      sparkSession.sql("use yjx")
      val dataframe = sparkSession.sql("select id,strLen(uname) from t_user")
      //打印数据
      dataframe.show()
      //关闭
      sparkSession.stop()
    }
    }
    
UDAF
  • many to one,进来多个出去一个,row mapping。是row级别 *** 作,如sum/min。

  • 实现UDAF函数如果要自定义类要实现UserDefinedAggregateFunction类实现其中的方法。

  • package com.yjxxt
    import org.apache.spark.sql.{Dataframe, Row, SparkSession}
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer,
    UserDefinedAggregateFunction}
    import org.apache.spark.sql.types.{DataType, DoubleType, LongType,
    StructField, StructType}
    object HelloHiveUDAF {
     def main(args: Array[String]): Unit = {
      //搭建环境
      val sparkSession =
    SparkSession.builder().master("local").appName("HelloHiveUDAF").enableHiveSu
    pport().getOrCreate()
      //注册函数
      sparkSession.udf.register("yjxAvg", YjxAvgUDAF)
      //开始查询
      sparkSession.sql("use yjx")
      sparkSession.sql("select id,uname from t_user").show()
      val dataframe: Dataframe = sparkSession.sql("select gender,yjxAvg(age)
    from t_user group by gender")
      dataframe.show()
      //关闭
      sparkSession.stop()
    }
    }
    object YjxAvgUDAF extends UserDefinedAggregateFunction {
     // 聚合函数的输入数据结构
     override def inputSchema: StructType = StructType(StructField("input",
    LongType) :: Nil)
     // 缓存区数据结构
     override def bufferSchema: StructType = StructType(StructField("sum",
    LongType) :: StructField("count", LongType) :: Nil)
     // 聚合函数返回值数据结构
     override def dataType: DataType = DoubleType
     // 聚合函数是否是幂等的,即相同输入是否总是能得到相同输出
     override def deterministic: Boolean = true
     // 初始化缓冲区
     override def initialize(buffer: MutableAggregationBuffer): Unit = {
      //求和
      buffer(0) = 0L
      //计数
      buffer(1) = 0L
    }
     // 给聚合函数传入一条新数据进行处理
     override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    {
      if (input.isNullAt(0)) return
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
     // 合并聚合函数缓冲区
     override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit
    = {
      buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
      buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
    }
     // 计算最终结果
     override def evaluate(buffer: Row): Any = buffer.getLong(0).toDouble /
    buffer.getLong(1)
    }
    
UDTF
  • UDTF:one to mang,进来一个出去多行。如lateral view 与 explode,T:table-generating
  • 通过实现抽象类org.apache.hadoop.hive.ql.udf.generic.GenericUDTF来自定义UDTF算子
package com.yjjxt
import org.apache.hadoop.hive.ql.exec.{UDFArgumentException,
UDFArgumentLengthException}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
import
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspe
ctorFactory
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector,
ObjectInspectorFactory, StructObjectInspector}
import org.apache.spark.sql.SparkSession
object HelloHiveUDTF {
 def main(args: Array[String]): Unit = {
  //搭建环境
  val sparkSession =
SparkSession.builder().master("local").appName("HelloHiveUDTF").enableHiveSu
pport().getOrCreate()
  //注册utdf算子,这里无法使用sparkSession.udf.register()
  sparkSession.sql("CREATE TEMPORARY FUNCTION UserDefinedUDTF as
'com.yjjxt.UserDefinedUDTF'")
  sparkSession.sql("use yjx")
  sparkSession.sql("select id , UserDefinedUDTF(uname) from
t_user").show()
  //关闭
  sparkSession.stop()
}
}
class UserDefinedUDTF extends GenericUDTF {
 
 override def initialize(args: Array[ObjectInspector]):
StructObjectInspector = {
  if (args.length != 1) {
   throw new UDFArgumentLengthException("UserDefinedUDTF takes only one
argument")
 }
  if (args(0).getCategory() != ObjectInspector.Category.PRIMITIVE) {
   throw new UDFArgumentException("UserDefinedUDTF takes string as a
parameter")
 }
  val fieldNames = new java.util.ArrayList[String]
  val fieldOIs = new java.util.ArrayList[ObjectInspector]
  //这里定义的是输出列默认字段名称
  fieldNames.add("col1")
  //这里定义的是输出列字段类型
  fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector)
  ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,
fieldOIs)
}
 //这是处理数据的方法,入参数组里只有1行数据,即每次调用process方法只处理一行数据
 override def process(args: Array[AnyRef]): Unit = {
  //将字符串切分成单个字符的数组
  val strLst = args(0).toString.split("")
  for (i <- strLst) {
   var tmp: Array[String] = new Array[String](1)
   tmp(0) = i
   //调用forward方法,必须传字符串数组,即使只有一个元素
   forward(tmp)
 }
}
 override def close(): Unit = {}
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5668938.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存