spark远程debug之调试spark on yarn 程序

spark远程debug之调试spark on yarn 程序,第1张

简介

由于spark有多种运行模式,远程调试的时候,虽然大体步骤相同,但是还是有小部分需要注意的地方,这里记录一下调试运行在spark on yarn模式下的程序

环境准备

需要完好的Hadoop,spark集群,以便于提交spark on yarn程序。我这里是基于CDH的环境

步骤

1.随便写个spark程序,比如序列化一个集合,然后求和。然后使用maven打包,上传至集群。可以先提交运行一次,确保可以运行成功。

[root@kjtlxsvr5 bin]# ./spark-submit --class cn.spark.study.core.ParallelizeCollection --master yarn-cluster --num-executors 3 --executor-cores 2 --executor-memory 1G --driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8787" /home/spark-study-scala-0.0.1-SNAPSHOT-jar-with-dependencies.jar

现在有两个办法可以解决这个问题。

第一个办法是节点少的话,通过修改上面IDEA远程主机地址来一个一个试。

第二办法可以精确知道ApplicationMaster在哪里:

①通过CDH进入yarn的应用程序界面

②然后点击进入该程序的详细信息界面,如下图就可以知道Applicationmaster在哪台NodeManager上:

③可以去该节点查看进程,的确有一个ApplicationMaster,然后在IDEA中修改为该远程主机地址,开始debug程序看源码吧!

创建 maven 工程

使用下面命令创建一个普通的 maven 工程:

bash

$ mvn archetype:generate -DgroupId=com.cloudera.sparkwordcount -DartifactId=sparkwordcount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

将 sparkwordcount 目录重命名为simplesparkapp,然后,在 simplesparkapp 目录下添加 scala 源文件目录:

bash

$ mkdir -p sparkwordcount/src/main/scala/com/cloudera/sparkwordcount

修改 pom.xml 添加 scala 和 spark 依赖:

xml

<dependencies>

<dependency>

<groupId>org.scala-lang</groupId>

<artifactId>scala-library</artifactId>

<version>2.10.4</version>

</dependency>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-core_2.10</artifactId>

<version>1.2.0-cdh5.3.0</version>

</dependency>

</dependencies>

添加编译 scala 的插件:

xml

<plugin>

<groupId>org.scala-tools</groupId>

<artifactId>maven-scala-plugin</artifactId>

<executions>

<execution>

<goals>

<goal>compile</goal>

<goal>testCompile</goal>

</goals>

</execution>

</executions>

</plugin>

添加 scala 编译插件需要的仓库:

xml

<pluginRepositories>

<pluginRepository>

<id>scala-tools.org</id>

<name>Scala-tools Maven2 Repository</name>

<url>http://scala-tools.org/repo-releases</url>

</pluginRepository>

</pluginRepositories>

另外,添加 cdh hadoop 的仓库:

xml

<repositories>

<repository>

<id>scala-tools.org</id>

<name>Scala-tools Maven2 Repository</name>

<url>http://scala-tools.org/repo-releases</url>

</repository>

<repository>

<id>maven-hadoop</id>

<name>Hadoop Releases</name>

<url>https://repository.cloudera.com/content/repositories/releases/</url>

</repository>

<repository>

<id>cloudera-repos</id>

<name>Cloudera Repos</name>

<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>

</repository>

</repositories>

最后,完整的 pom.xml 文件见: https://github.com/javachen/simplesparkapp/blob/master/pom.xml 。

运行下面命令检查工程是否能够成功编译:

bash

mvn package

编写示例代码

以 WordCount 为例,该程序需要完成以下逻辑:

读一个输入文件

统计每个单词出现次数

过滤少于一定次数的单词

对剩下的单词统计每个字母出现次数

在 MapReduce 中,上面的逻辑需要两个 MapReduce 任务,而在 Spark 中,只需要一个简单的任务,并且代码量会少 90%。

编写 Scala 程序 如下:

scala

import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

import org.apache.spark.SparkConf

object SparkWordCount {

def main(args: Array[String]) {

val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))

val threshold = args(1).toInt

// split each document into words

val tokenized = sc.textFile(args(0)).flatMap(_.split(" "))

// count the occurrence of each word

val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)

// filter out words with less than threshold occurrences

val filtered = wordCounts.filter(_._2 >= threshold)

// count characters

val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)

System.out.println(charCounts.collect().mkString(", "))

charCounts.saveAsTextFile("world-count-result")

}

}

