Spark Core [05-07]
两个demo
很多复杂的业务拆分开都是变种的wc:
分组 ==> 变种WC ==> 数据补齐
将不同组的数据按规则合并在一起
split(“,”,0) // 第一个参数指定分隔符,第二个参数限制split后数组长度:
// split(",",3) 限制3个元素,所以从第三个元素开始就不再切分,而是把剩下的所有内容作为一个长字符串元素
String[] split = "1,2,3,4,5,6".split(",",3) ;
{"1","2","3,4,5,6"}
闭包
rdd1的transiformtion 中不能嵌套另一个rdd2。
可以将rdd2转成scala的数组(触发action),再放到rdd1中使用,能用但本末倒置。
闭包:在driver 中定义的变量,只会传递一个值的副本到 excutor中,对这个值的 *** 作作用域仅限于 excutor内,而无法在 driver中获取 这个变量被excutor改变的值。
例如之前学习将rdd内容写入数据库,获取数据库连接也需要写在 map中,而不能在driver中提前获取连接再传入map,会报错 java.sql.Connection 没有序列化。
在driver端定义的类,如果拿到excutor中执行(涉及网络IO了),也会报错无法序列化。
如果是自己的类无法序列化,那就实现序列化接口,但是框架的类没法修改。
广播变量 Broadcast Varizbles在MR中,学习了mapjoin 和 reducejoin,mapjoin在spark中就对应BroadcastJoin。
left.join(right) 普通的join,可能有也可能没有shuffle(两个rdd和并到一个rdd)。
广播变量,就是将小数据量的内容广播出去,(在MR中也是把小表的内容丢到分布式缓存),广播变量没有Shuffle。(数据广播出去以后便不会再有数据重新分发)。
未使用广播变量时,数据需要从driver复制到每个executor的每个task中。
广播变量的数据从driver传输到executor的内存中,每个task都从自己节点的executor获取。
从driver传输到executor,必然涉及网络IO,必须实现序列化。(这个变量不经过广播传到executor也会要求序列化)
广播变量是广播只读变量,创建后不可修改。(这个变量不经过广播时传到executor也会因为闭包的原因"无法修改")
所以广播一词的含义体现在executor的每个task都从所在的executor中共享这个变量。
如何使用可以通过调用SparkContext.broadcast(v)来从一个普通变量v中创建一个广播变量。这个广播变量就是对普通变量v的一个包装器,通过调用value方法就可以获得这个广播变量的值:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
这个广播变量被创建以后,那么在集群中的任何函数中,都应该使用广播变量broadcastVar的值,而不是使用v的值,这样就不会把v重复分发到这些节点上。
此外,一旦广播变量创建后,普通变量v的值就不能再发生修改,从而确保所有节点都获得这个广播变量的相同的值。https://zhuanlan.zhihu.com/p/354811884
rdd.cache() 和 sc.broadcast(v)
持久化 是rdd才有的,把整个rdd数据持久化到个节点的内存中;是节点自己去读取数据放到自己内存,不存在网络IO。
持久化的内容可以修改,但不适合频繁修改(持久化的目的是反复使用,需要频繁修改不叫反复使用)。
广播变量 可以是任何可被序列化的数据,从driver传输到executor。
广播变量是广播只读变量,着眼于读;
累加器 是全局写变量,着眼于写。
https://blog.csdn.net/hellojoy/article/details/113359299
Java 版本的SparkContext相同包名的java文件和scala文件都会编译到一个目录下。
SparkConf sparkConf = new SparkConf.setAppName("Your App Name").setMaster("local[2]")
JavaSparkContext jsc = new JavaSparkContext(sparkConf)
map() 在java里是mapToPair()。
其他参考Spark Core 06.
一个父RDD的分区最多被子RDD的某个分区只使用一次 (1 --> 1、n --> 1窄;1 --> n 宽)。
计算失败导致需要从上级重新计算时,不至于波及其他分区也重新计算。是pipeline式 *** 作。
明确了父级RDD及父分区。就算子RDD某分区出错,只需要对出错的分区从父级(可能是多个父级,但是也明确的)重新计算即可。
(窄依赖虽然也可能会触发所有父RDD的重新计算,但不会涉及子级的所有分区重新计算,成本节约是体现在子级不是父级的)
不存在shuffle:父RDD的分区只会去到一个子RDD,所有数据不用重新分发,也就没有shuffle。
join可能为窄依赖:join的两个RDD中也是按分区组合的,如rdd1的partition1 只会和 rdd2的partition1合并,而不会涉及到rdd2的partition2。这样也是窄依赖。但凡有任一rdd的任意partiton合并了不止一个partition,都是宽依赖。
(join的源码会先判断当前依赖是OneToOneDependency还是SuffleDependency)
map、filter、union、join(with co-partitioned) :窄依赖
groupByKey、join(with no co-partitioned):宽依赖
宽依赖会波及其他分区(甚至整个RDD)重新计算:因为重新计算了父级,但是这个父级还可能被其他子级RDD分区依赖,导致其他子级分区也需要重新计算。
宽依赖往往对应着shuffle *** 作,需要在运行的过程中将同一个RDD分区传入到不同的RDD分区中。
n 个宽依赖 == n 个shuffle == n+1 个stage。
spark standalone模式spark standalone集群模式,一般不用(当初Spark on Yarn不成熟时才用)。在Spark on Yarn中 Spark只作为客户端提交作业。
部署一个master,一个worker。
参考Spark Core 06.
把作业打包到线上spark服务(Yarn、…)上运行的话,都需要将代码里.setAppName().setMaster(“local[2]”)去掉。约定大于配置。
位置 spark.apache.org/docs/latest/index.html ==> Deploying ==> Submitting Applications
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \ // 指定主类,全限定名
--master local[8] \ // 指定提交方式
--deploy-mode cluster \
--executor-memory 20G \
--conf = \
... # other options
/path/to/examples.jar \ // [作业打成Jar文件的路径](记得将代码里.setAppName().setMaster()去掉)
100 // 其他参数
以local[2]提交到本地执行
spark-submit \
--master local[2] \
--class org.apache.spark.examples.SparkPi \
/home/lib/xxx.jar \
3
提交到standalone
spark-submit \
--master spark://localhost:7077 \ // standalone部署所在的机器
--class org.apache.spark.examples.SparkPi \
/home/lib/xxx.jar \
3
Spark on Yarn
spark-submit \
--master yarn \
--class org.apache.spark.examples.SparkPi \
/home/lib/xxx.jar \
3
Spark 配置&参数
在学习Hive的时候,对Hive配置可以有:
hive-site.xml // 全局配置
hive --hiveconf // 带参数启动
hive> set key=value // 在hive中临时设置
对于spark也是有全局配置和带参数提交任务
spark-default.conf
spark-shell --help // spark-submit 是一样的
spark-default.conf
启动时默认都从这个文件加载。
需要改的默认参数都可以写在这里面,就不用在命令里写了。
如可以将 --master local[2] 也在配置中打开注释,以后提交任务都不用再指定–master
除了默认的spark-default.conf,也可以用 --properties-file 指定其他文件,适合多配置文件按需启动的情况。
spark-shell \
--properties-file spark-xxx.conf
sparl-shell --conf 自定义参数
如果有其他参数从外面传:–conf指定参数,参数key=value格式
sparl-shell --conf spark.xxx=1
// 参数要用spark.作为前缀,否则不识别,并警告:Ignoring non-Spark config property:xxx
// 源码里可以搜到这段内容,并通过源码也发现参数不是以spark.开头的会被移除
spark 指定输入输出这两个参数到hdfs
- 用main 方法实现:main方法接收两个参数,spark-submit 启动传入这两个参数。
val input = arg(0) val output = arg(1)
spark-submit \ ... /input/wc /output/wc
- 用sc.getConf实现:.sc.getConf.get(“spark.xx”) ;然后在 spark-submit 启动时传入两个参数即:
val input = sc.getConf.get("spark.intput") val output = sc.getConf.get("spark.output")
spark-submit \ ... --conf spark.intput=/input/wc \ --conf spark.output=/output/wc
期望用一个action返回两个结果,因为闭包,是无法在driver中定义变量传到executor *** 作后再返回driver的。
// driver中获取累加器
acc = sc.longAccumulator("xx")
// excutor中
{
acc.add()
}
// 在driver中获取累加的结果
acc.value
用计数器时,每一次action后都会触发一次计数器。要么把acc.value放在所有action只有,保证后续不再反复触发。要么使用rdd.cache /rdd.checkPoint()
rdd.cache:把链条断掉,避免重复累加。(cache时注意rdd数据量大小)
自定义累加器参考Spark Core 07.
监控/ Web UI 4040官网介绍,位置 spark.apache.org/docs/latest/index.html ==> More ==> Monitoring
每一个Application都会有一个Web UI,默认端口4040(如果再启一个就是4041):gargantua:4040
页面中有如下信息:
调度程序阶段和任务的列表
RDD大小和内存使用的摘要
环境信息。
正在运行的执行器信息
ou can access this interface by simply opening http://:4040 in a web browser. If multiple SparkContexts are running on the same host, they will bind to successive ports beginning with 4040 (4041, 4042, etc).
默认Application只有运行中才能打开对于的4040页面。想要运行结束也记录日志需要在应用启动前将spark.eventLog.enabled 设置为true;
HistoryServer 18080 配置在spark.eventLog.enabled开启后,作业运行的情况都会记录下来(hdfs),支持后期UI界面对其渲染。
-
开启spark.eventLog.enabled,并设置日志保存的目录:
spark-default.conf# 把注释解开就行 spark.eventLog.enabled true spark.eventLog.dir hdfs://gargantua:9000/spark-logs # 写日志的目录 [这个目录需要提前创建]
-
环境变量相关配置 (Environment Variables)
spark-env.sh# 内存 export SPARK_DAEMON_MEMORY=2g # 以spark.history.开头的配置都写在SPARK_HISTORY_OPTS这个变量里,-D开头,多个以空格分隔 (e.g. "-Dx=y -Dk=v") export SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://gargantua:9000/spark-logs -Dspark.history.ui.port=18080" # # -Dspark.history.fs.logDirectory=hdfs://gargantua:9000/spark-logs # 读日志的目录,与上述spark.eventLog.dir相对应 # -Dspark.history.ui.port=18080 # HistoryServer启动的端口[默认就是18080]
-
Spark History Server Configuration Options
关于上述 spark.history.开头的其他很多配置...
./sbin/start-history-server.sh
This creates a web interface at http://
没有sc.stop() 的应用不会在History中记录(在4040才可以)。
支持对UI界面自定义日志文件就是Json文件,解析成页面的信息。 那么是可以自行解析渲染出页面的。
查阅REST API部分。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)