IO流的方式读取数据
1、字节流的方式读取数据(一次读取一个字节)
InputStream in = new FileInputStream("input")
2、字节缓冲流的方式读取数据(把数据缓存起来,批量读取)
InputStream in = new BufferedInputStream(FileInputStream("input"))
3、一次读取一行的缓存流的方式读取数据
BufferedReader reader = new BufferedReader(new InputStreamReader( new FileInputStream("input"), "UTF-8") )
流的读取,都是在调用read。称之为惰性加载。
IO流和RDD之间关系
//读取外部文件
val textRDD: RDD[String] = sc.textFile("D:\mywork\IDEAproject\spark-11\src\main\input\")
//对读取到的内容进行切割并进行扁平化处理
val flatMapRDD: RDD[String] = textRDD.flatMap(_.split(" "))
//对数据集中的内容进行结构转换
val mapRDD: RDD[(String, Int)] = flatMapRDD.map((_, 1))
//对相同的单词key的value进行聚合
val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
//将执行结果进行收集
val res: Array[(String, Int)] = reduceRDD.collect()
res.foreach(println)
1)产生一个lineRdd
2)flatMap包装了lineRdd,返回wordRdd
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))}
3)map包装了wordRdd ,返回wordToOneRdd
def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))}
4)只有最后调用collect()才会执行
2.RDDRDD(Resilient Distributed Dataset)叫做d性分布式数据集,是Spark中最基本的数据抽象。 代码中是一个抽象类,它代表一个d性的、不可变、可分区、里面的元素可并行计算的集合。
1)d性
存储的d性:内存与磁盘的自动切换;
容错的d性:数据丢失可以自动恢复;
计算的d性:计算出错重试机制;
分片的d性:可根据需要重新分片。
2)分布式
数据存储在大数据集群不同节点上
3)数据集
RDD封装了计算逻辑,并不保存数据
4)数据抽象
RDD是一个抽象类,需要子类具体实现
5)不可变
RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑
6)可分区、并行计算
3.RDD特性Internally, each RDD is characterized by five main properties: - A list of partitions - A function for computing each split - A list of dependencies on other RDDs - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
(1)一组分区(Partition),即是数据集的基本组成单位;
protected def getPartitions:Array[Partition]
(2)一个计算每个分区的函数;
def compute(split: Partition, context: TaskContext): Interator[T]
(3)RDD之间的依赖关系;
protected def getDependencies: Seq[Dependency[_]] = deps
(4)一个Partitioner,即RDD的分片函数;控制分区的数据流向(键值对)
val partitioner : scala.Option[org.apache.spark.Partitioner]
(5)一个列表,存储存取每个Partition的优先位置(preferred location)。 移动数据不如移动计算,除非资源不够。
protected def getPreferredLocations(split : Partition) : scala.Seq[String]
A list of partitions
多个分区,分区可以看成是数据集的基本组成单位
对于 RDD 来说, 每个分区都会被一个计算任务处理, 并决定了并行计算的粒度。
用户可以在创建 RDD 时指定 RDD 的分区数, 如果没有指定, 那么就会采用默认值。 默认值就是程序所分配到的 CPU Core 的数目.
每个分配的存储是由BlockManager 实现的, 每个分区都会被逻辑映射成 BlockManager 的一个 Block,,而这个 Block 会被一个 Task 负责计算。
A function for computing each split
- (分区)的函数.
- 中 RDD 的计算是以分片为单位的,每个 RDD 都会实现compute函数以达到这个目的
- A list of dependencies on other RDDs
- RDD 之间的依赖关系
- 的每次转换都会生成一个新的 RDD, 所以 RDD 之间会形成类似于流水线一样的前后依赖关系。 在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据, 而不是对 RDD 的所有分区进行重新计算
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- RDD,还有一个可选的分区器
- key-value的 RDD,才会有 Partitioner, 非key-value的 RDD 的 Partitioner 的值是 None;Partitiner 不但决定了 RDD 的本区数量, 也决定了 parent RDD Shuffle 输出时的分区数量
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
- (preferred location)位置的列表
- HDFS 文件来说, 这个列表保存的就是每个 Partition 所在文件块的位置. 按照“移动数据不如移动计算”的理念, Spark 在进行任务调度的时候, 会尽可能地将计算任务分配到其所要处理数据块的存储位置.
RDD编程_asd623444055的博客-CSDN博客
三、数据读取与保存Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。
文件格式分为:Text文件、Json文件、Csv文件、Sequence文件以及Object文件;
文件系统分为:本地文件系统、HDFS以及数据库。
1.文件类数据读取与保存 1.Text文件1)数据读取:textFile(String)
2)数据保存:saveAsTextFile(String)
3)代码实现
object Operate_Text {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.1 读取输入文件
val inputRDD: RDD[String] = sc.textFile("input/1.txt")
//3.2 保存数据
inputRDD.saveAsTextFile("output")
//4.关闭连接
sc.stop()
}
}
4)注意:如果是集群路径:hdfs://hadoop102:8020/input/1.txt
2.Json文件如果JSON文件中每一行就是一个JSON记录,那么可以通过将JSON文件当做文本文件来读取,然后利用相关的JSON库对每一条数据进行JSON解析。
1)数据准备
在input目录下创建1.txt文件,里面存储如下内容
{"name": "zhangsan","age": 10}
{"name": "lisi","age": 19}
{"name": "wangwu","age": 25}
2)代码实现
/**
* 读取Json
*/
object Spark03_readJson {
def main(args: Array[String]): Unit = {
//创建Spark配置文件对象
val conf: SparkConf = new SparkConf().setAppName("Spark01_CreateRDD_mem").setMaster("local[*]")
//创建SparkContext对象,该对象时提交Spark App的入口
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.textFile("D:\mywork\IDEAproject\spark-11\src\main\input\test.json")
val resRDD: RDD[Option[Any]] = rdd.map(JSON.parseFull)
resRDD.foreach(println)// {"name": "zhangsan","age": 18}
// {"name": "lisi","age": 19}
//释放资源
sc.stop()
}
}
注意:使用RDD读取JSON文件处理很复杂,同时SparkSQL集成了很好的处理JSON文件的方式,所以应用中多是采用SparkSQL处理JSON文件。
3.Sequence文件SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。在SparkContext中,可以调用sequenceFile[keyClass, valueClass](path)。
1)代码实现
object Operate_Sequence {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.1 创建rdd
val dataRDD: RDD[(Int, Int)] = sc.makeRDD(Array((1,2),(3,4),(5,6)))
//3.2 保存数据为SequenceFile
dataRDD.saveAsSequenceFile("output")
//3.3 读取SequenceFile文件
sc.sequenceFile[Int,Int]("output").collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2)注意:SequenceFile文件只针对PairRDD
4.Object对象文件对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFile[k,v](path)函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。
1)代码实现
object Operate_Object {
def main(args: Array[String]): Unit = {
//1.创建SparkConf并设置App名称
val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
//2.创建SparkContext,该对象是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3.1 创建RDD
val dataRDD: RDD[Int] = sc.makeRDD(Array(1,2,3,4))
//3.2 保存数据
dataRDD.saveAsObjectFile("output")
//3.3 读取数据
sc.objectFile[(Int)]("output").collect().foreach(println)
//4.关闭连接
sc.stop()
}
}
2.文件系统类数据读取与保存
1.HDFS
Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持。另外,由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有的版本,也提供了两套创建 *** 作接口。对于外部存储创建 *** 作而言,hadoopRDD和newHadoopRDD是最为抽象的两个函数接口。
2.MySQL支持通过Java JDBC访问关系型数据库。需要通过JdbcRDD进行,示例如下:
(1)添加依赖
mysql
mysql-connector-java
5.1.27
(2)从Mysql读取数据
package com.spark.day06
import java.sql.DriverManager
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.parsing.json.JSON
/**
* 读取MySQL数据
*
* sc: SparkContext, spark程序执行入口,上下文对象
* getConnection: () => Connection, 获取数据库连接
* sql: String, 执行SQL语句
* lowerBound: Long, 查询的起始位置
* upperBound: Long, 查询的结束位置
* numPartitions: Int, 分区数
* mapRow: (ResultSet) => T 对结果集的处理
*
*
* 注册驱动
* 获取连接
* 创建数据库 *** 作对象PrepareStatement
* 执行SQL
* 处理结果集
* 关闭连接
*/
object Spark04_MySQL_read {
def main(args: Array[String]): Unit = {
//创建Spark配置文件对象
val conf: SparkConf = new SparkConf().setAppName("Spark01_CreateRDD_mem").setMaster("local[*]")
//创建SparkContext对象,该对象时提交Spark App的入口
val sc = new SparkContext(conf)
//数据库连接4要素
var driver = "com.mysql.jdbc.Driver"
var url = "jdbc:mysql://hadoop102:3306/test"
var username = "root"
var password = "000000"
var sql:String = "select * from user where id >= ? and id <= ?"
val resRDD = new JdbcRDD(
sc,
() => {
//注册驱动
Class.forName(driver)
//获取连接
DriverManager.getConnection(url, username, password)
},
sql, 3, 20,
2,
rs => (rs.getInt(1), rs.getString(2), rs.getInt(3))
)
resRDD.collect().foreach(println)
//(3,wangwu,20)
//(4,zhaoliu,21)
//释放资源
sc.stop()
}
}
(3)往Mysql写入数据
package com.spark.day06
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}
/**
* 向MySQL写入数据
*
* sc: SparkContext, spark程序执行入口,上下文对象
* getConnection: () => Connection, 获取数据库连接
* sql: String, 执行SQL语句
* lowerBound: Long, 查询的起始位置
* upperBound: Long, 查询的结束位置
* numPartitions: Int, 分区数
* mapRow: (ResultSet) => T 对结果集的处理
*
*
* 注册驱动
* 获取连接
* 创建数据库 *** 作对象PrepareStatement
* 执行SQL
* 处理结果集
* 关闭连接
*/
object Spark05_MySQL_write {
def main(args: Array[String]): Unit = {
//创建Spark配置文件对象
val conf: SparkConf = new SparkConf().setAppName("Spark01_CreateRDD_mem").setMaster("local[*]")
//创建SparkContext对象,该对象时提交Spark App的入口
val sc = new SparkContext(conf)
//数据库连接4要素
var driver = "com.mysql.jdbc.Driver"
var url = "jdbc:mysql://hadoop102:3306/test"
var username = "root"
var password = "000000"
val rdd: RDD[(Int, String, Int)] = sc.makeRDD(List((9, "shiyi", 24), (10, "shier", 25)))
//在循环体中创建连接对象,每次变量出RDD中的一个元素都要创建一个连接对象,效率较低,不推荐使用
/*rdd.foreach {
case (id, name, age) => {
//注册驱动
Class.forName(driver)
//获取连接
val conn: Connection = DriverManager.getConnection(url, username, password)
//声明数据库 *** 作的SQL语句
var sql: String = "insert into user(id,name,age)values(?,?,?)"
//创建数据库 *** 作对象PrepareStatement
val ps: PreparedStatement = conn.prepareStatement(sql)
//给参数赋值
ps.setInt(1,id)
ps.setString(2,name)
ps.setInt(3,age)
//执行SQL
ps.executeUpdate()
//关闭连接
ps.close()
conn.close()
}
}
*/
//在循环体外创建连接对象,需要让ps实现序列化,但是Ps不是自定义的类型,没有办法实现
//注册驱动
/*Class.forName(driver)
//获取连接
val conn: Connection = DriverManager.getConnection(url, username, password)
//声明数据库 *** 作的SQL语句
var sql: String = "insert into user(id,name,age)values(?,?,?)"
//创建数据库 *** 作对象PrepareStatement
val ps: PreparedStatement = conn.prepareStatement(sql)
//获取分区数据
rdd.foreach {
case (id, name, age) => {
//给参数赋值
ps.setInt(1,id)
ps.setString(2,name)
ps.setInt(3,age)
//执行SQL
ps.executeUpdate()
}
}
//关闭连接
ps.close()
conn.close()
*/
rdd.foreachPartition {
//获取RDD一个分区数据
datas => {
//注册驱动
Class.forName(driver)
//获取连接
val conn: Connection = DriverManager.getConnection(url, username, password)
//声明数据库 *** 作的SQL语句
var sql: String = "insert into user(id,name,age)values(?,?,?)"
//创建数据库 *** 作对象PrepareStatement
val ps: PreparedStatement = conn.prepareStatement(sql)
//对当前分区内的数据进行遍历
//注意:这个foreach不是算子,是集合的方法
datas.foreach {
case (id, name, age) => {
ps.setInt(1, id)
ps.setString(2, name)
ps.setInt(3, age)
//执行SQL
ps.executeUpdate()
}
}
//关闭连接
ps.close()
conn.close()
}
}
//释放资源
sc.stop()
}
}
四、累加器
累加器:分布式共享只写变量。(Task和Task之间不能读数据)
累加器用来对信息进行聚合,通常在向Spark传递函数时,比如使用map()函数或者用 filter()传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。如果我们想实现所有分片处理时更新共享变量的功能,那么累加器可以实现我们想要的效果。
1.系统累加器1)代码实现
package com.spark.day06
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{Accumulator, SparkConf, SparkContext}
/**
* Accumulator累加器
* 分布式共享只写变量--------task之间不能读取数据
*/
object Spark06_Accumulator {
def main(args: Array[String]): Unit = {
//创建Spark配置文件对象
val conf: SparkConf = new SparkConf().setAppName("Spark01_CreateRDD_mem").setMaster("local[*]")
//创建SparkContext对象,该对象时提交Spark App的入口
val sc = new SparkContext(conf)
//创建一个RDD,对其进行求和 ,如果是单值RDD,直接对数据进行求和,内存空间占用小,不存在Shuffle的过程
/*val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
println(rdd.sum())
println(rdd.reduce(_ + _))
*/
//val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 4), ("a", 3)))
/*(k,v)RDD计算求和存在Shuffle 效率较低
val resRDD: RDD[(String, Int)] = rdd.reduceByKey(_ + _)
resRDD.map(a=>a._2).collect().foreach(println)
*/
//如果定义一个普通的遍历,那么在Driver定义,Executor会创建变量的副本,算子都是对副本进行 *** 作,Driver端的变量不会更新
/*var sum:Int = 0
rdd.foreach{
case (word,count)=>{
sum += count
}
}
println(sum)//0
*/
//如果需要通过Executor对Driver端定义的变量进行更新,需要定义累加器
//累加器和普通的变量相比,会把Executor端的结果,收集到Driver端继续汇总
//创建累加器
//val sum: Accumulator[Int] = sc.accumulator(10)过期了
val sum: LongAccumulator = sc.longAccumulator("MyAcc")
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 4), ("a", 3)),2)
rdd.foreach{
case (word,count)=>{
sum.add(count)
println(" ..... "+sum.value)
}
}
println(sum.value)
//释放资源
sc.stop()
}
}
通过在驱动器中调用SparkContext.accumulator(initialValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumulator[T]对象,其中T是初始值initialValue的类型。Spark闭包里的执行器代码可以使用累加器的+=方法(在Java中是 add)增加累加器的值。驱动器程序可以调用累加器的value属性(在Java中使用value()或setValue())来访问累加器的值。
注意:
(1)工作节点上的任务不能相互访问累加器的值。从这些任务的角度来看,累加器是一个只写变量。
(2)对于要在行动 *** 作中使用的累加器,Spark只会把每个任务对各累加器的修改应用一次。因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在foreach()这样的行动 *** 作中。转化 *** 作中累加器可能会发生不止一次更新。
2.自定义累加器自定义累加器类型的功能在1.X版本中就已经提供了,但是使用起来比较麻烦,在2.0版本后,累加器的易用性有了较大的改进,而且官方还提供了一个新的抽象类:AccumulatorV2来提供更加友好的自定义类型累加器的实现方式。
1)自定义累加器步骤
(1)继承AccumulatorV2,设定输入、输出泛型
(2)重写方法
2)需求:自定义累加器,统计集合中首字母为“H”单词出现的次数。
List("Hello", "Hello", "Hello", "Hello", "Hello", "Spark", "Spark", "Hadoop", "High")
3)代码实现
package com.spark.day06
import org.apache.spark.rdd.RDD
import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
/**
* Accumulator自定义累加器
* 统计出RDD中,以"H"起始的单词
*/
object Spark07_MyAccumulator {
def main(args: Array[String]): Unit = {
//创建Spark配置文件对象
val conf: SparkConf = new SparkConf().setAppName("Spark01_CreateRDD_mem").setMaster("local[*]")
//创建SparkContext对象,该对象时提交Spark App的入口
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.makeRDD(List("Hello", "Hello", "Hello", "Hello", "Hello", "Spark", "Spark", "Hadoop", "High"))
//创建累加器对象
val myAcc = new MyAccumulator
//注册累加器
sc.register(myAcc)
//使累加器
rdd.foreach {
word => {
myAcc.add(word)
}
}
//输出累加器结果
println(myAcc.value)
//释放资源
sc.stop()
}
}
//定义一个类,继承AccumulatorV2
//泛型累加器输入和输出的类型
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Int]] {
//定义一个集合,记录单词及出现次数
var map = mutable.Map[String, Int]()
//是否为初始状态 判断集合是否为空
override def isZero: Boolean = map.isEmpty
//拷贝
override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {
val newAcc = new MyAccumulator
newAcc.map = this.map
newAcc
}
//重置 清空集合
override def reset(): Unit = map.clear()
//向累加器中添加元素
override def add(v: String): Unit = {
if (v.startsWith("H")) {
map(v) = map.getOrElse(v, 0) + 1
}
}
//合并
override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
//当前Executor的map
var map1 = map
//另一个Executor的map
var map2 = other.value
map = map1.foldLeft(map2) {
//mm表示map2,kv表示map1中的每一个元素
(mm, kv) => {
//指定合并规则
val key: String = kv._1
val value: Int = kv._2
//根据map1中的key,到map2中去找value
mm(key) = mm.getOrElse(key, 0) + value
mm
}
}
}
//获取累加器中的值
override def value: mutable.Map[String, Int] = map
}
五、广播变量
广播变量:分布式共享只读变量。
在多个并行 *** 作中(Executor)使用同一个变量,Spark默认会为每个任务(Task)分别发送,这样如果共享比较大的对象,会占用很大工作节点的内存。
广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark *** 作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,甚至是机器学习算法中的一个很大的特征向量,广播变量用起来都很顺手。
1)使用广播变量步骤:
(1)通过对一个类型T的对象调用SparkContext.broadcast创建出一个Broadcast[T]对象,任何可序列化的类型都可以这么实现。
(2)通过value属性访问该对象的值(在Java中为value()方法)。
(3)变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。
2)代码实现
package com.spark.day06
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 广播变量
* 分布式共享只读变量
*/
object Spark08_Broadcast {
def main(args: Array[String]): Unit = {
//创建Spark配置文件对象
val conf: SparkConf = new SparkConf().setAppName("Spark01_CreateRDD_mem").setMaster("local[*]")
//创建SparkContext对象,该对象时提交Spark App的入口
val sc = new SparkContext(conf)
//实现类似join效果 (a,(1,4),(b,(2,5)),(c,(3,6)))
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
val list = List(("a", 4), ("b", 5), ("c", 6))
//list需要发往每一个task 占用内存,不建议使用
/*val resRDD: RDD[(String, (Int, Int))] = rdd.map {
case (k1, v1) => {
var v3 = 0
for ((k2, v2) <- list) {
if (k1 == k2) {
v3 = v2
}
}
(k1, (v1, v3))
}
}
resRDD.collect().foreach(println)
*/
//创建广播变量 broadcastList会发往每个executor task之间从executor读取变量
val broadcastList: Broadcast[List[(String, Int)]] = sc.broadcast(list)
val resRDD: RDD[(String, (Int, Int))] = rdd.map {
case (k1, v1) => {
var v3 = 0
for ((k2, v2) <- broadcastList.value) {
if (k1 == k2) {
v3 = v2
}
}
(k1, (v1, v3))
}
}
resRDD.collect().foreach(println)
//释放资源
sc.stop()
}
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)