HUE通过oozie调度spark2 程序on yarn的两种方法

HUE通过oozie调度spark2 程序on yarn的两种方法,第1张

chd 5.12.3

hadoop 2.6.0

oozie 4.1.0

spark 2.3.0

spark1(集群自带的可以直接添加jar)做任务调度

spark2 需要进行配置后(添加spark2 所需要的jar到oozie 的sharelib中)才能进行调度

详细步骤可以参考:

https://blog.csdn.net/worldchinalee/article/details/80594593

特别注意事项1:

a、spark程序jar包路径,因为jar是传到HDFS上面的,cluster方式提交的时候,jar name栏目需要写jar包再hdfs上面的全路径,

见上图方框中内容。

b、需改集群oozie配置项   Spark on Yarn 服务   改为 none  ,默认是 yarn。不然运行时候会报错。

e.g   hdfs://nameserviceHa/user/hue/oozie/workspaces/hue-oozie-1505120868.97/spark-examples_2.11-2.3.0.cloudera2.jar

特别注意事项2:运行spark程序时候,oozie自动默认spark。需要通过配置参数  oozie.action.sharelib.for.spark  设置为spark2,指定运spark时候添加的jar包为spark2.。

小伙伴会疑问为社么直接通过通过shell小组件调用shell脚本?

shell组件的中的shell脚本不支持交互是查询。ssh远程命令不支持,如果spark环境不在oozie组件的主机上,此方法行不通。

*********************************下面重点介绍怎么通过ssh远程执行shell命令***********************************

使用oozie提供的小组件

现在有几个问题需要解决:

1、oozie调度时候使用的是oozie账户,当你执行ssh时候发现回报登陆错误。解决办法配置免密

2、怎么配置免密

su oozie 的时候报以下提示,因为chd生成的用户名问题,本人再这里花费了很多时间处理此问题。

This account is currently not available.

解决办法:使用sudo -u oozie 命令执行免密登陆设置

sudo -u oozie ssh-keygen

生成秘钥

cat /var/lib/oozie/.ssh/id_rsa.pub >>/root/.ssh/authorized_keys  (写入的是你需要免密的服务器,这里展示的是本机写法)

验证免密登录

到这里,你就可以随心所欲的书写脚本,并存到再服务器。通过oozie  ssh远程执行此脚本,以此达到调度的效果。

创建一个工作流定义:workflow.xml

下面这个简单的工作流定义了执行一个 Spark 作业的配置方法:

<workflow-app xmlns='uri:oozie:workflow:0.5' name='SparkWordCount'>

<start to='spark-node' />

<action name='spark-node'>

<spark xmlns="uri:oozie:spark-action:0.1">

<job-tracker>${jobTracker}</job-tracker>

<name-node>${nameNode}</name-node>

<prepare>

<delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data"/>

</prepare>

<master>${master}</master>

<name>Spark-Wordcount</name>

<class>com.ibm.biginsights.oozie.examples.WordCountSparkMain</class>

<jar>${nameNode}/user/${wf:user()}/${examplesRoot}/lib/examples-1.0.jar</jar>

<spark-opts>–conf spark.driver.extraJavaOptions=-Diop.version=4.1.0.0</spark-opts>

<arg>${nameNode}/user/${wf:user()}/${examplesRoot}/input-data</arg>

<arg>${nameNode}/user/${wf:user()}/${examplesRoot}/output-data</arg>

</spark>

<ok to="end" />

<error to="fail" />

</action>

<kill name="fail">

<message>Workflow failed, error

message[${wf:errorMessage(wf:lastErrorNode())}]

</message>

</kill>

<end name='end' />

</workflow-app>

一些元素定义如下:

有关 Oozie 中的 Spark XML 模式的详细信息,请参阅https://oozie.apache.org/docs/4.2.0/DG_SparkActionExtension.html

prepare

元素指定一个要在启动作业之前删除或创建的路径列表。这些路径必须以 hdfs://host_name:port_number 开头。

master

元素指定 Spark Master 的 URL;例如

spark://host:port、mesos://host:port、yarn-cluster、yarn-master 或 local。对于

Spark on YARN 模式,在 master 元素中指定的 yarn-client 或

yarn-cluster。在这个示例中,master=yarn-cluster。

name

元素指定 Spark 应用程序的名称。

class

元素指定 Spark 应用程序的主要类。

jar

元素指定一个逗号分隔的 JAR 文件列表。

spark-opts

元素(如果存在)包含一个可通过指定 ‘-conf key=value’ 传递给 Spark 驱动程序的 Spark 配置选项列表。

arg

元素包含可传递给 Spark 应用程序的参数。

创建一个 Oozie 作业配置:job.properties

nameNode=hdfs://nn:8020

jobTracker=rm:8050

master=yarn-cluster

queueName=default

examplesRoot=spark-example

oozie.use.system.libpath=true

oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}

创建一个 Oozie 应用程序目录

创建一个包含工作流定义和资源的应用程序目录结构,如下面的示例所示:

+-~/spark-example/

