首先需要创建一个 Maven 项目,并在 pom.xml 中导入以下依赖坐标:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.1</version>
</dependency>
接下来,直接上代码
package com.example.demo;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;

import java.util.*;
import java.util.concurrent.ExecutionException;

/**
 * Minimal example of reading from a Kafka topic with the Java {@link KafkaConsumer}.
 *
 * <p>The broker address and topic name below are placeholders and must be replaced with
 * real values before the test is run.
 */
public class KafkaTest2 {

    /**
     * Connects to the broker, prints the partition metadata of the topic, polls once and
     * prints how many records that single poll returned.
     *
     * @throws ExecutionException   declared for compatibility with existing callers
     * @throws InterruptedException declared for compatibility with existing callers
     */
    @Test
    public void DemoTest() throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        // Broker host: see advertised.host.name in the files under kafka/config.
        properties.put("bootstrap.servers", "对应的主机ip:端口号");
        // Existing group ids can be listed from the Kafka directory with:
        //   bin/kafka-consumer-groups.sh --zookeeper <zookeeper-ip>:2181 --list
        properties.put("group.id", "test");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources guarantees the consumer (and its network connections) is closed.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties)) {
            // Manual assignment to partition 0; the alternative is group-managed subscription:
            // kafkaConsumer.subscribe(Arrays.asList("mt1"));
            kafkaConsumer.assign(Arrays.asList(new TopicPartition("对应的topic", 0)));

            // Print the partition metadata itself (not a Stream object reference).
            Map<String, List<PartitionInfo>> stringListMap = kafkaConsumer.listTopics();
            List<PartitionInfo> partitions = stringListMap.get("对应的topic");
            // partitions is null when the broker does not report the topic; println prints "null".
            System.out.println(partitions);

            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            // Number of records returned by this poll.
            // NOTE(review): a single short poll may legitimately return 0 records - confirm
            // against the broker before treating 0 as "topic is empty".
            System.out.println(records.count());

            // To print the content of each record:
            // for (ConsumerRecord<String, String> record : records) {
            //     System.out.printf("offset = %d, value = %s", record.offset(), record.value());
            //     System.out.println();
            // }

            // To keep consuming in real time, poll inside a loop:
            // while (true) {
            //     ConsumerRecords<String, String> batch = kafkaConsumer.poll(100);
            //     for (ConsumerRecord<String, String> record : batch) {
            //         System.out.printf("offset = %d, value = %s", record.offset(), record.value());
            //         System.out.println();
            //     }
            // }
        }
    }
}
另外,查询所有的topic可以在kafka目录下使用bin/kafka-topics.sh --zookeeper 对应的zookeeperip:2181 --list
也可以使用java代码:
// Lists every topic known to ZooKeeper together with its topic-level configuration.
// Expects kafka.admin.AdminUtils, kafka.utils.ZkUtils,
// org.apache.kafka.common.security.JaasUtils, scala.collection.JavaConverters,
// java.util.Map and java.util.Properties to be imported by the enclosing file.
// NOTE(review): the two 30000 arguments look like session/connection timeouts in ms - confirm.
ZkUtils zkUtils = ZkUtils.apply("对应的zookeeperip:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());
try {
    // Convert the Scala map returned by AdminUtils into a java.util.Map view.
    Map<String, Properties> topics =
            JavaConverters.mapAsJavaMapConverter(AdminUtils.fetchAllTopicConfigs(zkUtils)).asJava();
    for (Map.Entry<String, Properties> entry : topics.entrySet()) {
        String key = entry.getKey();
        Object value = entry.getValue();
        System.out.println(key + ":" + value);
    }
} finally {
    // Always release the ZooKeeper connection, even if fetching or printing fails.
    zkUtils.close();
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)