Spark 使用懒执行的策略,意味着只有当 动作 执行的时候, 转换 才会运行。上面例子中的 动作 *** 作是 collect 和 saveAsTextFile ,前者是将数据推送给客户端,后者是将数据保存到 HDFS。

作为对比, Java 版的程序 如下:

java

import java.util.ArrayList

import java.util.Arrays

import org.apache.spark.api.java.*

import org.apache.spark.api.java.function.*

import org.apache.spark.SparkConf

import scala.Tuple2

public class JavaWordCount {

public static void main(String[] args) {

JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"))

final int threshold = Integer.parseInt(args[1])

// split each document into words

JavaRDD tokenized = sc.textFile(args[0]).flatMap(

new FlatMapFunction() {

public Iterable call(String s) {

return Arrays.asList(s.split(" "))

}

}

)

// count the occurrence of each word

JavaPairRDD counts = tokenized.mapToPair(

new PairFunction() {

public Tuple2 call(String s) {

return new Tuple2(s, 1)

}

}

).reduceByKey(

new Function2() {

public Integer call(Integer i1, Integer i2) {

return i1 + i2

}

}

)

另外, Python 版的程序 如下:

python

import sys

from pyspark import SparkContext

file="inputfile.txt"

count=2

if __name__ == "__main__":

sc = SparkContext(appName="PythonWordCount")

lines = sc.textFile(file, 1)

counts = lines.flatMap(lambda x: x.split(' ')) \

.map(lambda x: (x, 1)) \

.reduceByKey(lambda a, b: a + b) \

.filter(lambda (a, b) : b >= count) \

.flatMap(lambda (a, b): list(a)) \

.map(lambda x: (x, 1)) \

.reduceByKey(lambda a, b: a + b)

print ",".join(str(t) for t in counts.collect())

sc.stop()

编译

运行下面命令生成 jar:

bash

$ mvn package

运行成功之后,会在 target 目录生成 sparkwordcount-0.0.1-SNAPSHOT.jar 文件。

运行

因为项目依赖的 spark 版本是 1.2.0-cdh5.3.0 ,所以下面的命令只能在 CDH 5.3 集群上运行。

首先,将测试文件 inputfile.txt 上传到 HDFS 上;

bash

$ wget https://github.com/javachen/simplesparkapp/blob/master/data/inputfile.txt

$ hadoop fs -put inputfile.txt

其次,将 sparkwordcount-0.0.1-SNAPSHOT.jar 上传到集群中的一个节点;然后,使用 spark-submit 脚本运行 Scala 版的程序:

bash

$ spark-submit --class com.cloudera.sparkwordcount.SparkWordCount --master local sparkwordcount-0.0.1-SNAPSHOT.jar inputfile.txt 2

或者,运行 Java 版本的程序:

bash

$ spark-submit --class com.cloudera.sparkwordcount.JavaWordCount --master local sparkwordcount-0.0.1-SNAPSHOT.jar inputfile.txt 2

对于 Python 版的程序,运行脚本为:

bash

$ spark-submit --master local PythonWordCount.py

如果,你的集群部署的是 standalone 模式,则你可以替换 master 参数的值为 spark://<master host>:<master port>,也可以以 Yarn 的模式运行。

错误的说法是:Spark运行的基本流程是先初始化程序,然后将数据加载到内存中,最后用户可以使用任何算法对数据进行处理。

Spark的基本流程并不是如此简单,它的流程包括:创建Spark上下文,加载数据集,转换数据,使用算法进行分析,将结果输出,最后释放资源。

首先,在Spark程序中,需要考虑创建一个Spark上下文,它是一个运行Spark程序的基本环境,它能够提供Spark程序所需要的一切资源,包括集群管理器、资源管理器、Scheduler等。

其次,需要加载要处理的数据集,这些数据可以从本地文件系统或者远程的HDFS文件系统中获取,并将其加载到Spark中。

接着,将加载的数据转换成可以被Spark处理的数据,这里可以使用Spark的RDD API或者DataFrame API进行数据转换,将数据转换成可以被Spark处理的形式。

然后,可以使用Spark MLlib中提供的各种机器学习算法进行数据分析,计算出分析结果,并将结果输出到指定的文件中。

最后,在程序完成后,需要释放资源,将Spark上下文中加载的数据及各种资源占用情况清空,以便在下次运行时能够重新使用。

因此,以上错误的说法不能概括Spark的基本流程,Spark的基本流程涉及到更多的步骤,如上所述。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/yw/12040057.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-20
下一篇 2023-05-20

发表评论

登录后才能评论

评论列表(0条)

保存