本项目属于kafka消费者,每天需要消费的数据多则达到上亿级别,所以开设的多线程去消费数据,目前的需求是需要记录kafka每个主题每天消费的数据量;
因为kafka每个主题(topic)下面会有多个分区(pratition),多个消费者(一个消费者相当于一个线程)同时去消费这些分区,所以在统计数据量的时候,需要整合分区统计出每个主题总消费数据量。
又因为是多个主题(topic)每个主题下多个(分区),消费者也是多个,所以:
代码如下:
//定义多主题多线程poll数据Map(全局变量) public static MapatomicLongMap = new HashMap<>();
在kafka消费者监听方法中:
//每个主题定义一个多线程数据量统计Map(如果消费的数据所属的topic已经创建了AtomicLong,则记录++,如果所属的topic还没有AtomicLong,则创建) if (atomicLongMap.isEmpty() || atomicLongMap.get(topic) == null){ atomicLongMap.put(topic,new AtomicLong(0)); }
//多线程统计数据量(根据topic统计每个topic的数据量) Long data_amt = atomicLongMap.get(record.topic()).getAndIncrement();
data_amt 即为多线程每个主题消费到的数据总量
AtomicLong atomicLong = new AtomicLong();
Long count = atomicLong.getAndIncrement();
(.getAndIncrement() 该方法即多线程统计数据量)
----END----
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)