hadoop 2.6.0
oozie 4.1.0
spark 2.3.0
spark1(集群自带的可以直接添加jar)做任务调度
spark2 需要进行配置后(添加spark2 所需要的jar到oozie 的sharelib中)才能进行调度
详细步骤可以参考:
https://blog.csdn.net/worldchinalee/article/details/80594593
特别注意事项1:
a、spark程序jar包路径,因为jar是传到HDFS上面的,cluster方式提交的时候,jar name栏目需要写jar包再hdfs上面的全路径,
见上图方框中内容。
b、需改集群oozie配置项 Spark on Yarn 服务 改为 none ,默认是 yarn。不然运行时候会报错。
e.g hdfs://nameserviceHa/user/hue/oozie/workspaces/hue-oozie-1505120868.97/spark-examples_2.11-2.3.0.cloudera2.jar
特别注意事项2:运行spark程序时候,oozie自动默认spark。需要通过配置参数 oozie.action.sharelib.for.spark 设置为spark2,指定运spark时候添加的jar包为spark2.。
小伙伴会疑问为社么直接通过通过shell小组件调用shell脚本?
shell组件的中的shell脚本不支持交互是查询。ssh远程命令不支持,如果spark环境不在oozie组件的主机上,此方法行不通。
*********************************下面重点介绍怎么通过ssh远程执行shell命令***********************************
使用oozie提供的小组件
现在有几个问题需要解决:
1、oozie调度时候使用的是oozie账户,当你执行ssh时候发现回报登陆错误。解决办法配置免密
2、怎么配置免密
su oozie 的时候报以下提示,因为chd生成的用户名问题,本人再这里花费了很多时间处理此问题。
This account is currently not available.
解决办法:使用sudo -u oozie 命令执行免密登陆设置
sudo -u oozie ssh-keygen
生成秘钥
cat /var/lib/oozie/.ssh/id_rsa.pub >>/root/.ssh/authorized_keys (写入的是你需要免密的服务器,这里展示的是本机写法)
验证免密登录
到这里,你就可以随心所欲的书写脚本,并存到再服务器。通过oozie ssh远程执行此脚本,以此达到调度的效果。
创建一个工作流定义:workflow.xml
下面这个简单的工作流定义了执行一个 Spark 作业的配置方法:
<workflow-app xmlns='uri:oozie:workflow:0.5' name='SparkWordCount'>
<start to='spark-node' />
<action name='spark-node'>
<spark xmlns="uri:oozie:spark-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data"/>
</prepare>
<master>${master}</master>
<name>Spark-Wordcount</name>
<class>com.ibm.biginsights.oozie.examples.WordCountSparkMain</class>
<jar>${nameNode}/user/${wf:user()}/${examplesRoot}/lib/examples-1.0.jar</jar>
<spark-opts>–conf spark.driver.extraJavaOptions=-Diop.version=4.1.0.0</spark-opts>
<arg>${nameNode}/user/${wf:user()}/${examplesRoot}/input-data</arg>
<arg>${nameNode}/user/${wf:user()}/${examplesRoot}/output-data</arg>
</spark>
<ok to="end" />
<error to="fail" />
</action>
<kill name="fail">
<message>Workflow failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]
</message>
</kill>
<end name='end' />
</workflow-app>
一些元素定义如下:
有关 Oozie 中的 Spark XML 模式的详细信息,请参阅https://oozie.apache.org/docs/4.2.0/DG_SparkActionExtension.html
。
prepare
元素指定一个要在启动作业之前删除或创建的路径列表。这些路径必须以 hdfs://host_name:port_number 开头。
master
元素指定 Spark Master 的 URL;例如
spark://host:port、mesos://host:port、yarn-cluster、yarn-master 或 local。对于
Spark on YARN 模式,在 master 元素中指定的 yarn-client 或
yarn-cluster。在这个示例中,master=yarn-cluster。
name
元素指定 Spark 应用程序的名称。
class
元素指定 Spark 应用程序的主要类。
jar
元素指定一个逗号分隔的 JAR 文件列表。
spark-opts
元素(如果存在)包含一个可通过指定 ‘-conf key=value’ 传递给 Spark 驱动程序的 Spark 配置选项列表。
arg
元素包含可传递给 Spark 应用程序的参数。
创建一个 Oozie 作业配置:job.properties
nameNode=hdfs://nn:8020
jobTracker=rm:8050
master=yarn-cluster
queueName=default
examplesRoot=spark-example
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}
创建一个 Oozie 应用程序目录
创建一个包含工作流定义和资源的应用程序目录结构,如下面的示例所示:
+-~/spark-example/
+-job.properties
+-workflow.xml
+-lib/
+-example-1.0.jar
example-1.0.jar 文件包含 Spark 应用程序。
下载 spark-assembly.jar 文件
浏览 HDFS NameNode 用户界面,下载 spark-assembly.jar 文件。
从 Ambari 控制台中选择 HDFS
,然后选择 Quick Links
–> NameNode UI
。
单击 Utilities
–> Browse the file system
。
在 Hadoop 文件资源管理器中,导航到 /iop/apps/4.1.0.0/spark/jars,选择 spark-assembly.jar,单击 Download
并保存该文件。
将下载的 spark-assembly.jar 文件转移到 lib 目录,这会得到以下目录结构:
+-~/spark-example/
+-job.properties
+-workflow.xml
+-lib/
+-example-1.0.jar
+-spark-assembly.jar
将应用程序复制到 HDFS
将 spark-example/ 目录复制到 HDFS 中的用户 HOME 目录。确保 HDFS 中的 spark-example 位置与 job.properties 中的 oozie.wf.application.path 值匹配。
$ hadoop fs -put spark-example spark-example
运行示例作业
运行以下命令来提交 Oozie 作业:
$cd ~/spark-example
$oozie job -oozie http://oozie-host:11000/oozie -config ./job.properties –run
job: 0000012-151103233206132-oozie-oozi-W
检查工作流作业状态:
$ oozie job –oozie http://oozie-host:11000/oozie -info 0000012-151103233206132-oozie-oozi-W
Job ID : 0000012-151103233206132-oozie-oozi-W
————————————————————————————————————————————
Workflow Name : SparkWordCount
App Path : hdfs://bdvs1211.svl.ibm.com:8020/user/root/spark-example
Status : SUCCEEDED
Run : 0
User : root
Group : –
Created : 2015-11-04 15:19 GMT
Started : 2015-11-04 15:19 GMT
Last Modified : 2015-11-04 15:23 GMT
Ended : 2015-11-04 15:23 GMT
CoordAction ID: –
Actions
————————————————————————————————————————————
ID Status Ext ID Ext Status Err Code
————————————————————————————————————————————
0000012-151103233206132-oozie-oozi-W@:start: OK – OK –
0000012-151103233206132-oozie-oozi-W@spark-node OK job_1446622088718_0022 SUCCEEDED –
0000012-151103233206132-oozie-oozi-W@end OK – OK –
————————————————————————————————————————————
完整的 Java 程序
public static void main(String[] args) {
if (args.length <2) {
System.err.println("Usage: WordCountSparkMain <file><file>")
System.exit(1)
}
String inputPath = args[0]
String outputPath = args[1]
SparkConf sparkConf = new SparkConf().setAppName("Word count")
try (JavaSparkContext ctx = new JavaSparkContext(sparkConf)) {
JavaRDD<String>lines = ctx.textFile(inputPath, 1)
JavaRDD<String>words = lines.flatMap(new FlatMapFunction<String,
String>() {
private static final long serialVersionUID = 1L
public Iterable<String>call(String sentence) {
List<String>result = new ArrayList<>()
if (sentence != null) {
String[] words = sentence.split(" ")
for (String word : words) {
if (word != null &&word.trim().length() >0) {
result.add(word.trim().toLowerCase())
}
}
}
return result
}
})
JavaPairRDD<String, Integer>pairs = words.mapToPair(new
PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L
public Tuple2<String, Integer>call(String s) {
return new Tuple2<>(s, 1)
}
})
JavaPairRDD<String, Integer>counts = pairs.reduceByKey(new
Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L
public Integer call(Integer a, Integer b) {
return a + b
}
}, 2)
JavaPairRDD<Integer, String>countsAfterSwap = counts.mapToPair(new
PairFunction<Tuple2<String, Integer>, Integer, String>() {
private static final long serialVersionUID = 2267107270683328434L
@Override
public Tuple2<Integer, String>call(Tuple2<String, Integer>t)
throws Exception {
return new Tuple2<>(t._2, t._1)
}
})
countsAfterSwap = countsAfterSwap.sortByKey(false)
counts = countsAfterSwap.mapToPair(new PairFunction<Tuple2<Integer,
String>, String, Integer>() {
private static final long serialVersionUID = 2267107270683328434L
@Override
public Tuple2<String, Integer>call(Tuple2<Integer, String>t)
throws Exception {
return new Tuple2<>(t._2, t._1)
}
})
JavaRDD<String>results = counts.map(new Function<Tuple2<String,
Integer>, String>() {
@Override
public String call(Tuple2<String, Integer>v1) throws Exception {
return String.format("%s,%s", v1._1, Integer.toString(v1._2))
}
})
results.saveAsTextFile(outputPath)
}
}
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)