Spark 学习【二】

Spark 学习【二】,第1张

Spark Core [05-07]

两个demo
很多复杂的业务拆分开都是变种的wc:
分组 ==> 变种WC ==> 数据补齐

将不同组的数据按规则合并在一起
split(“,”,0) // 第一个参数指定分隔符,第二个参数限制split后数组长度:

// split(",",3)  限制3个元素,所以从第三个元素开始就不再切分,而是把剩下的所有内容作为一个长字符串元素
String[] split = "1,2,3,4,5,6".split(",",3) ;  
{"1","2","3,4,5,6"}

闭包

rdd1的transiformtion 中不能嵌套另一个rdd2。
可以将rdd2转成scala的数组(触发action),再放到rdd1中使用,能用但本末倒置。

闭包:在driver 中定义的变量,只会传递一个值的副本到 excutor中,对这个值的 *** 作作用域仅限于 excutor内,而无法在 driver中获取 这个变量被excutor改变的值。

例如之前学习将rdd内容写入数据库,获取数据库连接也需要写在 map中,而不能在driver中提前获取连接再传入map,会报错 java.sql.Connection 没有序列化。

在driver端定义的类,如果拿到excutor中执行(涉及网络IO了),也会报错无法序列化。

如果是自己的类无法序列化,那就实现序列化接口,但是框架的类没法修改。

广播变量 Broadcast Varizbles

在MR中,学习了mapjoin 和 reducejoin,mapjoin在spark中就对应BroadcastJoin。
left.join(right) 普通的join,可能有也可能没有shuffle(两个rdd和并到一个rdd)。

广播变量,就是将小数据量的内容广播出去,(在MR中也是把小表的内容丢到分布式缓存),广播变量没有Shuffle。(数据广播出去以后便不会再有数据重新分发)。

未使用广播变量时,数据需要从driver复制到每个executor的每个task中。
广播变量的数据从driver传输到executor的内存中,每个task都从自己节点的executor获取。

从driver传输到executor,必然涉及网络IO,必须实现序列化。(这个变量不经过广播传到executor也会要求序列化)
广播变量是广播只读变量,创建后不可修改。(这个变量不经过广播时传到executor也会因为闭包的原因"无法修改")

所以广播一词的含义体现在executor的每个task都从所在的executor中共享这个变量。

如何使用

可以通过调用SparkContext.broadcast(v)来从一个普通变量v中创建一个广播变量。这个广播变量就是对普通变量v的一个包装器,通过调用value方法就可以获得这个广播变量的值:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

这个广播变量被创建以后,那么在集群中的任何函数中,都应该使用广播变量broadcastVar的值,而不是使用v的值,这样就不会把v重复分发到这些节点上。
此外,一旦广播变量创建后,普通变量v的值就不能再发生修改,从而确保所有节点都获得这个广播变量的相同的值。https://zhuanlan.zhihu.com/p/354811884

持久化与广播变量区别

rdd.cache() 和 sc.broadcast(v)

持久化 是rdd才有的,把整个rdd数据持久化到个节点的内存中;是节点自己去读取数据放到自己内存,不存在网络IO。
持久化的内容可以修改,但不适合频繁修改(持久化的目的是反复使用,需要频繁修改不叫反复使用)。

广播变量 可以是任何可被序列化的数据,从driver传输到executor。
广播变量是广播只读变量,着眼于读;

累加器 是全局写变量,着眼于写。

https://blog.csdn.net/hellojoy/article/details/113359299

Java 版本的SparkContext

相同包名的java文件和scala文件都会编译到一个目录下。

SparkConf sparkConf = new SparkConf.setAppName("Your App Name").setMaster("local[2]")
JavaSparkContext jsc = new JavaSparkContext(sparkConf)

map() 在java里是mapToPair()。
其他参考Spark Core 06.

宽依赖、窄依赖 窄依赖

一个父RDD的分区最多被子RDD的某个分区只使用一次 (1 --> 1、n --> 1窄;1 --> n 宽)。

计算失败导致需要从上级重新计算时,不至于波及其他分区也重新计算。是pipeline式 *** 作。

明确了父级RDD及父分区。就算子RDD某分区出错,只需要对出错的分区从父级(可能是多个父级,但是也明确的)重新计算即可。
(窄依赖虽然也可能会触发所有父RDD的重新计算,但不会涉及子级的所有分区重新计算,成本节约是体现在子级不是父级的)

不存在shuffle:父RDD的分区只会去到一个子RDD,所有数据不用重新分发,也就没有shuffle。

join可能为窄依赖:join的两个RDD中也是按分区组合的,如rdd1的partition1 只会和 rdd2的partition1合并,而不会涉及到rdd2的partition2。这样也是窄依赖。但凡有任一rdd的任意partiton合并了不止一个partition,都是宽依赖。
(join的源码会先判断当前依赖是OneToOneDependency还是SuffleDependency)

map、filter、union、join(with co-partitioned) :窄依赖
groupByKey、join(with no co-partitioned):宽依赖

宽依赖

宽依赖会波及其他分区(甚至整个RDD)重新计算:因为重新计算了父级,但是这个父级还可能被其他子级RDD分区依赖,导致其他子级分区也需要重新计算。

宽依赖往往对应着shuffle *** 作,需要在运行的过程中将同一个RDD分区传入到不同的RDD分区中。

n 个宽依赖 == n 个shuffle == n+1 个stage。

spark standalone模式

spark standalone集群模式,一般不用(当初Spark on Yarn不成熟时才用)。在Spark on Yarn中 Spark只作为客户端提交作业。

部署一个master,一个worker。
参考Spark Core 06.

Spark作业

把作业打包到线上spark服务(Yarn、…)上运行的话,都需要将代码里.setAppName().setMaster(“local[2]”)去掉。约定大于配置。

