Spark-shell 参数
Spark-shell 是以一种交互式命令行方式将Spark应用程序跑在指定模式上,也可以通过Spark-submit提交指定运用程序,Spark-shell 底层调用的是Spark-submit,二者的使用参数一致的,通过- -help 查看参数:
sparkconf的传入有三种方式:
1.通过在spark应用程序开发的时候用set()方法进行指定
2.通过在spark应用程序提交的时候用过以上参数指定,一般使用此种方式,因为使用较为灵活
3.通过配置spark-default.conf,spark-env.sh文件进行指定,此种方式较shell方式级别低
Local模式
Local 模式是最简单的一种Spark运行方式,它采用单节点多线程(cpu)方式运行,local模式是一种OOTB(开箱即用)的方式,只需要在spark-env.sh导出JAVA_HOME,无需其他任何配置即可使用,因而常用于开发和学习
方式:./spark-shell - -master local[n] ,n代表线程升哗族数
Standalone模式
Spark on Yarn
on Yarn的俩种模式
客户端的Driver将应用提交给Yarn后,Yarn会先后启动ApplicationMaster和excutor,另外ApplicationMaster和executor都装在在container里运行,container默认的内存是1g,ApplicationMaster分配的内存是driver-memory,executor分配的内存是executor-memory.同时,因为Driver在客户端,所以程序的运行结果可以在客户端显示,Driver以进程名为SparkSubmit的形式存在。
Cluster 模式
1.由client向ResourceManager提交请求,并上传Jar到HDFS上
这期间包括吵弊四个步骤:
a).连接到RM
b).从RM ASM(applicationsManager)中获得metric,queue和resource等信息。
c).upload app jar and spark-assembly jar
d).设置运行环境和container上下文
2.ResourceManager向NodeManager申请资源,创建Spark ApplicationMaster(每个SparkContext都有一个ApplicationManager)
3.NodeManager启动Spark App Master,并向ResourceManager ASM注册
4.Spark ApplicationMaster从HDFS中找到jar文件,启动DAGScheduler和YARN Cluster Scheduler
5.ResourceManager向ResourceManager ASM注册申请container资源(INFO YarnClientImpl: Submitted application)
6.ResourceManager通知NodeManager分配Container,这是可以收到来自ASM关于container的报告。(每个container的对应一个executor)
7.Spark ApplicationMaster直接和container(executor)进行交互,完成这个分布式任务。
进芦樱入spark安装目录下的conf文件夹
[atguigu@hadoop102 module] mv slaves.template slaves
[atguigu@hadoop102 conf] vim slaves
hadoop102
hadoop103
hadoop104
4)修改spark-env.sh文件,添加如下配置:
[atguigu@hadoop102 conf]$ vim spark-env.sh
SPARK_MASTER_HOST=hadoop102
SPARK_MASTER_PORT=7077
5)分发spark包
[atguigu@hadoop102 module] sbin/start-all.sh
注意:如果遇到 “JAVA_HOME not set” 异常,可以在sbin目录下的spark-config.sh 文件中加入如下配置:
export JAVA_HOME=XXXX
官方求PI案例
spark-submit
--class org.apache.spark.examples.SparkPi
--master spark://server-2:7077
--executor-memory 1G
--total-executor-cores 2
/home/xxx/software/spark-2.4.4-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.4.jar
100
spark-shell
--master spark://server-2:7077
--executor-memory 1g
--total-executor-cores 2
spark-shell --master spark://server-2:7077 --executor-memory 1g --total-executor-cores 2
参数:--master spark://server-2:7077 指定要连接的集群的master
Spark客户端直接连接Yarn,不需要额外构建Spark集群。有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。
yarn-client:Driver程序运行在客户端,适用于交互、调试,希望立即看到app的输出
yarn-cluster:Driver程序运行在由RM(ResourceManager)启动的AP(APPMaster)适用于生产环境。
安装使用
1)修改hadoop配置文件yarn-site.xml,添加如下内容:
2)修改spark-env.sh,添加如下配置:
[atguigu@hadoop102 conf]$ vi spark-env.sh
YARN_CONF_DIR=/opt/module/hadoop-2.7.2/etc/hadoop
3)分发配置文件
[atguigu@hadoop102 conf] xsync spark-env.sh
4)执行一个程序
spark-submit
--class org.apache.spark.examples.SparkPi
--master yarn
--deploy-mode client
/home/xxx/software/spark-2.4.4-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.4.jar
100
注意:在提交任务之前需启动HDFS以及YARN集群。
日志查看
修改配置文件spark-defaults.conf
添加如下内容:
spark.yarn.historyServer.address=server-2:18080
spark.history.ui.port=18080
2)重启spark历史服务
[atguigu@hadoop102 spark] sbin/start-history-server.sh
starting org.apache.spark.deploy.history.HistoryServer, logging to /opt/module/spark/logs/spark-atguigu-org.apache.spark.deploy.history.HistoryServer-1-hadoop102.out
3)提交任务到Yarn执行
spark-submit
--class org.apache.spark.examples.SparkPi
--master yarn
--deploy-mode client
/home/xxx/software/spark-2.4.4-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.4.jar
100
这个兆差错误通常是由于Java或Spark配置问题引起的。下面是几种可能的解决方法:
检查Java是否正确安装并配置在系统环境变量中,确认环境变量JAVA_HOME的值是否正确,可以尝试在终端窗口中运行java命令来确认Java是否可誉猜旅以正常运行。
确认Spark的版本与Java版本是否兼容,可以通过在Spark官方网站查看Spark版本和Java版本之间的兼容性矩阵。
检查防火墙或网络代理是否阻止Spark与Java通信。尝试关闭防火墙或暂庆凳时禁用网络代理,然后再次运行代码。
尝试在SparkConf中设置spark.driver.host属性,如下所示:
pythonCopy codeconf = SparkConf().setMaster("local[*]").setAppName("test_spark_app") \.set("spark.driver.host", "127.0.0.1")
这将指示Spark驱动程序使用指定的IP地址作为其主机名,可以尝试设置为本地IP地址。
尝试删除.pycache目录。如果您使用Python 3,可能会在代码文件所在的目录中找到.pycache目录,其中可能会包含与Spark上下文相关的缓存文件。尝试删除.pycache目录并重新运行代码。
在每个 RDD 上的 *** 作都被映射到了 RDD 上的所有分区中的每个元素,所以如果能将几个 *** 作合并到一起,就可以减少分区数量。例如,可以将 rdd2 和 rdd3 合并成一个 *** 作。
在 rdd1 上的 filter *** 作中,如果第一行是文件中的第一行,那么这个 filter *** 作将会在所有分区中进行,因此可以将其改带含为在读取文件时直接跳过第一行。
在 rdd4 上的 reduceByKey *** 作之后,可以考虑使用 sortByKey *** 作来将结果按照键排序,而不是在最后使用 collect *** 作来将所有数据加袜行余载到内存中并排序。
最后优化告滚后的代码如下:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('appName').setMaster('Local[4]')
sc = SparkContext.getOrCreate(conf)
rdd = sc.textFile('books.txt', 4)
rdd1 = rdd.map(lambda line: line.split('\t')) \
.filter(lambda x: x[0] != 'title') \
.map(lambda x: (x[1], 1)) \
.reduceByKey(lambda x, y: x + y) \
.sortByKey()
res = rdd1.collect()
print(res)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)