azkaban.project
# azkaban.project — declares that this project uses the Flow 2.0 YAML syntax.
azkaban-flow-version: 2.0
one.flow
# one.flow — a single command job that prints a greeting.
nodes:
  - name: jobOne
    type: command
    config:
      command: echo "Hello world"
two.flow 依赖
# two.flow — dependency demo: jobC runs only after jobA and jobB both succeed.
nodes:
  - name: jobA
    type: command
    config:
      command: echo "jobA"
  - name: jobB
    type: command
    config:
      command: echo "jobB"
  - name: jobC
    type: command
    dependsOn:
      - jobA
      - jobB
    config:
      command: echo "jobC"
three.flow 依赖 启动程序
# three.flow — like two.flow, but jobB launches the cluster via job.sh.
nodes:
  - name: jobA
    type: command
    config:
      command: echo "jobA"
  - name: jobB
    type: command
    config:
      command: sh job.sh
  - name: jobC
    type: command
    dependsOn:
      - jobA
      - jobB
    config:
      command: echo "jobC"
job.sh
#!/bin/bash
# job.sh — restart the local big-data stack: Hadoop, Hive services,
# ZooKeeper, then HBase. Assumes all *.sh helpers are on $PATH.
echo "do job"
echo "azkaban job"
echo "azkaban shell sh"
stop-all.sh                         # stop any running Hadoop daemons
start-all.sh                        # start HDFS + YARN
hdfs dfsadmin -safemode leave       # force HDFS out of safe mode
nohup hive --service hiveserver2 &  # HiveServer2 in the background
nohup hive --service metastore &    # Hive metastore in the background
zkServer.sh start
zkServer.sh status
start-hbase.sh                      # HBase needs HDFS + ZooKeeper up first
echo "over"
four.flow 重试
# four.flow — retry demo: rerun a failing job up to 3 times, 5 s apart.
nodes:
  - name: joberror
    type: command
    config:
      command: sh job.sh
      retries: 3            # number of retries after a failure
      retry.backoff: 5000   # wait 5000 ms between attempts
five.flow 条件
# five.flow — conditional run: jobB fires only if jobA published wk == 2
# (jobA writes {"wk": N} into $JOB_OUTPUT_PROP_FILE; see jobA.sh).
nodes:
  - name: jobA
    type: command
    config:
      command: sh jobA.sh
  - name: jobB
    type: command
    dependsOn:
      - jobA
    condition: ${jobA:wk} == 2
    config:
      command: sh jobB.sh
jobA.sh
#!/bin/bash
# jobA.sh — publish the current weekday (0-6, Sunday = 0) to Azkaban so
# downstream jobs can use it in a condition (${jobA:wk}).
echo "do jobA.sh"
wk=$(date +%w)
# Azkaban parses $JOB_OUTPUT_PROP_FILE as JSON, so the key must be quoted.
# The original  echo "{"wk":$wk}"  let the inner quotes close the outer
# string and produced {wk:N} — invalid JSON. Escape the quotes instead.
echo "{\"wk\":$wk}" > "$JOB_OUTPUT_PROP_FILE"
jobB.sh
#!/bin/bash
# jobB.sh — shut the stack down in reverse order:
# HBase first, then ZooKeeper, finally Hadoop.
stop-hbase.sh
zkServer.sh stop
stop-all.sh
six.flow 预定义宏
# six.flow — predefined macro demo: startHbase runs only when every
# dependency succeeded (the built-in all_success condition).
nodes:
  - name: starthadoopJob
    type: command
    config:
      command: sh startHadoop.sh
  - name: startZkJob
    type: command
    config:
      command: sh startZK.sh
  - name: startHbase
    type: command
    dependsOn:
      - starthadoopJob
      - startZkJob
    condition: all_success   # predefined macro: all parents succeeded
    config:
      command: sh startHbase.sh
startHadoop.sh
#!/bin/bash
# startHadoop.sh — bring up HDFS/YARN and force HDFS out of safe mode.
start-all.sh
hdfs dfsadmin -safemode leave
startZK.sh 开启ZooKeeper
#!/bin/bash
# startZK.sh — start the local ZooKeeper server and report its status.
zkServer.sh start
zkServer.sh status
startHbase.sh
#!/bin/bash
# startHbase.sh — start HBase (expects HDFS and ZooKeeper to be running).
start-hbase.sh
seven.flow Hbase 操作
# seven.flow — feed a script of HBase shell commands to `hbase shell`.
nodes:
  - name: hbaseOpjob
    type: command
    config:
      command: hbase shell hbasedemo.sh
hbasedemo.sh
#!/bin/bash
# hbasedemo.sh — HBase shell commands, one per line; the file is executed
# by `hbase shell hbasedemo.sh` (see seven.flow), not directly by bash.
create_namespace 'aabbcc'
create 'aabbcc:tb1','student','teacher'
list_namespace
list_namespace_tables 'aabbcc'
exit
eight.flow
# eight.flow — run a HiveQL script in batch mode.
nodes:
  - name: hiveDemoJob
    type: command
    config:
      command: hive -f hivedemo.sql
hivedemo.sql
-- hivedemo.sql — create a demo database, load the raw CSV, then copy it
-- into an ORC-backed table.
create database if not exists exam20220121;
use exam20220121;

-- drop table if exists student;
create table if not exists student(id int, name string)
row format delimited fields terminated by ',';

-- LOAD DATA INPATH moves the file from HDFS into the table directory.
load data inpath '/opt/azkaban/student.txt' into table student;

-- drop table if exists student2;
create table if not exists student2 stored as orc as
select * from student;
student.txt
1,zs
2,ls
3,ww
ninejava.flow
# ninejava.flow — run a Java main class with Azkaban's javaprocess job type.
nodes:
  - name: javaJob
    type: javaprocess
    config:
      # java.class must contain only the fully-qualified class name; the
      # program arguments belong in main.args. In the scraped original the
      # two values were fused onto one line ("cn.kgc.TestJavaProcess bigdata
      # kb15"), which is not a valid java.class value. TestJavaProcess reads
      # args[0] and args[1], so the arguments must be passed via main.args.
      java.class: cn.kgc.TestJavaProcess
      main.args: bigdata kb15
package cn.kgc;

/**
 * Demo main class launched by Azkaban's javaprocess job type.
 * Prints a banner followed by the first two program arguments.
 */
public class TestJavaProcess {

    public static void main(String[] args) {
        System.out.println("azkaban java process");
        // args[0] and args[1] are supplied by the flow definition.
        System.out.println(args[0]);
        System.out.println(args[1]);
    }
}
tenspark.flow
# tenspark.flow — submit the Spark job jar bundled inside the project zip.
nodes:
  - name: javaJob
    type: command
    config:
      command: spark-submit --class cn.kgc.TestSparkProcess ./azkabandemo-1.0-SNAPSHOT.jar
package cn.kgc;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

/**
 * Demo Spark job: parallelizes five integers and prints each element.
 * Uses a local master so it runs inside an Azkaban command job.
 */
public class TestSparkProcess {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("azkabanSparkDemo")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        Integer[] ints = {1, 2, 3, 4, 5};
        // The scraped text read "JavaRDDrdd" and raw "List" — the <Integer>
        // type parameters were stripped (HTML-escaped away); restored here.
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(ints));
        List<Integer> collect = rdd.collect();
        for (Integer i : collect) {
            System.out.println(i);
        }
        sc.close();
    }
}
project 与 flow内容打包成zip, java程序还需要将jar包、project 与 flow内容一同打包成zip, 在azkaban网页创建主题,上传,执行 达到效果。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)