通过 YARN 的 RESTful API,对提交到 YARN 上的 Flink 任务进行简单的运行状态预警监控。
官方文档地址:Apache Hadoop 3.2.2 – ResourceManager REST APIs.
yarn的restful api:
1. 我们主要关注其中的 Cluster Applications API
2. 找一个集群简单测试一下,查询集群上的应用:
简单代码演示:
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.mzlion.easyokhttp.HttpClient; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.ListUtils; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; public class GetYarnApps2 { // private static volatile ConcurrentHashMapcurrentTaskMap = new ConcurrentHashMap (); private static volatile List failedTaskList = new ArrayList (); private static volatile List failedTaskList2 = new ArrayList (); public static void main(String[] args) throws InterruptedException { // String url = "http://dev-ct6-dc-master01:8088/ws/v1/cluster/apps"; // String url = "http://dev-ct6-dc-master01:8088/ws/v1/cluster/apps?queue=users"; //todo 指定队列 //todo 生产环境 String url = "http://prod-qd-ct6-cdh-master01:8088/ws/v1/cluster/apps?queue=flink"; //todo 指定队列 String jobUrl= "http://prod-qd-ct6-cdh-master01:8088/proxy/application_1609329247342_52776/jobs"; String perfer_url = "http://prod-qd-ct6-cdh-master01:8088/proxy/"; String url2 = "http://dev-ct6-dc-master01:8088/ws/v1/cluster/apps/application_1326821518301_0005"; //todo 查看指定任务 String url3 = "http://dev-ct6-dc-master01:8088/ws/v1/cluster/apps/application_1326821518301_0005/state"; //todo 查看指定任务状态 String url4 = "http://dev-ct6-dc-master01:8088/ws/v1/cluster/metrics"; //todo 整个集群指标 // String url="http://dev-ct6-dc-master01:8088/cluster/apps/FINISHED"; // String url="http://dev-ct6-dc-master01:7180/static/apidocs/"; String responseData = HttpClient.get(url).asString(); JSonObject jsonObject = JSONObject.parseObject(responseData); String app = jsonObject.getJSonObject("apps").getString("app"); while (true) { failedTaskList.clear(); failedTaskList2.clear(); List appList = JSON.parseArray(app, YarnApp.class); for (int i = 0; i < appList.size(); i++) { String queue = appList.get(i).getQueue(); String id = appList.get(i).getId(); String state = appList.get(i).getState(); 
String name = appList.get(i).getName(); // System.out.println("任务状态state = " + state); System.out.println("yarn任务名称name = " + name+",___,"+"任务state = " + state+",___,"+"任务Id = " + id); String jobs_url = perfer_url+id+"/jobs"; String job_responseData = HttpClient.get(jobs_url).asString(); JSonObject job_json = JSONObject.parseObject(job_responseData); String job_id = job_json.getJSonArray("jobs").getJSonObject(0).getString("id"); String job_status = job_json.getJSonArray("jobs").getJSonObject(0).getString("status"); System.out.println("job_id = " + job_id+",________,job_status = "+job_status); System.out.println(); String erveryTastStr = name+","+id+","+state+","+job_id+","+job_status; failedTaskList.add(erveryTastStr); failedTaskList2.add(erveryTastStr); } System.out.println("failedTaskList = " + failedTaskList); System.out.println("failedTaskList2 = " + failedTaskList2); List list = ListUtils.removeAll(failedTaskList, failedTaskList2); if (list.size()>0){ System.out.println("11111111"); }else { System.out.println(22222222); } TimeUnit.SECONDS.sleep(30); } } //todo 循环对比目前的任务是否存在。 public static Boolean RunningTaskList(){ //todo 在这里可以获取mysql或者redis当前我们提交的任务。 //todo 这里的案例是从redis获取,或者本地写死 // private static volatile List failedTaskList = Collections.synchronizedList(new ArrayList ()); List list1 = Arrays.asList("Zeppelin Flink on yarn"); // List list2= Arrays.asList("Zeppelin Flink on yarn"); boolean equalCollection = CollectionUtils.isEqualCollection(list1, failedTaskList); return equalCollection; } }
3. 监控 Flink on YARN 任务时,我们需要拿到 applicationId 和 JobId。
通过 applicationId 也可以拿到 JobId(即上面代码中调用的 /proxy/{applicationId}/jobs 接口)。
不过,正常启动 Flink 任务后,打印的日志中就包含 applicationId 和 jobId,可以把它们保存到数据库中,这一点在我之前的文章里有介绍。
4. 通过 RESTful API 监控 Flink 任务,这里只展示几个简单实用的接口。
参考官方文档:
Apache Flink 1.11 documentation: Metrics
(1) Flink checkpoint 相关的 metric
案例:
通过 RESTful API 查询 Flink 任务最后一次 checkpoint 的存储路径(任务必须处于运行状态)
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.mzlion.easyokhttp.HttpClient; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.stream.Collectors; public class YarnHttpUtils2 { private static String getCheckpointUrl(String mater, String applicationId, String jobId) { // String url = "http://prod-qd-ct6-cdh-master01:8088/ws/v1/cluster/apps"; String url = mater + "/proxy/" + applicationId + "/jobs/" + jobId + "/metrics?get=lastCheckpointExternalPath"; // String url = "http://ct7-cdh-master02:8088/proxy/"+applicationId+"/jobs/"+jobId+"/metrics?get=lastCheckpointExternalPath"; // String url= mater + "/proxy/" + applicationId+"/jobs"; String value = ""; try { String responseData = HttpClient.get(url).asString(); JSonArray jsonArray = JSONArray.parseArray(responseData); value = jsonArray.getJSonObject(0).getString("value"); System.out.println("value = " + value); return value; } catch (Exception ex) { ex.getStackTrace(); System.out.println("第一次没查询到..."); } System.out.println("value = " + value); return ""; } public static void main(String[] args) throws InterruptedException { // String url = "http://ct7-cdh-master02:8088/proxy/application_1641381557441_0067/jobs/fd3e4ba2d3cb7e8d0785905f2a9868c2/metrics?get=lastCheckpointExternalPath"; String checkpointUrl = getCheckpointUrl("http://ct7-cdh-master02:8088", "application_1641381557441_0109", "1b78e99add472bf0db9e011a363ebd43"); if (StringUtils.isEmpty(checkpointUrl) || !checkpointUrl.contains("checkpoints/flink-1.13.0/cdc/rocksDBStateBackend")) { checkpointUrl = getCheckpointUrl("http://ct7-cdh-master02:8088", "application_1641381557441_0109", 
"1b78e99add472bf0db9e011a363ebd43"); } System.out.println("checkpointUrl = " + checkpointUrl); } }
输出结果:
checkpointUrl = hdfs://ct7-cdh-master02:8020/checkpoints/flink-1.13.0/cdc/rocksDBStateBackend/6a9299c415b3f7acd3a3d5bfa13f6ac9/chk-361
其余接口主要用于查看 job 的信息,用法很简单,这里就不一一演示了,参照官方文档操作即可。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)