Flink on YARN RESTful API: a simple backup practice

Business requirement:

Use YARN's RESTful API to monitor the running state of Flink jobs submitted to YARN and raise alerts when something is wrong.

Official documentation:

Apache Hadoop 3.2.2 – ResourceManager REST APIs.


YARN's RESTful API:

 

1. We mainly look at the Cluster Applications API.
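The Cluster Applications API is queried at /ws/v1/cluster/apps on the ResourceManager, and filters such as queue and states are passed as query parameters. The following is a minimal sketch of assembling such a URL using only the JDK. The class name YarnAppsUrlSketch, the method buildAppsUrl, and the host rm-host are hypothetical names used for illustration; they do not come from the original code.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Sketch (hypothetical helper): assemble a Cluster Applications API URL with optional filters. */
class YarnAppsUrlSketch {

    /** Appends each filter as an encoded key=value pair to <rmAddress>/ws/v1/cluster/apps. */
    static String buildAppsUrl(String rmAddress, Map<String, String> filters) {
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> filter : filters.entrySet()) {
            query.add(encode(filter.getKey()) + "=" + encode(filter.getValue()));
        }
        String url = rmAddress + "/ws/v1/cluster/apps";
        // Without filters the plain endpoint lists every application.
        return query.length() == 0 ? url : url + "?" + query;
    }

    private static String encode(String text) {
        try {
            return URLEncoder.encode(text, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> filters = new LinkedHashMap<>();
        filters.put("queue", "flink");     // only applications in the "flink" queue
        filters.put("states", "RUNNING");  // only applications in the RUNNING state
        // rm-host is a placeholder for the ResourceManager address.
        System.out.println(buildAppsUrl("http://rm-host:8088", filters));
    }
}
```

Filtering on the server side keeps the response small, which matters when the monitor polls every few seconds.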

 

2. Pick a cluster and run a quick test query.

A simple code demonstration:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.mzlion.easyokhttp.HttpClient;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.ListUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class GetYarnApps2 {
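//    private static volatile ConcurrentHashMap currentTaskMap = new ConcurrentHashMap();
    // Each entry is "name,id,state,job_id,job_status" for one application seen in the current poll.
    private static volatile List<String> failedTaskList = new ArrayList<>();
    private static volatile List<String> failedTaskList2 = new ArrayList<>();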


    public static void main(String[] args) throws InterruptedException {
//        String url = "http://dev-ct6-dc-master01:8088/ws/v1/cluster/apps";
//        String url = "http://dev-ct6-dc-master01:8088/ws/v1/cluster/apps?queue=users"; // restrict to one queue
        // Production environment
        String url = "http://prod-qd-ct6-cdh-master01:8088/ws/v1/cluster/apps?queue=flink"; // restrict to one queue
        String jobUrl = "http://prod-qd-ct6-cdh-master01:8088/proxy/application_1609329247342_52776/jobs";
        String prefix_url = "http://prod-qd-ct6-cdh-master01:8088/proxy/";

        String url2 = "http://dev-ct6-dc-master01:8088/ws/v1/cluster/apps/application_1326821518301_0005"; // a single application
        String url3 = "http://dev-ct6-dc-master01:8088/ws/v1/cluster/apps/application_1326821518301_0005/state"; // state of a single application
        String url4 = "http://dev-ct6-dc-master01:8088/ws/v1/cluster/metrics"; // metrics for the whole cluster
//        String url="http://dev-ct6-dc-master01:8088/cluster/apps/FINISHED";
//        String url="http://dev-ct6-dc-master01:7180/static/apidocs/";
        while (true) {
            failedTaskList.clear();
            failedTaskList2.clear();
            // Query on every poll so that the list reflects the applications currently in the queue.
            String responseData = HttpClient.get(url).asString();
            JSONObject apps = JSONObject.parseObject(responseData).getJSONObject("apps");
            // "apps" may be null when no application matches the filter.
            // YarnApp is a plain POJO exposing getId(), getName(), getQueue() and getState().
            List<YarnApp> appList = apps == null
                    ? new ArrayList<>()
                    : JSON.parseArray(apps.getString("app"), YarnApp.class);
            for (YarnApp yarnApp : appList) {
                String queue = yarnApp.getQueue();
                String id = yarnApp.getId();
                String state = yarnApp.getState();
                String name = yarnApp.getName();

//                System.out.println("state = " + state);
                System.out.println("YARN application name = " + name + ",___," + "state = " + state + ",___," + "id = " + id);

                // Flink's own REST API is reachable through the YARN proxy: /proxy/<applicationId>/jobs
                String jobs_url = prefix_url + id + "/jobs";
                String job_responseData = HttpClient.get(jobs_url).asString();
                JSONObject firstJob = JSONObject.parseObject(job_responseData).getJSONArray("jobs").getJSONObject(0);

                String job_id = firstJob.getString("id");
                String job_status = firstJob.getString("status");
                System.out.println("job_id = " + job_id + ",________,job_status = " + job_status);
                System.out.println();

                String everyTaskStr = name + "," + id + "," + state + "," + job_id + "," + job_status;
                failedTaskList.add(everyTaskStr);
                failedTaskList2.add(everyTaskStr);
            }

            System.out.println("failedTaskList = " + failedTaskList);
            System.out.println("failedTaskList2 = " + failedTaskList2);
            // Placeholder comparison: both lists hold the same entries here, so the difference is empty.
            // In practice the second list would be the expected tasks or the previous snapshot.
            List list = ListUtils.removeAll(failedTaskList, failedTaskList2);
            if (list.size() > 0) {
                System.out.println("11111111");
            } else {
                System.out.println(22222222);
            }
            TimeUnit.SECONDS.sleep(30);
        }
    }

    // TODO: compare in a loop whether the expected tasks currently exist.
    public static Boolean RunningTaskList(){
        // TODO: the tasks we have submitted could be loaded from MySQL or Redis here.
        // TODO: this example would read them from Redis; here they are hard-coded locally.
//        private static volatile List failedTaskList =  Collections.synchronizedList(new ArrayList());
        List<String> list1 = Arrays.asList("Zeppelin Flink on yarn");
//        List list2= Arrays.asList("Zeppelin Flink on yarn");
        boolean equalCollection = CollectionUtils.isEqualCollection(list1, failedTaskList);

        return equalCollection;
    }
 
}
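The listing above deserializes each entry of the "app" array into a YarnApp object, but that class is not part of the article. A minimal sketch of what it might look like follows. The field selection is an assumption: id, name, queue, state and finalStatus are fields of the Cluster Applications API response, and only the getters used above are strictly needed. When used with fastjson it would normally be declared public in its own file.

```java
/**
 * Hypothetical POJO for one entry of the "app" array returned by
 * /ws/v1/cluster/apps. Only a few of the response fields are mapped.
 */
class YarnApp {
    private String id;          // e.g. application_1609329247342_52776
    private String name;        // application name shown in the YARN UI
    private String queue;       // queue the application was submitted to
    private String state;       // YARN application state, e.g. RUNNING
    private String finalStatus; // final status reported by the application

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getQueue() { return queue; }
    public void setQueue(String queue) { this.queue = queue; }

    public String getState() { return state; }
    public void setState(String state) { this.state = state; }

    public String getFinalStatus() { return finalStatus; }
    public void setFinalStatus(String finalStatus) { this.finalStatus = finalStatus; }

    @Override
    public String toString() {
        return name + "," + id + "," + state;
    }
}
```

Response fields that have no matching setter are simply not mapped, so the class can stay this small.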

     

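3. To monitor a Flink on YARN job, we need both the applicationId and the JobId.

The JobId can also be obtained through the applicationId.

 

However, after a Flink job starts normally, both the applicationId and the jobId appear in the printed logs, so we can save them to a database; I covered this in an earlier article.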
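Pulling the two IDs out of the submission log can be sketched with regular expressions. This is only an illustration under stated assumptions: a YARN application ID has the form application_<digits>_<digits>, a Flink job ID is 32 lowercase hexadecimal characters, and the sample log lines in main are made up for the demonstration. The class FlinkIdExtractorSketch is hypothetical.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sketch (hypothetical helper): find the applicationId and jobId in submission log text. */
class FlinkIdExtractorSketch {
    // Assumed shape of a YARN application ID: application_<clusterTimestamp>_<sequence>
    private static final Pattern APPLICATION_ID = Pattern.compile("application_\\d+_\\d+");
    // Assumed shape of a Flink job ID: exactly 32 lowercase hex characters
    private static final Pattern JOB_ID = Pattern.compile("\\b[0-9a-f]{32}\\b");

    static Optional<String> findApplicationId(String logText) {
        return firstMatch(APPLICATION_ID, logText);
    }

    static Optional<String> findJobId(String logText) {
        return firstMatch(JOB_ID, logText);
    }

    /** Returns the first substring matching the pattern, or empty if there is none. */
    private static Optional<String> firstMatch(Pattern pattern, String text) {
        Matcher matcher = pattern.matcher(text);
        return matcher.find() ? Optional.of(matcher.group()) : Optional.empty();
    }

    public static void main(String[] args) {
        // Illustrative log text; the exact wording of real log lines may differ.
        String log = "Submitted application application_1641381557441_0109\n"
                + "Job has been submitted with JobID 1b78e99add472bf0db9e011a363ebd43";
        System.out.println("applicationId = " + findApplicationId(log).orElse("not found"));
        System.out.println("jobId = " + findJobId(log).orElse("not found"));
    }
}
```

Any other 32-character hex string in the log, such as a hash, would also match the job ID pattern, so it is safer to apply it only to the line that announces the job submission.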

 

4. Monitor Flink jobs through the RESTful API; I only show a few simple, useful calls here.
 

Reference, official documentation:

Apache Flink 1.11 documentation: Metrics

1. Flink checkpoint metrics

Example:

Query the path of a Flink job's most recent checkpoint through the RESTful API (the job must be running).

import com.alibaba.fastjson.JSONArray;
import com.mzlion.easyokhttp.HttpClient;
import org.apache.commons.lang3.StringUtils;

public class YarnHttpUtils2 {

    private static String getCheckpointUrl(String master, String applicationId, String jobId) {
//        String url = "http://prod-qd-ct6-cdh-master01:8088/ws/v1/cluster/apps";
        String url = master + "/proxy/" + applicationId + "/jobs/" + jobId + "/metrics?get=lastCheckpointExternalPath";
//        String url = "http://ct7-cdh-master02:8088/proxy/"+applicationId+"/jobs/"+jobId+"/metrics?get=lastCheckpointExternalPath";
//        String url= master + "/proxy/" + applicationId+"/jobs";

        try {
            String responseData = HttpClient.get(url).asString();
            // The response is a JSON array; the first element carries the metric in its "value" field.
            JSONArray jsonArray = JSONArray.parseArray(responseData);
            String value = jsonArray.getJSONObject(0).getString("value");
            System.out.println("value = " + value);
            return value;
        } catch (Exception ex) {
            // Print the stack trace so a failed query is visible instead of silently ignored.
            ex.printStackTrace();
            System.out.println("Checkpoint path not found on this attempt...");
        }
        // An empty string tells the caller that nothing was found.
        return "";
    }

    public static void main(String[] args) throws InterruptedException {
        //        String url = "http://ct7-cdh-master02:8088/proxy/application_1641381557441_0067/jobs/fd3e4ba2d3cb7e8d0785905f2a9868c2/metrics?get=lastCheckpointExternalPath";
        String checkpointUrl = getCheckpointUrl("http://ct7-cdh-master02:8088",
                "application_1641381557441_0109",
                "1b78e99add472bf0db9e011a363ebd43");

        if (StringUtils.isEmpty(checkpointUrl) || !checkpointUrl.contains("checkpoints/flink-1.13.0/cdc/rocksDBStateBackend")) {
            checkpointUrl = getCheckpointUrl("http://ct7-cdh-master02:8088",
                    "application_1641381557441_0109",
                    "1b78e99add472bf0db9e011a363ebd43");
        }
        System.out.println("checkpointUrl = " + checkpointUrl);

    }


}

Output:

checkpointUrl = hdfs://ct7-cdh-master02:8020/checkpoints/flink-1.13.0/cdc/rocksDBStateBackend/6a9299c415b3f7acd3a3d5bfa13f6ac9/chk-361
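A returned path like the one above can be checked before it is stored or reused. The sketch below assumes the path ends in <job id>/chk-<number>, as it does in the output shown, and splits it into its parts. The class CheckpointPathSketch and its fields are hypothetical names introduced for illustration.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sketch (hypothetical helper): split a checkpoint path into root, job ID and checkpoint number. */
class CheckpointPathSketch {
    // Assumed layout: <checkpoint root>/<32-hex job id>/chk-<checkpoint number>
    private static final Pattern LAYOUT = Pattern.compile("^(.+)/([0-9a-f]{32})/chk-(\\d+)/?$");

    final String root;
    final String jobId;
    final long checkpointNumber;

    private CheckpointPathSketch(String root, String jobId, long checkpointNumber) {
        this.root = root;
        this.jobId = jobId;
        this.checkpointNumber = checkpointNumber;
    }

    /** Returns the parsed parts, or empty when the text does not follow the assumed layout. */
    static Optional<CheckpointPathSketch> parse(String path) {
        if (path == null) {
            return Optional.empty();
        }
        Matcher matcher = LAYOUT.matcher(path.trim());
        if (!matcher.matches()) {
            return Optional.empty();
        }
        return Optional.of(new CheckpointPathSketch(
                matcher.group(1), matcher.group(2), Long.parseLong(matcher.group(3))));
    }

    public static void main(String[] args) {
        String path = "hdfs://ct7-cdh-master02:8020/checkpoints/flink-1.13.0/cdc/rocksDBStateBackend/"
                + "6a9299c415b3f7acd3a3d5bfa13f6ac9/chk-361";
        parse(path).ifPresent(p ->
                System.out.println("jobId = " + p.jobId + ", checkpoint = " + p.checkpointNumber));
    }
}
```

Compared with a substring check against a hard-coded directory, matching the layout also rejects empty or placeholder values without tying the code to one checkpoint root.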
 

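The rest is mainly about reading the job's information, which I won't demonstrate here; it is simple if you follow the official documentation.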
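For alerting, the job status string read from the /jobs response has to be turned into a decision. The sketch below shows one possible mapping. The status names are values of Flink's job status; which of them should trigger an alert is purely an assumption made here for illustration, and the class JobStatusAlertSketch is hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

/** Sketch (hypothetical helper): decide whether a Flink job status should raise an alert. */
class JobStatusAlertSketch {
    // Assumption: these statuses mean the job is failing, stopped, or unstable.
    private static final Set<String> ALERT_STATUSES = new HashSet<>(Arrays.asList(
            "FAILING", "FAILED", "CANCELLING", "CANCELED", "RESTARTING", "SUSPENDED"));

    static boolean shouldAlert(String jobStatus) {
        // A missing status means the job could not be queried, which is worth an alert too.
        if (jobStatus == null || jobStatus.trim().isEmpty()) {
            return true;
        }
        return ALERT_STATUSES.contains(jobStatus.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        for (String status : new String[] {"RUNNING", "RESTARTING", "FAILED"}) {
            System.out.println(status + " -> alert: " + shouldAlert(status));
        }
    }
}
```

For a streaming job that should never end, FINISHED could reasonably be added to the alert set as well; that choice depends on the workload.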

 

Feel free to share; when reposting, please credit the source: 内存溢出

Original article: http://outofmemory.cn/zaji/5705377.html
