flink官方提供了写metrics的方式,但是相对来说有些不灵活,不符合我当前的要求,也没法自定义动态的label值,于是自定义了sink写入到pushgateway里。
代码如下:
class MyPushGateWaySink(pushgatewayipport:String) extends RichSinkFunction[(String,String,String)] { var pushgateway:PushGateway = _ var gauge:Gauge = Gauge.build .name("name") .help("help") .labelNames("label1","label2", "label3").register // open函数用于初始化富函数运行时的上下文等环境 override def open(parameters: Configuration): Unit = { println("----------------------------初始化连接-------------------------") super.open(parameters) pushgateway= new PushGateway(pushgatewayipport) } override def invoke(value: (String,String,String), context: SinkFunction.Context): Unit = { val timestamp=System.currentTimeMillis() gauge.labels(value._1,value._2,value._3).set(timestamp) pushgateway.push(gauge,"name") } // 关闭时做清理工作 override def close(): Unit = { println("-----------------------关闭连接-----------------------") } }
kafkaStream.addSink(new MyPushGateWaySink(pushgatewayipport))
所需依赖如下:
io.dropwizard.metrics metrics-core4.1.16 io.prometheus simpleclient0.9.0 io.prometheus simpleclient_httpserver0.9.0 io.prometheus simpleclient_pushgateway0.9.0
但是运行一直报无法序列化问题,于是修改源码,添加序列化
因为使用的是Gauge类型,所以对该类添加序列化,如下图
同时父类也添加序列化
两个类的路径如下
再次运行,数据成功发送到pushgateway
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)