- 一、Spark
- 1、Local 模式
- 2、StandAlone 模式
- 3、Yarn 模式
- 二、一些概念
- 1、RDD 五大特性
- 1.1 RDD 是由分区(partition)组成
- 1.2 算子作用在每一个分区(切片)
- 1.3 RDD 之间存在依赖关系
- 1.4 分区类算子只能作用在 k-v 格式的 RDD 中
- 1.5 Spark 中移动计算 不移动数据
- 2、job、stage、task、partition
- 2.1 job
- 2.2 stage
- 2.3 task
- 2.4 partition
- 3、Spark 比 MapReduce 快的原因
- 零碎
I know, i know
地球另一端有你陪我
一、Spark
一个计算系统,对标 Hadoop 中的 MapReduce,其中也应用了该思想
不同于 Hadoop 的基于磁盘,经常需要进行数据落地,在磁盘中进行运算
每个任务都需要进行落地,下一次任务需要从磁盘中再次读取
一个字:稳
Spark 是基于内存计算的,只有较少步骤需要数据落地,能够更快速地处理数据
取消了大部分任务之间落地的动作,直接进入下一个任务
一个字:快
语言上,Spark 支持 Scala Python Java 等语言
(Scala 和 Python 类似,Java 会比较复杂)
运行模式上,有4种模式可以运行
Local 多用于测试
Standalone
Mesos
YARN 最具前景
需要在项目中添加 Sark 依赖 spark-core_2.11
以及 scala 依赖
package day68 import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Demo1WC { def main(args: Array[String]): Unit = { // 创建Spark的入口 // SparkConf对象 val conf: SparkConf = new SparkConf() // 给Spark程序赋予一个名字,如果提交到Yarn上 可以再8088中看到 conf.setAppName("Demo1WC") // 指定Spark的运行方式为local方式,即在本地运行 conf.setMaster("local[3]") // 创建SparkContext上下文环境 val sc: SparkContext = new SparkContext(conf) // 1、读取文件 val linesRDD: RDD[String] = sc.textFile("Spark/data/words.txt") // 2、将每个单词提取出来 // flatMap在Spark中称之为 算子 // 大部分 算子 在执行完之后 返回的还是RDD val wordsRDD: RDD[String] = linesRDD.flatMap(line => line.split(",")) // 3、对每个单词进行分组 val groupRDD: RDD[(String, Iterable[String])] = wordsRDD.groupBy(word => word) // 4、统计每个单词的数量 val wordCount: RDD[String] = groupRDD.map(kv => kv._1 + "," + kv._2.size) // 5、打印 wordCount.foreach(println) // wordCount // .saveAsTextFile("Spark/data/wordCnt") } }
2、StandAlone 模式
使用 Spark 自带的集群模式,需要在每一个节点上进行配置
1 上传解压…
2 修改配置文件 conf
Spark 自己的集群中:
master 相当于 ResourceManager, work 相当于 NodeManager
cp spark-env.sh.template spark-env.sh // 增加配置 export SPARK_MASTER_IP=master export SPARK_MASTER_PORT=7077 export SPARK_WORKER_CORES=2 export SPARK_WORKER_INSTANCES=1 export SPARK_WORKER_MEMORY=2g export JAVA_HOME=/usr/local/soft/jdk1.8.0_171
3 从节点配置
cp slaves.template slaves node1 node2
4 分发
scp -r spark-2.4.5 node1:`pwd` scp -r spark-2.4.5 node2:`pwd`
5 配置环境变量
只需要添加 bin 目录下的
启动指令在 sbin,但是会与 hadoop 的启动指令冲突
因此一般会在当前目录下 ./ 启动
6 启动集群
需要处于 sbin 目录之下
./start-all.sh 可以访问spark ui http://master:8080/
client
日志在本地输出,一般用于上线前测试(bin/下执行)
Spark 中自带了几个测试用的类
cd /usr/local/soft/spark-2.4.5/examples/jars spark-submit --class org.apache.spark.examples.SparkPi --master spark://master:7077 --executor-memory 512m --total-executor-cores 1 spark-examples_2.11-2.4.5.jar 100
cluster
上线使用,不会再本地打印日志
spark-submit --class org.apache.spark.examples.SparkPi --master spark://master:7077 --executor-memory 512M --total-executor-cores 1 --deploy-mode cluster spark-examples_2.11-2.4.5.jar 100
3、Yarn 模式
1 停止 Spark 的集群服务
./stop-all.sh
2 spark 整合 yarn 只需要在一个节点整合,
可以删除 node1 和 node2 中所有的spark 文件
3 增加hadoop 配置文件地址
spark-2.4.5/conf/spark-env.sh 增加 export HADOOP_CONF_DIR=/usr/local/soft/hadoop-2.7.6/etc/hadoop
4 需要先关闭 Yarn 服务
stop-yarn.sh
5 修改 yarn-site 配置文件
位置 cd /usr/local/soft/hadoop-2.7.6/etc/hadoop vim yarn-site.xml 添加yarn.nodemanager.pmem-check-enabled false yarn.nodemanager.vmem-check-enabled false
6 分发
scp -r yarn-site.xml node1:`pwd` scp -r yarn-site.xml node2:`pwd`
7 启动 hadoop
start-all.sh
此时可以删除两个从节点上的 Spark,只需要在 master 保留一个即可
依然需要指定 jar 包来执行,因此在文件夹内最方便
cd /usr/local/soft/spark-2.4.5/examples/jars spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --executor-memory 512M --num-executors 2 spark-examples_2.11-2.4.5.jar 100 spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --executor-memory 512m --num-executors 2 --executor-cores 1 spark-examples_2.11-2.4.5.jar 100
单词计数
package day68 import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.{SparkConf, SparkContext} object Demo1_1WC { def main(args: Array[String]): Unit = { // 创建Spark的入口 // SparkConf对象 val conf = new SparkConf() // 给Spark程序赋予一个名字,如果提交到Yarn上 可以再8088中看到 conf.setAppName("Demo1_1WC") // 指定Spark的运行方式为local方式,即在本地运行 // conf.setMaster("local") // 创建SparkContext上下文环境 val sc: SparkContext = new SparkContext(conf) // 1、读取文件 val fin = sc.textFile("hdfs://master:9000/Spark/data/words.txt") .flatMap(_.split(",")) .groupBy(w => w) .map(kv => kv._1 + ":" + kv._2.size) val cf: Configuration = new Configuration() val fs: FileSystem = FileSystem.get(cf) val path = new Path("hdfs://master:9000/Spark/data/output/Demo1out") if(fs.exists(path)){ fs.delete(path,true) } // 结果保存到 hdfs 本地 fin.saveAsTextFile("hdfs://master:9000/Spark/data/output/Demo1out") // 因为日志跑完就消失 // 想看到的话可以加一个死循环 // while (true) {} } }
二、一些概念
1、RDD 五大特性
ResilientDistributedDataset d性分布式数据集
是 RDD 中的一种编程模型,几乎贯穿了 Spark 的全过程
可以简单认为是整合了 Scala 基本数据类型,并且可以使用他们大部分函数(算子)
不同的是,RDD 本身不存储数据,只有在计算时会有数据流通
默认第一个 RDD (一般是文件读取) 分区的数量,是由切片来决定的
子RDD 一般会继承 父RDD 的数量,涉及到第三点的依赖
1 但是 Spark 最少会为每一个任务提供至少两个分区 2 文件读取是用 TextInputFormat 的方法,所以和 hadoop 中的 MR 一致, 而切片得到的是一个(偏移量,内容)的 KV 对, Spark 会将偏移量去除,只保留内容
上图虽然画了很多 partition,但在同一个 stage 中(后面会说)
会将能够依序进行的算子,放在一个 partition 中运行
被上传的任务,Spark 会为每个 job 分配至少两个分区
而本地模式可以自己手动控制线程的数量,并且没有最少2个分区的限制
Spark 中会将函数(map、groupby等等)称为算子
一些算子运算会返回一个新的 RDD,称为转换算子(Transformations)
其他返回不是 RDD 的算子的算子称为执行算子(Actions)
转换算子有懒依赖的特性,即单独上传时,不会计算;
只有当执行算子存在时,才会触发转换算子开始计算
在执行 task 时,根据算子的执行,会有先后形成的 RDD,称为父子 RDD
RDD 中分为两种依赖:窄依赖和宽依赖
窄依赖:一对一
例如 map、flatmap 等等,不需要不同分区间交互的算子(崽依赖hhh)
窄依赖后,partition 的数量不变
宽依赖:一对多
例如 groupby 等,需要不同分区间进行交互,例如 shuffle(洗牌)
宽依赖后,一般默认 partition 的数量也不变
但可以在会造成宽依赖的算子(如groupby等)参数中自定义数量
相较于其他算子,分区类算子可以更加针对性地处理数据
代价是对数据的局限性也会更高,要求其中的元素必须得是(key,value)格式
例如 groupByKey、reduceByKey、join等等
1.5 Spark 中移动计算 不移动数据
Spark 会将 task 发送到数据所在的节点中运行
可以避免数据移动造成的额外资源占用
2、job、stage、task、partition
2.1 job
由执行算子触发,一个执行算子会生成一个 job
包含 stage 和 task 的概念
2.2 stage
是一组可以并行计算的 task
由于宽依赖势必会造成 shuffle,会将原本连续的分区进行暂时性的终止,
因此 Spark 将每个 Job 按照 shuffle 分为多个部分(stage)
在 shuffle 后,会将数据进行一次落地(存储到磁盘,过程中会序列化,压缩)
是 Spark 计算中的最小单位,由一个或多个可以连续运行的算子组成
线程池中的一个线程对象
一个切片会形成一个 task,一个 task 会在一个 partition 中运行
类比 hadoop 中 MR 里的 map task 和 reduce task
是 RDD 编程模型中的概念,RDD 是由 partition 组成
Spark 的任务一般会从读取数据开始(sc.context())
类似 MapReduce 的读取流程,会先通过 Inputformat 对数据进行切片
一个切片会生成一个 partition ,一个 partition 会生成一个 task
3、Spark 比 MapReduce 快的原因
1、基于内存的运算模式,数据在内存中流通计算
2、DAG 能够将算子进行一定程度的整合,连续计算,大大避免中间数据落地的次数
3、不同于 MapReduce 的细粒度资源申请,开启一个 task 申请一次资源;
Spark 是粗粒度资源申请,会一次申请一块资源来使用,
task 全部完成后,才会释放避免了资源的频繁申请和释放
零碎
1、MapReduce(映射规约)是一种思想,或者说概念,并不是一种具体的工具
2、一个文件至少会产生一个 Map Task,因为至少会有一个切片(splits)
3、建议在本地的任意目录下添加一个 hadoop/bin/ 目录
该目录下存放一个 winutils.exe 文件
接着在环境变量中添加一个 HADOOP_HOME 指向 hadoop
可以回避一个运行时的报错,但不影响程序运行
4、关于读取 hdfs 中的文件,路径需要写全,即
hdfs://master:9000/Spark/data/words.txt
否则可能会默认读取一个其他的地方,很那什么
5、查看上传任务的日志
yarn logs -applicationId application_1640760945595_0004
id 是提交任务后,Spark 分配的任务id,控制台可以看到,8088 yarn 上也可以看到
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)