Connecting to Kafka from Java to query a topic's consumption details


First, create a Maven project and add the Kafka dependency:


        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.2.1</version>
        </dependency>

Next, the code:

package com.example.demo;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;

import java.util.*;
import java.util.concurrent.ExecutionException;


public class KafkaTest2 {
    @Test
    public void DemoTest() throws ExecutionException, InterruptedException {

        Properties properties=new Properties();
        //broker address; check advertised.host.name in the config under kafka/config
        properties.put("bootstrap.servers","broker-ip:port");
        //list existing group ids with bin/kafka-consumer-groups.sh --zookeeper zookeeper-ip:2181 --list in the kafka directory
        properties.put("group.id", "test");
        
        properties.put("enable.auto.commit", "true");
        
        properties.put("auto.commit.interval.ms", "1000");
        
        properties.put("auto.offset.reset", "earliest");
        
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        kafkaConsumer.assign(Arrays.asList(new TopicPartition("your-topic", 0)));
        
//        kafkaConsumer.subscribe(Arrays.asList("mt1"));
        Map<String, List<PartitionInfo>> stringListMap = kafkaConsumer.listTopics();

        System.out.println(stringListMap.get("your-topic"));
        ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
        //number of records returned by this poll
        records.count();
//  print each record in the topic
//        for (ConsumerRecord<String, String> record : records) {
//            //Thread.sleep(1000);
//            System.out.printf("offset = %d, value = %s", record.offset(), record.value());
//            System.out.println();
//        }
//  to keep consuming in real time, use a while loop
//        while (true) {
//            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
//            for (ConsumerRecord<String, String> record : records) {
//                //Thread.sleep(1000);
//                System.out.printf("offset = %d, value = %s", record.offset(), record.value());
//                System.out.println();
//            }
//        }
    }

}
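The records.count() call above only reports how many messages came back from a single poll. To estimate how far a consumer group is behind, you would subtract the consumer's current position from each partition's end offset (obtained in a live program via consumer.endOffsets(...) and consumer.position(...)). A minimal sketch of that arithmetic, using a hypothetical LagCalculator helper and plain maps in place of live broker data:

```java
import java.util.HashMap;
import java.util.Map;

public class LagCalculator {
    // Hypothetical helper, not part of the Kafka API: per-partition lag is
    // endOffset - currentPosition. In a real program the two maps would be
    // filled from consumer.endOffsets(...) and consumer.position(...).
    public static Map<String, Long> lag(Map<String, Long> endOffsets,
                                        Map<String, Long> positions) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : endOffsets.entrySet()) {
            // a partition with no recorded position is treated as fully unread
            long pos = positions.getOrDefault(e.getKey(), 0L);
            result.put(e.getKey(), e.getValue() - pos);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> end = new HashMap<>();
        end.put("my-topic-0", 120L);
        Map<String, Long> pos = new HashMap<>();
        pos.put("my-topic-0", 100L);
        System.out.println(lag(end, pos)); // prints {my-topic-0=20}
    }
}
```

A lag of 0 means the group has caught up with the partition's latest message.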

Additionally, you can list all topics with bin/kafka-topics.sh --zookeeper zookeeper-ip:2181 --list in the kafka directory,
or from Java code:

        // requires: kafka.utils.ZkUtils, kafka.admin.AdminUtils,
        // org.apache.kafka.common.security.JaasUtils, scala.collection.JavaConverters
        ZkUtils zkUtils = ZkUtils.apply("zookeeper-ip:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());
        Map<String, Properties> topics = JavaConverters.mapAsJavaMapConverter(AdminUtils.fetchAllTopicConfigs(zkUtils))
                .asJava();
        for (Map.Entry<String, Properties> entry : topics.entrySet()) {
            String key = entry.getKey();
            Properties value = entry.getValue();
            System.out.println(key + ":" + value);
        }
        zkUtils.close();
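The topic list fetched this way also contains Kafka's internal topics (such as __consumer_offsets). If you only want user-created topics, a small filter like the following (a hypothetical helper, not part of Kafka) can clean up the names:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class TopicFilter {
    // Hypothetical helper: drop internal topics (names starting with "__",
    // e.g. __consumer_offsets) and return the remaining names sorted.
    public static List<String> userTopics(Set<String> allTopics) {
        return allTopics.stream()
                .filter(name -> !name.startsWith("__"))
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Set<String> names = new HashSet<>(Arrays.asList("orders", "__consumer_offsets", "logs"));
        System.out.println(userTopics(names)); // prints [logs, orders]
    }
}
```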

Original article: http://outofmemory.cn/zaji/4685805.html