- Yarn 模式
- Yarn 模式概述
- Yarn 模式配置
- 日志服务
- 几种运行模式的对比
- EXecuter&Core&并行度
- 理解 RDD
- RDD 特点
- d性
- 分区
- 只读
- 依赖(血缘)
- 缓存
- checkpoint
- RDD 编程
- RDD 编程模型
- RDD 的创建
- 从内存中创建RDD
- RDD的并行度&分区
- 从文件中创建RDD
- 分区数量的计算方式
- 数据分区的分配
Spark 客户端可以直接连接 Yarn,不需要额外构建Spark集群。
有 client 和 cluster 两种模式,主要区别在于:Driver 程序的运行节点不同。
•client:Driver程序运行在客户端,适用于交互、调试,希望立即看到app的输出
•cluster:Driver程序运行在由 RM(ResourceManager)启动的 AM(AplicationMaster)上, 适用于生产环境。
工作模式介绍:
步骤1: 修改 hadoop 配置文件 yarn-site.xml, 添加如下内容:
由于咱们的测试环境的虚拟机内存太少, 防止将来任务被意外杀死, 配置所以做如下配置.
yarn.nodemanager.pmem-check-enabled false yarn.nodemanager.vmem-check-enabled false
修改后分发配置文件.
步骤2: 复制 spark, 并命名为spark-yarn
cp -r spark-standalone spark-yarn
步骤3: 修改spark-evn.sh文件
去掉 master 的 HA 配置, 日志服务的配置保留着.
并添加如下配置: 告诉 spark 客户端 yarn 相关配置
YARN_CONF_DIR=/opt/module/hadoop-2.7.2/etc/hadoop
步骤4: 执行一段程序
bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode client ./examples/jars/spark-examples_2.11-2.1.1.jar 100 http://hadoop202:8088日志服务
在前面的页面中点击 history 无法直接连接到 spark 的日志.
可以在spark-default.conf中添加如下配置达到上述目的
spark.yarn.historyServer.address=hadoop103:18080 spark.history.ui.port=18080
可能碰到的问题:
如果在 yarn 日志端无法查看到具体的日志, 则在yarn-site.xml中添加如下配置
几种运行模式的对比 EXecuter&Core&并行度yarn.log.server.url http://hadoop201:19888/jobhistory/logs
package com.atguigu.bigdata.sparkcore.wc.test import java.io.{ObjectOutput, ObjectOutputStream, OutputStream} import java.net.Socket object Driver { def main(args: Array[String]): Unit = { //连接服务器 val client1 =new Socket("localhost",9999) val client2 =new Socket("localhost",8888) val task=new Task() val out1:OutputStream= client1.getOutputStream val objOut1=new ObjectOutputStream(out1) val subTask=new SubTask() subTask.logic=task.logic subTask.datas=task.datas.take(2) objOut1.writeObject(subTask) objOut1.flush() objOut1.close() client1.close() val out2:OutputStream= client2.getOutputStream val objOut2=new ObjectOutputStream(out2) val subTask1=new SubTask() subTask1.logic=task.logic subTask1.datas=task.datas.takeRight(2) objOut2.writeObject(subTask1) objOut2.flush() objOut2.close() client2.close() println("客户端数据发送完毕") } }
package com.atguigu.bigdata.sparkcore.wc.test import java.io.{InputStream, ObjectInputStream} import java.net.{ServerSocket, Socket} object Executor { def main(args: Array[String]): Unit = { //启动服务器,接收数据 val server =new ServerSocket(9999) println("服务器启动,等待接收数据") //等待客户端的连接 val client:Socket=server.accept() val in:InputStream=client.getInputStream val objIn=new ObjectInputStream(in) val task:SubTask=objIn.readObject().asInstanceOf[SubTask] val ints:List[Int]=task.compute() println("计算节点[9999]计算的结果为:"+ints) objIn.close() client.close() server.close() } }
package com.atguigu.bigdata.sparkcore.wc.test import java.io.{InputStream, ObjectInputStream} import java.net.{ServerSocket, Socket} object Executor2 { def main(args: Array[String]): Unit = { //启动服务器,接收数据 val server =new ServerSocket(8888) println("服务器启动,等待接收数据") //等待客户端的连接 val client:Socket=server.accept() val in:InputStream=client.getInputStream val objIn=new ObjectInputStream(in) val task:SubTask=objIn.readObject().asInstanceOf[SubTask] val ints:List[Int]=task.compute() println("计算节点[8888]计算的结果为:"+ints) objIn.close() client.close() server.close() } }
package com.atguigu.bigdata.sparkcore.wc.test class Task extends Serializable { val datas=List(1,2,3,4) val logic =( num:Int)=>{num*2} }
package com.atguigu.bigdata.sparkcore.wc.test class SubTask extends Serializable { var datas:List[Int]=_ var logic :Int=>Int=_ //计算 def compute()={ datas .map(logic) } }理解 RDD
一个 RDD 可以简单的理解为一个分布式的元素集合.
RDD 表示只读的分区的数据集,对 RDD 进行改动,只能通过 RDD 的转换 *** 作, 然后得到新的 RDD, 并不会对原 RDD 有任何的影响
在 Spark 中, 所有的工作要么是创建 RDD, 要么是转换已经存在 RDD 成为新的 RDD, 要么在 RDD 上去执行一些 *** 作来得到一些计算结果.
每个 RDD 被切分成多个分区(partition), 每个分区可能会在集群中不同的节点上进行计算.
•存储的d性:内存与磁盘的自动切换;
•容错的d性:数据丢失可以自动恢复;
•计算的d性:计算出错重试机制;
•分片的d性:可根据需要重新分片。
RDD 逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据。
如果 RDD 是通过已有的文件系统构建,则compute函数是读取指定文件系统中的数据,如果 RDD 是通过其他 RDD 转换而来,则 compute函数是执行转换逻辑将其他 RDD 的数据进行转换。
RDD 是只读的,要想改变 RDD 中的数据,只能在现有 RDD 基础上创建新的 RDD。
由一个 RDD 转换到另一个 RDD,可以通过丰富的转换算子实现,不再像 MapReduce 那样只能写map和reduce了。
RDD 的 *** 作算子包括两类,
•一类叫做transformation,它是用来将 RDD 进行转化,构建 RDD 的血缘关系;
•另一类叫做action,它是用来触发 RDD 进行计算,得到 RDD 的相关计算结果或者 保存 RDD 数据到文件系统中.
RDDs 通过 *** 作算子进行转换,转换得到的新 RDD 包含了从其他 RDDs 衍生所必需的信息,RDDs 之间维护着这种血缘关系,也称之为依赖。
如下图所示,依赖包括两种,
•一种是窄依赖,RDDs 之间分区是一一对应的,
•另一种是宽依赖,下游 RDD 的每个分区与上游 RDD(也称之为父RDD)的每个分区都有关,是多对多的关系。
如果在应用程序中多次使用同一个 RDD,可以将该 RDD 缓存起来,该 RDD 只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该 RDD 的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。
如下图所示,RDD-1 经过一系列的转换后得到 RDD-n 并保存到 hdfs,RDD-1 在这一过程中会有个中间结果,如果将其缓存到内存,那么在随后的 RDD-1 转换到 RDD-m 这一过程中,就不会计算其之前的 RDD-0 了。
虽然 RDD 的血缘关系天然地可以实现容错,当 RDD 的某个分区数据计算失败或丢失,可以通过血缘关系重建。
但是对于长时间迭代型应用来说,随着迭代的进行,RDDs 之间的血缘关系会越来越长,一旦在后续迭代过程中出错,则需要通过非常长的血缘关系去重建,势必影响性能。
为此,RDD 支持checkpoint 将数据保存到持久化的存储中,这样就可以切断之前的血缘关系,因为checkpoint 后的 RDD 不需要知道它的父 RDDs 了,它可以从 checkpoint 处拿到数据。
本章介绍 RDD 的编程技术
RDD 编程模型在 Spark 中,RDD 被表示为对象,通过对象上的方法调用来对 RDD 进行转换。
经过一系列的transformations定义 RDD 之后,就可以调用 actions 触发 RDD 的计算
action可以是向应用程序返回结果(count, collect等),或者是向存储系统保存数据(saveAsTextFile等)。
在Spark中,只有遇到action,才会执行 RDD 的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换。
要使用 Spark,开发者需要编写一个 Driver 程序,它被提交到集群以调度运行 Worker
Driver 中定义了一个或多个 RDD,并调用 RDD 上的 action,Worker 则执行 RDD 分区计算任务。
package com.atguigu.bigdata.sparkcore.wc.rdd.builder import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark01_RDD_Memory { def main(args: Array[String]): Unit = { //TODO 准备环境 val sparkConf =new SparkConf().setMaster("local[*]").setAppName("RDD") val sc=new SparkContext(sparkConf) //TODO 创建RDD //从内存中创建RDD,将内存中集合的数据作为处理的数据源 val seq=Seq[Int](1,2,3,4) //parallelize:并行 // val rdd:RDD[Int]=sc.parallelize(seq) //makeRDD方法在底层实现时其实就是调用了rdd对象的parallelize方法 val rdd:RDD[Int]=sc.makeRDD(seq) rdd.collect().foreach(println) //TODO 关闭环境 sc.stop() } }RDD的并行度&分区
package com.atguigu.bigdata.sparkcore.wc.rdd.builder import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark01_RDD_Memory_Par { def main(args: Array[String]): Unit = { //TODO 准备环境 val sparkConf =new SparkConf().setMaster("local[*]").setAppName("RDD") sparkConf.set("spark.dafault.parallelism","5") val sc=new SparkContext(sparkConf) //TODO 创建RDD //RDD的并行度&分区 //makeRDD方法可以传递第二个参数,这个参数表示分区的数量 //第二个参数可以不传递的,那么makeRDD方法会使用默认值:defaultParallelism(默认并行度) // val rdd=sc.makeRDD(List(1,2,3,4),2) val rdd=sc.makeRDD(List(1,2,3,4)) //将处理的数据保存成分区文件 rdd.saveAsTextFile("output") //TODO 关闭环境 sc.stop() } }
package com.atguigu.bigdata.sparkcore.wc.rdd.builder import org.apache.spark.{SparkConf, SparkContext} object Spark01_RDD_Memory_Par1 { def main(args: Array[String]): Unit = { //TODO 准备环境 val sparkConf =new SparkConf().setMaster("local[*]").setAppName("RDD") val sc=new SparkContext(sparkConf) //TODO 创建RDD //[1,2],[3,4] // val rdd=sc.makeRDD(List(1,2,3,4),2) //[1],[2],[3,4] // val rdd=sc.makeRDD(List(1,2,3,4),3) //[1],[2,3],[4,5] val rdd=sc.makeRDD(List(1,2,3,4,5),3) //将处理的数据保存成分区文件 rdd.saveAsTextFile("output") //TODO 关闭环境 sc.stop() } }从文件中创建RDD
package com.atguigu.bigdata.sparkcore.wc.rdd.builder import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark02_RDD_File { def main(args: Array[String]): Unit = { //TODO 准备环境 val sparkConf =new SparkConf().setMaster("local[*]").setAppName("RDD") val sc=new SparkContext(sparkConf) //TODO 创建RDD //从文件中创建RDD,将文件中的数据作为处理的数据源 //path路径默认以当前环境的根路径为基准,可以写绝对路径,也可以写相对路径 // sc.textFile("E:\sparkcode\comatguigu\datas.txt") // val rdd:RDD[String]=sc.textFile("datas/1.txt") //path路径可以是文件的具体路径,也可以是目录名称 // val rdd=sc.textFile("datas") //path路径还可以使用通配符 * // val rdd =sc.textFile("datas/1*.txt") //path还可以是分布式存储系统路径:HDFS val rdd =sc.textFile("hdfs://linux1:8020/test.txt") rdd.collect().foreach(println) //TODO 关闭环境 sc.stop() } }
package com.atguigu.bigdata.sparkcore.wc.rdd.builder import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Spark02_RDD_File1 { def main(args: Array[String]): Unit = { //TODO 准备环境 val sparkConf =new SparkConf().setMaster("local[*]").setAppName("RDD") val sc=new SparkContext(sparkConf) //TODO 创建RDD //从文件中创建RDD,将文件中的数据作为处理的数据源 //textFile :以行为单位莱读取数据,读取的数据都是字符串 //wholeTextFiles:以文件为单位读取数据, //读取的结果表示元组,第一个元素表示文件路径,第二个元素表示文件内容 val rdd=sc.wholeTextFiles("datas") rdd.collect().foreach(println) //TODO 关闭环境 sc.stop() } }分区数量的计算方式
package com.atguigu.bigdata.sparkcore.wc.rdd.builder import org.apache.spark.{SparkConf, SparkContext} object Spark02_RDD_File_Par { def main(args: Array[String]): Unit = { //TODO 准备环境 val sparkConf =new SparkConf().setMaster("local[*]").setAppName("RDD") val sc=new SparkContext(sparkConf) //TODO 创建RDD //textFile可以将文件作为数据处理的数据源,默认也可以设定分区。 //minPartitions:最小分区数量 //math.min(defaultParallelism,2) // val rdd=sc.textFile("datas/1.txt") //如果不想使用默认的分区数量,可以通过第二个参数指定分区数 //Spark读取文件,底层其实使用的就是Hadoop的读取方式 //分区数量的计算方式 //totalSize=7 //goalSize=7/2=3(byte) val rdd=sc.textFile("datas/1.txt",2) rdd.saveAsTextFile("output") //TODO 关闭环境 sc.stop() } }数据分区的分配
package com.atguigu.bigdata.sparkcore.wc.rdd.builder import org.apache.spark.{SparkConf, SparkContext} object Spark02_RDD_File_Par1 { def main(args: Array[String]): Unit = { //TODO 准备环境 val sparkConf =new SparkConf().setMaster("local[*]").setAppName("RDD") val sc=new SparkContext(sparkConf) //TODO 创建RDD //TODO 数据分区的分配 //1.数据以行为单位进行读取 //spark读取文件,采用的是hadoop的方式读取,所以一行一行读取,和字节数没有关系、 //2.数据读取时以偏移量为单位,偏移量不会被重复读取 //3.数据分区的偏移量范围的计算 //0=>[0,3] =>12 //1=>[3.6] =>3 //2=>[6,7] val rdd=sc.textFile("datas/1.txt",2) rdd.saveAsTextFile("output") //TODO 关闭环境 sc.stop() } }
package com.atguigu.bigdata.sparkcore.wc.rdd.builder import org.apache.spark.{SparkConf, SparkContext} object Spark03_RDD_File_Par2 { def main(args: Array[String]): Unit = { //TODO 准备环境 val sparkConf =new SparkConf().setMaster("local[*]").setAppName("RDD") val sc=new SparkContext(sparkConf) //TODO 创建RDD //14byte/2=7byte //14/7=2(分区) //如果数据源为多个文件,那么计算分区时以文件为单位进行分区 val rdd=sc.textFile("datas/word.txt",2) rdd.saveAsTextFile("output") //TODO 关闭环境 sc.stop() } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)