flink自定义指标发送到pushgateway写入prometheus

flink自定义指标发送到pushgateway写入prometheus,第1张

flink自定义指标发送到pushgateway写入prometheus

flink官方提供了写metrics的方式,但是相对来说有些不灵活,不符合我当前的要求,也没法自定义动态的label值,于是自定义了sink写入到pushgateway里。
代码如下:

  class MyPushGateWaySink(pushgatewayipport:String) extends RichSinkFunction[(String,String,String)] {

    var pushgateway:PushGateway = _
    var gauge:Gauge = Gauge.build
      .name("name")
      .help("help")
      .labelNames("label1","label2", "label3").register
    // open函数用于初始化富函数运行时的上下文等环境
    override def open(parameters: Configuration): Unit = {
      println("----------------------------初始化连接-------------------------")
      super.open(parameters)
      pushgateway= new PushGateway(pushgatewayipport)
    }
    override def invoke(value: (String,String,String), context: SinkFunction.Context): Unit = {
      val timestamp=System.currentTimeMillis()
      gauge.labels(value._1,value._2,value._3).set(timestamp)
      pushgateway.push(gauge,"name")
    }
    // 关闭时做清理工作
    override def close(): Unit = {
      println("-----------------------关闭连接-----------------------")
    }
  }
   kafkaStream.addSink(new MyPushGateWaySink(pushgatewayipport))

所需依赖如下:


    io.dropwizard.metrics
    metrics-core
    4.1.16



    io.prometheus
    simpleclient
    0.9.0



    io.prometheus
    simpleclient_httpserver
    0.9.0



    io.prometheus
    simpleclient_pushgateway
    0.9.0

但是运行一直报无法序列化问题,于是修改源码,添加序列化
因为使用的是Gauge类型,所以对该类添加序列化,如下图

同时父类也添加序列化

两个类的路径如下

再次运行,数据成功发送到pushgateway

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5683822.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存