这个错误通常是由于Java或Spark配置问题引起的。下面是几种可能的解决方法:
检查Java是否正确安装并配置在系统环境变量中,确认环境变量JAVA_HOME的值是否正确,可以尝试在终端窗口中运行java命令来确认Java是否可以正常运行。
确认Spark的版本与Java版本是否兼容,可以通过在Spark官方网站查看Spark版本和Java版本之间的兼容性矩阵。
检查防火墙或网络代理是否阻止Spark与Java通信。尝试关闭防火墙或暂时禁用网络代理,然后再次运行代码。
尝试在SparkConf中设置sparkdriverhost属性,如下所示:
pythonCopy codeconf = SparkConf()setMaster("local[]")setAppName("test_spark_app") \set("sparkdriverhost", "127001")
这将指示Spark驱动程序使用指定的IP地址作为其主机名,可以尝试设置为本地IP地址。
尝试删除pycache目录。如果您使用Python 3,可能会在代码文件所在的目录中找到pycache目录,其中可能会包含与Spark上下文相关的缓存文件。尝试删除pycache目录并重新运行代码。
搭建Hadoop集群环境一般建议三个节点以上,一个作为Hadoop的NameNode节点。另外两个作为DataNode节点。在本次实验中,采用了三台CentOS 75作为实验环境。
将所需要的java 文件解压到合适的目录,并将整个java 目录添加进 /etc/profile ,并 source /etc/profile
需要说明的是ssh免密登录的配置不是双向的,是单向的。也就是说,每个节点都需要和另外两个节点进行ssh的免密配置。
此时会在用户目录的 ssh 下,生成秘钥文件。现在需要将此验证文件拷贝至slave1节点,
在 /etc/profile 目录下追加:
vim /home/postgres/hadoop/hadoop-330/etc/hadoop/hadoop-envsh 修改配置文件java路径
vim /home/postgres/hadoop/hadoop-330/etc/hadoop/core-sitexml 修改core-site文件
vim /home/postgres/hadoop/hadoop-330/etc/hadoop/hdfs-sitexml 修改hdfs-site文件
vim /home/postgres/hadoop/hadoop-330/etc/hadoop/yarn-sitexml 修改yarn-site文件
vim /home/postgres/hadoop/hadoop-330/etc/hadoop/mapred-sitexml 修改mapred-site文件
vim /home/postgres/hadoop/hadoop-330/etc/hadoop/slaves 修改slaves文件
手动创建文件夹:/home/postgres/hadoop/hdfs/ logs 和 data 目录,并分配777权限。
在hadoop初始化启动后,在master上xxx/name/namesecondary/下会自动创建/current/VERSION文件路径。
在master运行: hadoop namenode -format
如果有必要,运行DataNode命令: hadoop datanode -format
master+slave1+slave2启动集群: start-allsh
master : jps
slave1 : jps
slave2 : jps
hadoop dfsadmin -report
按照上述的配置情况:一个namenode节点,两个datanode节点,整个集群监控情况如下:
输入 >
Spark作为一个基于内存的分布式计算引擎,其内存管理模块在整个系统中扮演着非常重要的角色。
在执行Spark的应用程序时,Spark集群会启动Driver和Executor两种JVM进程:
Spark管理的内存主要划分为4个区域:
Executor作为一个JVM进程,它的内存管理建立在JVM的内存管理之上,Spark对JVM的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。
堆内内存的大小,由 Spark 应用程序启动时的 executor-memory 或 sparkexecutormemory 参数配置。Executor 内运行的并发任务共享 JVM 堆内内存,这些任务在缓存 RDD 数据和广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存,而这些任务在执行 Shuffle 时占用的内存被规划为执行(Execution)内存,剩余的部分不做特殊规划,那些 Spark 内部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用剩余的空间。不同的管理模式下,这三部分占用的空间大小各不相同。
Spark 对堆内内存的管理是一种逻辑上的"规划式"的管理,因为对象实例占用内存的申请和释放都由 JVM 完成,Spark 只能在申请后和释放前记录这些内存,我们来看其具体流程:
为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。利用 JDK Unsafe API(从 Spark 20 开始),在管理堆外的存储内存时不再基于 Tachyon,而是与堆外的执行内存一样,基于 JDK Unsafe API 实现,Spark 可以直接 *** 作系统堆外内存,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。堆外内存可以被精确地申请和释放,而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。
在默认情况下堆外内存并不启用,可通过配置 sparkmemoryoffHeapenabled 参数启用,并由 sparkmemoryoffHeapsize 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。
Spark 16 之后默认为统一管理(UnifiedMemoryManager)方式,16 之前采用的静态管理(StaticMemoryManager)方式仍被保留,可通过配置 sparkmemoryuseLegacyMode=true 参数启用静态内存管理方式。下面我们介绍下两种内存管理模型的进化。
在 Spark 最初采用的静态内存管理机制下,存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置,堆内内存的分配如下所示:
Spark 16 之后引入的统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域。如下图所示:
其中最重要的优化在于动态占用机制,其规则如下:
新的版本引入了新的配置项:
凭借统一内存管理机制,Spark 在一定程度上提高了堆内和堆外内存资源的利用率,降低了开发者维护 Spark 内存的难度,但并不意味着开发者可以高枕无忧。譬如,所以如果存储内存的空间太大或者说缓存的数据过多,反而会导致频繁的全量垃圾回收,降低任务执行时的性能,因为缓存的 RDD 数据通常都是长期驻留内存的。所以要想充分发挥 Spark 的性能,需要开发者进一步了解存储内存和执行内存各自的管理方式和实现原理。
Spark on YARN模式下,有Driver、ApplicationMaster、Executor三种进程。在任务调度和运行的过程中,Driver和Executor承担了很大的责任,而ApplicationMaster主要负责container的启停。
因而Driver和Executor的参数配置对spark应用的执行有着很大的影响意义。用户可通过如下 *** 作对Spark集群性能做优化。
1 配置Driver内存。
Driver负责任务的调度,和Executor、AM之间的消息通信。当任务数变多,任务平行度增大时,Driver内存都需要相应增大。可以根据实际任务数量的多少,为Driver设置一个合适的内存。
● 将“spark-defaultsconf”中的“sparkdrivermemory”配置项或者“spark-envsh”中的“SPARK_DRIVER_MEMORY”配置项设置为合适大小。
● 在使用spark-submit命令时,添加“--driver-memory MEM”参数设置内存。
2 配置Executor个数。
每个Executor每个核同时能跑一个task,所以增加了Executor的个数相当于增大了任务的并发度。在资源充足的情况下,可以相应增加Executor的个数,以提高运行效率。
● 将 “spark-defaultsconf” 中的 “sparkexecutorinstance” 配置项或者 “spark-envsh” 中的 “SPARK_EXECUTOR_INSTANCES” 配置项设置为合适大小。还可以设置动态资源调度功能进行优化,详情请参见 >
这里说明一点:本文提到的解决 Spark insertIntoJDBC找不到Mysql驱动的方法是针对单机模式(也就是local模式)。在集群环境下,下面的方法是不行的。这是因为在分布式环境下,加载mysql驱动包存在一个Bug,13及以前的版本 --jars 分发的jar在executor端是通过 Spark自身特化的classloader加载的。而JDBC driver manager使用的则是系统默认的classloader,因此无法识别。可行的方法之一是在所有 executor 节点上预先装好JDBC driver并放入默认的classpath。
不过Spark 14应该已经fix了这个问题,即 --jars 分发的 jar 也会纳入 YARN 的 classloader 范畴。
今天在使用Spark中DataFrame往Mysql中插入RDD,但是一直报出以下的异常次信息:
01
[itelbog@iteblog~]$ bin/spark-submit --master local[2]
02
--jars lib/mysql-connector-java-5135jar
03
--class sparksparkToJDBC /spark-test_210-10jar
04
05
spark assembly has been built with Hive, including Datanucleus jars on classpath
06
Exception in thread "main" javasqlSQLException: No suitable driver found for
07
jdbc:mysql://>
08
true&characterEncoding=utf8&autoReconnect=true
09
at javasqlDriverManagergetConnection(DriverManagerjava:602)
10
at javasqlDriverManagergetConnection(DriverManagerjava:207)
11
at orgapachesparksqlDataFramecreateJDBCTable(DataFramescala:1189)
12
at sparkSparkToJDBC$toMysqlFromJavaBean(SparkToJDBCscala:20)
13
at sparkSparkToJDBC$main(SparkToJDBCscala:47)
14
at sparkSparkToJDBCmain(SparkToJDBCscala)
15
at sunreflectNativeMethodAccessorImplinvoke0(Native Method)
16
at sunreflectNativeMethodAccessorImplinvoke(NativeMethodAccessorImpljava:39)
17
at sunreflectDelegatingMethodAccessorImplinvoke(DelegatingMethodAccessorImpljava:25)
18
at javalangreflectMethodinvoke(Methodjava:597)
19
at orgapachesparkdeploySparkSubmit$org$apache$spark$deploy$SparkSubmit$
20
$runMain(SparkSubmitscala:569)
21
at orgapachesparkdeploySparkSubmit$doRunMain$1(SparkSubmitscala:166)
22
at orgapachesparkdeploySparkSubmit$submit(SparkSubmitscala:189)
23
at orgapachesparkdeploySparkSubmit$main(SparkSubmitscala:110)
24
at orgapachesparkdeploySparkSubmitmain(SparkSubmitscala)
感觉很奇怪,我在启动作业的时候加了Mysql驱动啊在,怎么会出现这种异常呢??经过查找,发现在--jars参数里面加入Mysql是没有用的。通过查找,发现提交的作业可以通过加入--driver-class-path参数来设置driver的classpath,试了一下果然没有出现错误!
1
[itelbog@iteblog ~]$ bin/spark-submit --master local[2]
2
--driver-class-path lib/mysql-connector-java-5135jar
3
--class sparkSparkToJDBC /spark-test_210-10jar
其实,我们还可以在spark安装包的conf/spark-envsh通过配置SPARK_CLASSPATH来设置driver的环境变量,如下:
(这里需要注意的是,在Spark13版本中,在Spark配置中按如下进行配置时,运行程序时会提示该配置方法在Spark10之后的版本已经过时,建议使用另外两个方法;其中一个就是上面讲到的方法。另外一个就是在配置文件中配置sparkexecutorextraClassPath,具体配置格式会在试验之后进行补充)
1
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/iteblog/com/mysql-connector-java-5135jar
这样也可以解决上面出现的异常。但是,我们不能同时在conf/spark-envsh里面配置SPARK_CLASSPATH和提交作业加上--driver-class-path参数,否则会出现以下异常:
查看源代码打印帮助
01
[itelbog@iteblog~]$ bin/spark-submit --master local[2]
02
--driver-class-path lib/mysql-connector-java-5135jar
03
--class sparkSparkToJDBC /spark-test_210-10jar
04
05
Spark assembly has been built with Hive, including Datanucleus jars on classpath
06
Exception in thread "main"orgapachesparkSparkException:
07
Found both sparkdriverextraClassPath and SPARK_CLASSPATH Use only the former
08
at orgapachesparkSparkConf$$anonfun$validateSettings$6$$anonfun$apply
09
$7apply(SparkConfscala:339)
10
at orgapachesparkSparkConf$$anonfun$validateSettings$6$$anonfun$apply
11
$7apply(SparkConfscala:337)
12
at scalacollectionimmutableListforeach(Listscala:318)
13
at orgapachesparkSparkConf$$anonfun$validateSettings$6apply(SparkConfscala:337)
14
at orgapachesparkSparkConf$$anonfun$validateSettings$6apply(SparkConfscala:325)
15
at scalaOptionforeach(Optionscala:236)
16
at orgapachesparkSparkConfvalidateSettings(SparkConfscala:325)
17
at orgapachesparkSparkContext<init>(SparkContextscala:197)
18
at sparkSparkToJDBC$main(SparkToJDBCscala:41)
19
at sparkSparkToJDBCmain(SparkToJDBCscala)
20
at sunreflectNativeMethodAccessorImplinvoke0(Native Method)
21
at sunreflectNativeMethodAccessorImplinvoke(NativeMethodAccessorImpljava:39)
22
at sunreflectDelegatingMethodAccessorImplinvoke(DelegatingMethodAccessorImpljava:25)
23
at javalangreflectMethodinvoke(Methodjava:597)
24
at orgapachesparkdeploySparkSubmit$org$apache$spark$
25
deploy$SparkSubmit$$runMain(SparkSubmitscala:569)
26
at orgapachesparkdeploySparkSubmit$doRunMain$1(SparkSubmitscala:166)
27
at orgapachesparkdeploySparkSubmit$submit(SparkSubmitscala:189)
28
at orgapachesparkdeploySparkSubmit$main(SparkSubmitscala:110)
29
at orgapachesparkdeploySparkSubmitmain(SparkSubmitscala)
共享变量
通常情况下,当向Spark *** 作(如map,reduce)传递一个函数时,它会在一个远程集群节点上执行,它会使用函数中所有变量的副本。这些变量被复制到所有的机器上,远程机器上并没有被更新的变量会向驱动程序回传。在任务之间使用通用的,支持读写的共享变量是低效的。尽管如此,Spark提供了两种有限类型的共享变量,广播变量和累加器。
广播变量
广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每个节点一个大输入数据集的副本。Spark还尝试使用高效地广播算法来分发变量,进而减少通信的开销。
Spark的动作通过一系列的步骤执行,这些步骤由分布式的洗牌 *** 作分开。Spark自动地广播每个步骤每个任务需要的通用数据。这些广播数据被序列化地缓存,在运行任务之前被反序列化出来。这意味着当我们需要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分重要的时候,显式地创建广播变量才有用。
通过在一个变量v上调用SparkContextbroadcast(v)可以创建广播变量。广播变量是围绕着v的封装,可以通过value方法访问这个变量。举例如下:
scala> val broadcastVar = scbroadcast(Array(1, 2, 3))
broadcastVar: orgapachesparkbroadcastBroadcast[Array[Int]] = Broadcast(0)
scala> broadcastVarvalue
res0: Array[Int] = Array(1, 2, 3)
在创建了广播变量之后,在集群上的所有函数中应该使用它来替代使用v这样v就不会不止一次地在节点之间传输了。另外,为了确保所有的节点获得相同的变量,对象v在被广播之后就不应该再修改。
累加器
累加器是仅仅被相关 *** 作累加的变量,因此可以在并行中被有效地支持。它可以被用来实现计数器和总和。Spark原生地只支持数字类型的累加器,编程者可以添加新类型的支持。如果创建累加器时指定了名字,可以在Spark的UI界面看到。这有利于理解每个执行阶段的进程。(对于python还不支持)
累加器通过对一个初始化了的变量v调用SparkContextaccumulator(v)来创建。在集群上运行的任务可以通过add或者"+="方法在累加器上进行累加 *** 作。但是,它们不能读取它的值。只有驱动程序能够读取它的值,通过累加器的value方法。
下面的代码展示了如何把一个数组中的所有元素累加到累加器上:
scala> val accum = scaccumulator(0, "My Accumulator")
accum: sparkAccumulator[Int] = 0
scala> scparallelize(Array(1, 2, 3, 4))foreach(x => accum += x)
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0317106 s
scala> accumvalue
res2: Int = 10
尽管上面的例子使用了内置支持的累加器类型Int,但是开发人员也可以通过继承AccumulatorParam类来创建它们自己的累加器类型。AccumulatorParam接口有两个方法:
zero方法为你的类型提供一个0值。
addInPlace方法将两个值相加。
假设我们有一个代表数学vector的Vector类。我们可以向下面这样实现:
object VectorAccumulatorParam extends AccumulatorParam[Vector] {
def zero(initialValue: Vector): Vector = {
Vectorzeros(initialValuesize)
}
def addInPlace(v1: Vector, v2: Vector): Vector = {
v1 += v2
}
}
// Then, create an Accumulator of this type:
val vecAccum = scaccumulator(new Vector())(VectorAccumulatorParam)
在Scala里,Spark提供更通用的累加接口来累加数据,尽管结果的类型和累加的数据类型可能不一致(例如,通过收集在一起的元素来创建一个列表)。同时,SparkContextaccumulableCollection方法来累加通用的Scala的集合类型。
累加器仅仅在动作 *** 作内部被更新,Spark保证每个任务在累加器上的更新 *** 作只被执行一次,也就是说,重启任务也不会更新。在转换 *** 作中,用户必须意识到每个任务对累加器的更新 *** 作可能被不只一次执行,如果重新执行了任务和作业的阶段。
累加器并没有改变Spark的惰性求值模型。如果它们被RDD上的 *** 作更新,它们的值只有当RDD因为动作 *** 作被计算时才被更新。因此,当执行一个惰性的转换 *** 作,比如map时,不能保证对累加器值的更新被实际执行了。下面的代码片段演示了此特性:
val accum = scaccumulator(0)
datamap { x => accum += x; f(x) }
//在这里,accum的值仍然是0,因为没有动作 *** 作引起map被实际的计算
Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法
数据科学家为了回答一个问题或进行深入研究,会使用相关的技术分析数据。通常,他们的工作包含特殊的分析,所以他们使用交互式shell,以使得他们能在最短的时间内看到查询结果和代码片段。Spark的速度和简单的API接口很好地符合这个目标,它的内建库意味着很多算法可以随时使用。
Spark通过若干组件支持不同的数据科学任务。Spark shell使得用Python或Scala进行交互式数据分析变得简单。Spark SQL也有一个独立的SQL shell,可以用SQL进行数据分析,也可以在Spark程序中或Spark shell中使用Spark SQL。MLlib库支持机器学习和数据分析。而且,支持调用外部的MATLAB或R语言编写的程序。Spark使得数据科学家可以用R或Pandas等工具处理包含大量数据的问题。
以上就是关于RuntimeError: Java gateway process exited before sending its port number,求大佬解答全部的内容,包括:RuntimeError: Java gateway process exited before sending its port number,求大佬解答、Spark + Kafka大数据环境的搭建和示例的简单运行、Spark原理 | 内存管理等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)