多线程运行项目时,统计数据量----AtomicLong

多线程运行项目时,统计数据量----AtomicLong,第1张

多线程运行项目时,统计数据量----AtomicLong 多线程运行,记录流动数据量;

本项目属于kafka消费者,每天需要消费的数据多则达到上亿级别,所以开设的多线程去消费数据,目前的需求是需要记录kafka每个主题每天消费的数据量;
因为kafka每个主题(topic)下面会有多个分区(pratition),多个消费者(一个消费者相当于一个线程)同时去消费这些分区,所以在统计数据量的时候,需要整合分区统计出每个主题总消费数据量。
又因为是多个主题(topic)每个主题下多个(分区),消费者也是多个,所以:
代码如下:

//定义多主题多线程poll数据Map(全局变量)
public static Map atomicLongMap = new HashMap<>();

在kafka消费者监听方法中:

//每个主题定义一个多线程数据量统计Map(如果消费的数据所属的topic已经创建了AtomicLong,则记录++,如果所属的topic还没有AtomicLong,则创建)
if (atomicLongMap.isEmpty() || atomicLongMap.get(topic) == null){
    atomicLongMap.put(topic,new AtomicLong(0));
}
//多线程统计数据量(根据topic统计每个topic的数据量)
Long data_amt = atomicLongMap.get(record.topic()).getAndIncrement();

data_amt 即为多线程每个主题消费到的数据总量

AtomicLong atomicLong = new AtomicLong();
Long count = atomicLong.getAndIncrement();
(.getAndIncrement() 该方法即多线程统计数据量)

----END----

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4827315.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-10
下一篇 2022-11-10

发表评论

登录后才能评论

评论列表(0条)

保存