- Shark 是基于 Spark 计算框架之上且兼容 Hive 语法的 SQL 执行引擎,由于底层的计算采用了
Spark ,性能比 MapReduce 的 Hive 普遍快2倍以上,当数据全部加载在内存的话,将快10倍以上,因此 Shark 可以作为交互式查询应用服务来使用。 - 除了基于 Spark 的特性外, Shark 是完全兼容 Hive 的语法,表结构以及 UDF 函数等,已有的
Hive Sql 可以直接进行迁移至 Shark 上 Shark 底层依赖于 Hive 的解析器,查询优化器,但正是由于 shark 的整体设计架构对 Hive 的依赖性太强,难以支持其长远发展,比如不能和 Spark 的其他组件进行很好的集成,无法满足 Spark 的一站式解决大数据处理的需求。
- Spark on Hive :Hive只作为储存角色,Spark负责sql解析优化,执行。
- Hive on Spark :Hive即作为存储又负责sql的解析优化,Spark负责执行。
-
可以理解为RDD+schema(元信息)
-
特征
- 在spark中,Dataframe是一种以RDD为基础的分布式数据集,类似传统数据库的二维表格
- Dataframe带有schema元信息,即Dataframe所表示的二维表数据集的每一列都带有名称(如下图的 name age height)和类型(如下图的 string int double)
- Dataframe可以从很多数据源构建对象 ,如已经存在的RDD、结构化文件、外部数据库、Hive表。
- RDD可以把它的内部元素看成是一个java对象,Dataframe内部是一个个Row对象,它表示一行一行的数据
-
优点
- 提升执行效率
- 减少数据读取
- 执行优化
-
创建方式
-
case class
-
object HelloSparkCaseClass { def main(args: Array[String]): Unit = { //创建对象 val sparkSession: SparkSession = SparkSession.builder().master("local").appName("Hello02SparkSql").ge tOrCreate() //日志级别 val sparkContext: SparkContext = sparkSession.sparkContext sparkContext.setLogLevel("ERROR") //读取数据 val lines: RDD[String] = sparkContext.textFile("src/main/resources/dept.sql") val depts: RDD[Dept] = lines.map(_.split(",")).map(ele => new Dept(ele(0).toInt, ele(1), ele(2))) //开始隐式转换 import sparkSession.implicits._ val dataframe: Dataframe = depts.toDF() //打印信息-------DSL风格 dataframe.show() dataframe.printSchema() println(dataframe.count()) dataframe.columns.foreach(println) dataframe.select("deptno", "dname").show() dataframe.select("deptno", "dname").filter("deptno = 20").show() dataframe.groupBy("dname").count().show() //打印信息-------SQL风格 dataframe.createOrReplaceTempView("t_dept") sparkSession.sql("select * from t_dept").show() } } case class Dept(deptno: Int, dname: String, loc: String)
-
StructType
-
object HelloSparkStruct { def main(args: Array[String]): Unit = { //创建对象 val sparkSession: SparkSession = SparkSession.builder().master("local").appName("Hello02SparkSql").ge tOrCreate() //日志级别 val sparkContext: SparkContext = sparkSession.sparkContext sparkContext.setLogLevel("ERROR") //读取数据 val lines: RDD[String] = sparkContext.textFile("src/main/resources/dept.sql") val depts = lines.map(_.split(",")).map(ele => Row(ele(0).toInt, ele(1), ele(2))) //创建Dataframe val dataframe: Dataframe = sparkSession.createDataframe(depts, structType) //打印数据 dataframe.show() } //创建表格的类型 val structType: StructType = StructType( List( StructField("deptno", DataTypes.IntegerType), StructField("dname", DataTypes.StringType), StructField("loc", DataTypes.StringType) ) ) }
-
-
DataSet是分布式的数据集合,DataSet提供了强类型支持,在RDD的每行数据加了类型约束
-
Datset是在spark1.6中新添加的接口。它集中了RDD的优点(强类型和可以使用强大的lambda函数)以及使用了sparkSQL优化的执行引擎。
-
Dataframe(在2.X之后)实际上是DataSet的一个特例,即对Dataset的元素为Row时起了一个别名
-
import org.apache.spark.sql.{Dataset, SparkSession} object HelloDataSetJson { def main(args: Array[String]): Unit = { //创建SQL环境 val sparkSession = SparkSession.builder().master("local").appName("Hello02DataframeAvg").ge tOrCreate() import sparkSession.implicits._ val dataSet: Dataset[DeptJson] = sparkSession.read.json("src/main/data/dept.json").as[DeptJson] //打印数据 dataSet.show() } }
-
show
- show 只显示前20条记录。
- show(numRows: Int) 显示 numRows 条
- show(truncate: Boolean) 是否最多只显示20个字符,默认为 true 。
- show(numRows: Int, truncate: Boolean) 综合前面的显示记录条数,以及对过长字符串的显示格式。
-
collect 方法会将 jdbcDF 中的所有数据都获取到,并返回一个 Array 对象。
-
collectAsList:获取所有数据到List
-
first, head, take, takeAsList:获取若干行记录
- (1) first 获取第一行记录
- (2) head 获取第一行记录, head(n: Int) 获取前n行记录
- (3) take(n: Int) 获取前n行数据
- (4) takeAsList(n: Int) 获取前n行数据,并以 List 的形式展现
-
where(conditionExpr: String) :SQL语言中where关键字后的条件
- 可以用 and 和 or 。得到Dataframe类型的返回结果
-
filter :根据字段进行筛选
- 得到Dataframe类型的返回结果。和 where 使用条件相同
-
select :获取指定字段值
- 根据传入的 String 类型字段名,获取指定字段的值,以Dataframe类型返回
-
selectExpr :可以对指定字段进行特殊处理
- 可以直接对指定字段调用UDF函数,或者指定别名等。传入 String 类型参数,得到Dataframe对象。
-
col :获取指定字段
- 只能获取一个字段,返回对象为Column类型。
-
apply :获取指定字段
- 只能获取一个字段,返回对象为Column类型
-
drop :去除指定字段,保留其他字段
- 返回一个新的Dataframe对象,其中不包含去除的字段,一次只能去除一个字段。
- limit 方法获取指定Dataframe的前n行记录,得到一个新的Dataframe对象。
-
orderBy 和 sort :按指定字段排序,默认为升序
- 按指定字段排序。加个 - 表示降序排序。 sort 和 orderBy 使用方法相同
- jdbcDF.orderBy(- jdbcDF(“c4”)).show(false)
- jdbcDF.orderBy(jdbcDF(“c4”).desc).show(false)
-
sortWithinPartitions
- 和上面的 sort 方法功能类似,区别在于 sortWithinPartitions 方法返回的是按Partition排
- 好序的Dataframe对象。
- groupBy
- cube 和 rollup
- GroupedData对象
-
distinct :返回一个不包含重复记录的Dataframe
- 返回当前Dataframe中不重复的Row记录。该方法和接下来的 dropDuplicates() 方法不传入指定字段时的结果相同。
-
dropDuplicates :根据指定字段去重
- 根据指定字段去重。类似于 select distinct a, b *** 作
- 聚合 *** 作调用的是 agg 方法,该方法有多种调用方式。一般与 groupBy 方法配合使用。
- 以下示例其中最简单直观的一种用法,对 id 字段求最大值,对 c4 字段求和。
- unionAll 方法:对两个Dataframe进行组合 ,类似于 SQL 中的 UNIOn ALL *** 作
-
笛卡尔积
- joinDF1.join(joinDF2)
-
using 一个字段形式
- 下面这种join类似于 a join b using column1 的形式,需要两个Dataframe中有相同的一个列名,
- joinDF1.join(joinDF2, “id”)
-
using 多个字段形式
- 上面这种 using 一个字段的情况外,还可以 using 多个字段
- save可以将data数据保存到指定的区域
- Predicate Pushdown简称谓词下推,简而言之,就是在不影响结果的情况下,尽量将过滤条件提前执行。谓词下推后,过滤条件在map端执行,减少了map端的输出,降低了数据在集群上传输的量,节约了集群的资源,也提升了任务的性能。
- parquet文件
- JDBC
-
拷贝配置文件
- hdfs-site.xml
- core-site.xml
- hive-site.xml
-
写代码
-
one to one,进来一个出去一个,row mapping。是row级别 *** 作,如:upper、substr函数
package com.yjxxt import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.DataTypes object HelloHiveUDF { def main(args: Array[String]): Unit = { //搭建环境 val sparkSession = SparkSession.builder().master("local").appName("HelloHiveUDF").enableHiveSup port().getOrCreate() //定义UDF sparkSession.udf.register("strLen", (x: String) => { x.size }, DataTypes.IntegerType) //SQL sparkSession.sql("use yjx") val dataframe = sparkSession.sql("select id,strLen(uname) from t_user") //打印数据 dataframe.show() //关闭 sparkSession.stop() } }
-
many to one,进来多个出去一个,row mapping。是row级别 *** 作,如sum/min。
-
实现UDAF函数如果要自定义类要实现UserDefinedAggregateFunction类实现其中的方法。
-
package com.yjxxt import org.apache.spark.sql.{Dataframe, Row, SparkSession} import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StructField, StructType} object HelloHiveUDAF { def main(args: Array[String]): Unit = { //搭建环境 val sparkSession = SparkSession.builder().master("local").appName("HelloHiveUDAF").enableHiveSu pport().getOrCreate() //注册函数 sparkSession.udf.register("yjxAvg", YjxAvgUDAF) //开始查询 sparkSession.sql("use yjx") sparkSession.sql("select id,uname from t_user").show() val dataframe: Dataframe = sparkSession.sql("select gender,yjxAvg(age) from t_user group by gender") dataframe.show() //关闭 sparkSession.stop() } } object YjxAvgUDAF extends UserDefinedAggregateFunction { // 聚合函数的输入数据结构 override def inputSchema: StructType = StructType(StructField("input", LongType) :: Nil) // 缓存区数据结构 override def bufferSchema: StructType = StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) // 聚合函数返回值数据结构 override def dataType: DataType = DoubleType // 聚合函数是否是幂等的,即相同输入是否总是能得到相同输出 override def deterministic: Boolean = true // 初始化缓冲区 override def initialize(buffer: MutableAggregationBuffer): Unit = { //求和 buffer(0) = 0L //计数 buffer(1) = 0L } // 给聚合函数传入一条新数据进行处理 override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { if (input.isNullAt(0)) return buffer(0) = buffer.getLong(0) + input.getLong(0) buffer(1) = buffer.getLong(1) + 1 } // 合并聚合函数缓冲区 override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } // 计算最终结果 override def evaluate(buffer: Row): Any = buffer.getLong(0).toDouble / buffer.getLong(1) }
- UDTF:one to mang,进来一个出去多行。如lateral view 与 explode,T:table-generating
- 通过实现抽象类org.apache.hadoop.hive.ql.udf.generic.GenericUDTF来自定义UDTF算子
package com.yjjxt import org.apache.hadoop.hive.ql.exec.{UDFArgumentException, UDFArgumentLengthException} import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspe ctorFactory import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory, StructObjectInspector} import org.apache.spark.sql.SparkSession object HelloHiveUDTF { def main(args: Array[String]): Unit = { //搭建环境 val sparkSession = SparkSession.builder().master("local").appName("HelloHiveUDTF").enableHiveSu pport().getOrCreate() //注册utdf算子,这里无法使用sparkSession.udf.register() sparkSession.sql("CREATE TEMPORARY FUNCTION UserDefinedUDTF as 'com.yjjxt.UserDefinedUDTF'") sparkSession.sql("use yjx") sparkSession.sql("select id , UserDefinedUDTF(uname) from t_user").show() //关闭 sparkSession.stop() } } class UserDefinedUDTF extends GenericUDTF { override def initialize(args: Array[ObjectInspector]): StructObjectInspector = { if (args.length != 1) { throw new UDFArgumentLengthException("UserDefinedUDTF takes only one argument") } if (args(0).getCategory() != ObjectInspector.Category.PRIMITIVE) { throw new UDFArgumentException("UserDefinedUDTF takes string as a parameter") } val fieldNames = new java.util.ArrayList[String] val fieldOIs = new java.util.ArrayList[ObjectInspector] //这里定义的是输出列默认字段名称 fieldNames.add("col1") //这里定义的是输出列字段类型 fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector) ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs) } //这是处理数据的方法,入参数组里只有1行数据,即每次调用process方法只处理一行数据 override def process(args: Array[AnyRef]): Unit = { //将字符串切分成单个字符的数组 val strLst = args(0).toString.split("") for (i <- strLst) { var tmp: Array[String] = new Array[String](1) tmp(0) = i //调用forward方法,必须传字符串数组,即使只有一个元素 forward(tmp) } } override def close(): Unit = {} }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)