位置 spark.apache.org/docs/latest/index.html ==> Deploying ==> Submitting Applications

./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \   // 指定主类,全限定名
  --master local[8] \  // 指定提交方式
  --deploy-mode cluster \
  --executor-memory 20G \
  --conf = \
  ... # other options
  /path/to/examples.jar \  // [作业打成Jar文件的路径](记得将代码里.setAppName().setMaster()去掉)
  100  // 其他参数
以local[2]提交到本地执行
spark-submit \
--master local[2] \
--class org.apache.spark.examples.SparkPi \
/home/lib/xxx.jar \
3

提交到standalone
spark-submit \
--master spark://localhost:7077 \   // standalone部署所在的机器
--class org.apache.spark.examples.SparkPi \
/home/lib/xxx.jar \
3

Spark on Yarn
spark-submit \
--master yarn \
--class org.apache.spark.examples.SparkPi \
/home/lib/xxx.jar \
3

Spark 配置&参数

在学习Hive的时候,对Hive配置可以有:

hive-site.xml       // 全局配置
hive --hiveconf     // 带参数启动
hive> set key=value // 在hive中临时设置

对于spark也是有全局配置和带参数提交任务

spark-default.conf 
spark-shell --help // spark-submit 是一样的
spark-default.conf

启动时默认都从这个文件加载。
需要改的默认参数都可以写在这里面,就不用在命令里写了。

如可以将 --master local[2] 也在配置中打开注释,以后提交任务都不用再指定–master

除了默认的spark-default.conf,也可以用 --properties-file 指定其他文件,适合多配置文件按需启动的情况。

spark-shell \
--properties-file spark-xxx.conf
sparl-shell --conf 自定义参数

如果有其他参数从外面传:–conf指定参数,参数key=value格式

sparl-shell --conf spark.xxx=1  
// 参数要用spark.作为前缀,否则不识别,并警告:Ignoring non-Spark config property:xxx 
// 源码里可以搜到这段内容,并通过源码也发现参数不是以spark.开头的会被移除
spark 指定输入输出这两个参数到hdfs
  1. 用main 方法实现:main方法接收两个参数,spark-submit 启动传入这两个参数。
    val input = arg(0)
    val output = arg(1)
    
    spark-submit \
    ...
    /input/wc /output/wc
    
  2. 用sc.getConf实现:.sc.getConf.get(“spark.xx”) ;然后在 spark-submit 启动时传入两个参数即:
    val input = sc.getConf.get("spark.intput")
    val output = sc.getConf.get("spark.output")
    
    spark-submit \
    ...
    --conf spark.intput=/input/wc \
    --conf spark.output=/output/wc 
    
计数器 也叫 Accumulators/ 累加器。

期望用一个action返回两个结果,因为闭包,是无法在driver中定义变量传到executor *** 作后再返回driver的。

// driver中获取累加器
acc = sc.longAccumulator("xx")
// excutor中
{
    acc.add()
}
// 在driver中获取累加的结果
acc.value

用计数器时,每一次action后都会触发一次计数器。要么把acc.value放在所有action只有,保证后续不再反复触发。要么使用rdd.cache /rdd.checkPoint()

rdd.cache:把链条断掉,避免重复累加。(cache时注意rdd数据量大小)

自定义累加器

参考Spark Core 07.

监控/ Web UI 4040

官网介绍,位置 spark.apache.org/docs/latest/index.html ==> More ==> Monitoring

每一个Application都会有一个Web UI,默认端口4040(如果再启一个就是4041):gargantua:4040
页面中有如下信息:

调度程序阶段和任务的列表
RDD大小和内存使用的摘要
环境信息。
正在运行的执行器信息

ou can access this interface by simply opening http://:4040 in a web browser. If multiple SparkContexts are running on the same host, they will bind to successive ports beginning with 4040 (4041, 4042, etc).

默认Application只有运行中才能打开对于的4040页面。想要运行结束也记录日志需要在应用启动前将spark.eventLog.enabled 设置为true;

HistoryServer 18080 配置

在spark.eventLog.enabled开启后,作业运行的情况都会记录下来(hdfs),支持后期UI界面对其渲染。

  1. 开启spark.eventLog.enabled,并设置日志保存的目录:
    spark-default.conf

    # 把注释解开就行
    spark.eventLog.enabled	true
    spark.eventLog.dir	hdfs://gargantua:9000/spark-logs   # 写日志的目录 [这个目录需要提前创建]
    
  2. 环境变量相关配置 (Environment Variables)
    spark-env.sh

    # 内存
    export SPARK_DAEMON_MEMORY=2g
    
    # 以spark.history.开头的配置都写在SPARK_HISTORY_OPTS这个变量里,-D开头,多个以空格分隔  (e.g. "-Dx=y -Dk=v")
    export SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://gargantua:9000/spark-logs -Dspark.history.ui.port=18080"
    
    #
    # -Dspark.history.fs.logDirectory=hdfs://gargantua:9000/spark-logs   # 读日志的目录,与上述spark.eventLog.dir相对应
    # -Dspark.history.ui.port=18080 # HistoryServer启动的端口[默认就是18080]
    
  3. Spark History Server Configuration Options

    关于上述 spark.history.开头的其他很多配置...
    
启动HistoryServer
./sbin/start-history-server.sh

This creates a web interface at http://:18080 by default。不管是local运行的还是提交到Yarn运行的作业,都会在HistoryServer中展示。

没有sc.stop() 的应用不会在History中记录(在4040才可以)。

支持对UI界面自定义

日志文件就是Json文件,解析成页面的信息。 那么是可以自行解析渲染出页面的。
查阅REST API部分。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/720417.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-25
下一篇 2022-04-25

发表评论

登录后才能评论

评论列表(0条)

保存