+-job.properties

+-workflow.xml

+-lib/

+-example-1.0.jar

example-1.0.jar 文件包含 Spark 应用程序。

下载 spark-assembly.jar 文件

浏览 HDFS NameNode 用户界面,下载 spark-assembly.jar 文件。

从 Ambari 控制台中选择 HDFS

,然后选择 Quick Links

–> NameNode UI

单击 Utilities

–> Browse the file system

在 Hadoop 文件资源管理器中,导航到 /iop/apps/4.1.0.0/spark/jars,选择 spark-assembly.jar,单击 Download

并保存该文件。

将下载的 spark-assembly.jar 文件转移到 lib 目录,这会得到以下目录结构:

+-~/spark-example/

+-job.properties

+-workflow.xml

+-lib/

+-example-1.0.jar

+-spark-assembly.jar

将应用程序复制到 HDFS

将 spark-example/ 目录复制到 HDFS 中的用户 HOME 目录。确保 HDFS 中的 spark-example 位置与 job.properties 中的 oozie.wf.application.path 值匹配。

$ hadoop fs -put spark-example spark-example

运行示例作业

运行以下命令来提交 Oozie 作业:

$cd ~/spark-example

$oozie job -oozie http://oozie-host:11000/oozie -config ./job.properties –run

job: 0000012-151103233206132-oozie-oozi-W

检查工作流作业状态:

$ oozie job –oozie http://oozie-host:11000/oozie -info 0000012-151103233206132-oozie-oozi-W

Job ID : 0000012-151103233206132-oozie-oozi-W

————————————————————————————————————————————

Workflow Name : SparkWordCount

App Path : hdfs://bdvs1211.svl.ibm.com:8020/user/root/spark-example

Status : SUCCEEDED

Run : 0

User : root

Group : –

Created : 2015-11-04 15:19 GMT

Started : 2015-11-04 15:19 GMT

Last Modified : 2015-11-04 15:23 GMT

Ended : 2015-11-04 15:23 GMT

CoordAction ID: –

Actions

————————————————————————————————————————————

ID Status Ext ID Ext Status Err Code

————————————————————————————————————————————

0000012-151103233206132-oozie-oozi-W@:start: OK – OK –

0000012-151103233206132-oozie-oozi-W@spark-node OK job_1446622088718_0022 SUCCEEDED –

0000012-151103233206132-oozie-oozi-W@end OK – OK –

————————————————————————————————————————————

完整的 Java 程序

public static void main(String[] args) {

if (args.length <2) {

System.err.println("Usage: WordCountSparkMain <file><file>")

System.exit(1)

}

String inputPath = args[0]

String outputPath = args[1]

SparkConf sparkConf = new SparkConf().setAppName("Word count")

try (JavaSparkContext ctx = new JavaSparkContext(sparkConf)) {

JavaRDD<String>lines = ctx.textFile(inputPath, 1)

JavaRDD<String>words = lines.flatMap(new FlatMapFunction<String,

String>() {

private static final long serialVersionUID = 1L

public Iterable<String>call(String sentence) {

List<String>result = new ArrayList<>()

if (sentence != null) {

String[] words = sentence.split(" ")

for (String word : words) {

if (word != null &&word.trim().length() >0) {

result.add(word.trim().toLowerCase())

}

}

}

return result

}

})

JavaPairRDD<String, Integer>pairs = words.mapToPair(new

PairFunction<String, String, Integer>() {

private static final long serialVersionUID = 1L

public Tuple2<String, Integer>call(String s) {

return new Tuple2<>(s, 1)

}

})

JavaPairRDD<String, Integer>counts = pairs.reduceByKey(new

Function2<Integer, Integer, Integer>() {

private static final long serialVersionUID = 1L

public Integer call(Integer a, Integer b) {

return a + b

}

}, 2)

JavaPairRDD<Integer, String>countsAfterSwap = counts.mapToPair(new

PairFunction<Tuple2<String, Integer>, Integer, String>() {

private static final long serialVersionUID = 2267107270683328434L

@Override

public Tuple2<Integer, String>call(Tuple2<String, Integer>t)

throws Exception {

return new Tuple2<>(t._2, t._1)

}

})

countsAfterSwap = countsAfterSwap.sortByKey(false)

counts = countsAfterSwap.mapToPair(new PairFunction<Tuple2<Integer,

String>, String, Integer>() {

private static final long serialVersionUID = 2267107270683328434L

@Override

public Tuple2<String, Integer>call(Tuple2<Integer, String>t)

throws Exception {

return new Tuple2<>(t._2, t._1)

}

})

JavaRDD<String>results = counts.map(new Function<Tuple2<String,

Integer>, String>() {

@Override

public String call(Tuple2<String, Integer>v1) throws Exception {

return String.format("%s,%s", v1._1, Integer.toString(v1._2))

}

})

results.saveAsTextFile(outputPath)

}

}

}


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/yw/11955132.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-19
下一篇 2023-05-19

发表评论

登录后才能评论

评论列表(0条)

保存