Prometheus normally collects monitoring data by scraping the metrics endpoints exposed by other components, evaluates alerting rules (thresholds) against those metrics, and then uses Alertmanager to notify the relevant operations staff whenever a threshold is breached (notifications can be delivered to WeChat, DingTalk, email, and so on).
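For comparison, this is what the pull model looks like from the application side: the service exposes a /metrics endpoint and Prometheus scrapes it on a schedule. The sketch below is a minimal, hypothetical example using prometheus_client; the port (8000), the metric name, and the random value are placeholders, not part of the original service.

# Minimal sketch of the pull model: the application exposes /metrics itself.
# Port 8000 and the metric name are arbitrary choices for illustration.
import random
import time

from prometheus_client import Gauge, start_http_server

# A gauge Prometheus can scrape from http://<host>:8000/metrics
queue_depth = Gauge('demo_queue_depth', 'Current depth of the work queue')

if __name__ == '__main__':
    start_http_server(8000)          # starts the /metrics HTTP endpoint
    while True:
        queue_depth.set(random.randint(0, 100))  # stand-in for a real measurement
        time.sleep(15)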
But what if a company wants Prometheus to monitor an application it developed itself, and that application does not expose a metrics endpoint? In that case the application can push custom metric data to a Pushgateway, and Prometheus will periodically scrape the gateway to pick the metrics up. This article demonstrates exactly that approach: a Python script monitors our own service and pushes custom metrics to the gateway.
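Distilled to its essentials, the push model with prometheus_client is: create a registry, register a gauge in it, set a value, and push the whole registry to the Pushgateway. The sketch below uses placeholder values for the gateway address, job name, and metric name; the full script later in this article follows the same pattern.

# Minimal sketch of the push model: the script pushes metrics to a Pushgateway.
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

registry = CollectorRegistry()
g = Gauge('demo_job_last_success_unixtime', 'Last time the demo job succeeded',
          registry=registry)
g.set_to_current_time()

# 'http://localhost:9091' and job='demo_job' are placeholders; Prometheus then
# scrapes the Pushgateway on its normal schedule.
push_to_gateway('http://localhost:9091', job='demo_job', registry=registry)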
import json
import socket
import time
from datetime import datetime

import requests
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

# Configuration
pushAddress = 'http://localhost:9091'
statusField = 'status'
# API that returns the canal task instances of the running Flink jobs
flinkJobUrl = 'http://localhost:8199/data/getCanalTaskInstance'
metrics_help = 'flink_job_running_status and kafka and canal'
job = 'flink_job'
name = 'flink_job_running_status_test1'

print("Local IP:", socket.gethostbyname(socket.gethostname()))

start0 = datetime.now()
# Fetch the label data from the API
jobInfos = []
try:
    response2 = requests.get(flinkJobUrl)
    flinkJobs = json.loads(response2.text)
    jobInfos = flinkJobs.get('data')
    print("Number of flink jobs collected:", len(jobInfos))
except Exception as e:
    print("Failed to fetch the canal task instances of the Flink jobs:", e)
print("Fetching data took (s):", (datetime.now() - start0).seconds)


# Entity class holding the label values of one job
class CustomerLabel:
    def __init__(self, jobId, jobName, destination, topic, sinkType,
                 datasourceName, dbName, creator, createTime, status):
        self.jobId = jobId
        self.jobName = jobName
        self.destination = destination
        self.topic = topic
        self.sinkType = sinkType
        self.datasourceName = datasourceName
        self.dbName = dbName
        self.creator = creator
        self.createTime = createTime
        self.status = status


# Build the metric samples and push them to the gateway
def push_metrics_gateway(url, job, metricsName, helps):
    registry = CollectorRegistry()
    labels = ['jobId', 'jobName', 'destination', 'topic', 'sinkType',
              'datasourceName', 'dbName', 'creator', 'createTime']
    labelObjs = []
    start1 = datetime.now()
    for data in jobInfos:
        label = CustomerLabel(data['job_id'], data['job_name'], data['destination'],
                              data['topic'], data['sink_type'], data['datasource_name'],
                              data['db_name'], data['creator'], data['create_time'],
                              data[statusField])
        labelObjs.append(label)
    gau = Gauge(metricsName, helps, labels, registry=registry)
    print("Building data took (s):", (datetime.now() - start1).seconds)

    start2 = datetime.now()
    for obj in labelObjs:
        # One time series per job; the gauge value is the job's status field
        gau.labels(obj.jobId, obj.jobName, obj.destination, obj.topic, obj.sinkType,
                   obj.datasourceName, obj.dbName, obj.creator,
                   obj.createTime).set(int(obj.status))
    push_to_gateway(url, job=job, registry=registry, timeout=20)
    print("Pushing data took (s):", (datetime.now() - start2).seconds)


# Push the data to the gateway
try:
    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " pushing metrics to the Prometheus Pushgateway")
    push_metrics_gateway(pushAddress, job, name, metrics_help)
    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " metrics pushed to the gateway successfully")
except Exception as e:
    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " : exception while pushing metrics to the gateway:", e)
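One practical note: the Pushgateway keeps the last pushed value of a group until it is overwritten or deleted, so it is worth checking what is actually stored there after a push. The sketch below is a small hypothetical helper (the gateway address and metric name mirror the placeholders used above) that fetches the gateway's own /metrics page and prints the lines for our metric; delete_from_gateway from prometheus_client removes the job group again if the script is ever decommissioned.

# Hypothetical helper: inspect, and optionally clean up, what the Pushgateway holds.
import requests
from prometheus_client import delete_from_gateway

pushAddress = 'http://localhost:9091'          # same placeholder gateway as above
metricName = 'flink_job_running_status_test1'  # same placeholder metric name

# The Pushgateway re-exposes everything it has received on its own /metrics page.
body = requests.get(pushAddress + '/metrics', timeout=10).text
for line in body.splitlines():
    if line.startswith(metricName):
        print(line)

# If the job is retired, delete the whole group so Prometheus stops seeing
# stale values for it.
# delete_from_gateway(pushAddress, job='flink_job')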
To sum up, the code above pulls the key data (stored in MySQL) through a data-service API, assembles it into the metric data structure that Prometheus expects, and pushes the result to the specified Pushgateway.