kafka consumer重新连接后如何获取当前最新数据

kafka consumer重新连接后如何获取当前最新数据,第1张

不过要注意一些注意事项,对于多个partition和多个consumer

1 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数

2 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀

最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目

3 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同

4 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化

5 High-level接口中获取不到数据的时候是会block的

简单版,

简单的坑,如果测试流程是,先produce一些数据,然后再用consumer读的话,记得加上第一句设置

因为初始的offset默认是非法的,然后这个设置的意思是,当offset非法时,如何修正offset,默认是largest,即最新,所以不加这个配置,你是读不到你之前produce的数据的,而且这个时候你再加上smallest配置也没用了,因为此时offset是合法的,不会再被修正了,需要手工或用工具改重置offset

Properties props = new Properties();

propsput("autooffsetreset", "smallest"); //必须要加,如果要读旧数据

propsput("zookeeperconnect", "localhost:2181");

propsput("groupid", "pv");

propsput("zookeepersessiontimeoutms", "400");

propsput("zookeepersynctimems", "200");

propsput("autocommitintervalms", "1000");

ConsumerConfig conf = new ConsumerConfig(props);

ConsumerConnector consumer = kafkaconsumerConsumercreateJavaConsumerConnector(conf);

String topic = "page_visits";

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

topicCountMapput(topic, new Integer(1));

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumercreateMessageStreams(topicCountMap);

List<KafkaStream<byte[], byte[]>> streams = consumerMapget(topic);

KafkaStream<byte[], byte[]> stream = streamsget(0);

ConsumerIterator<byte[], byte[]> it = streamiterator();

while (ithasNext()){

Systemoutprintln("message: " + new String(itnext()message()));

}

if (consumer != null) consumershutdown(); //其实执行不到,因为上面的hasNext会block

在用high-level的consumer时,两个给力的工具,

1 bin/kafka-run-classsh kafkatoolsConsumerOffsetChecker --group pv

可以看到当前group offset的状况,比如这里看pv的状况,3个partition

Group Topic Pid Offset logSize Lag Owner

pv page_visits 0 21 21 0 none

pv page_visits 1 19 19 0 none

pv page_visits 2 20 20 0 none

关键就是offset,logSize和Lag

这里以前读完了,所以offset=logSize,并且Lag=0

2 bin/kafka-run-classsh kafkatoolsUpdateOffsetsInZK earliest config/consumerproperties page_visits

3个参数,

[earliest | latest],表示将offset置到哪里

consumerproperties ,这里是配置文件的路径

topic,topic名,这里是page_visits

我们对上面的pv group执行完这个 *** 作后,再去check group offset状况,结果如下,

Group Topic Pid Offset logSize Lag Owner

pv page_visits 0 0 21 21 none

pv page_visits 1 0 19 19 none

pv page_visits 2 0 20 20 none

可以看到offset已经被清0,Lag=logSize

底下给出原文中多线程consumer的完整代码

import kafkaconsumerConsumerConfig;

import kafkaconsumerKafkaStream;

import kafkajavaapiconsumerConsumerConnector;

import javautilHashMap;

import javautilList;

import javautilMap;

import javautilProperties;

import javautilconcurrentExecutorService;

import javautilconcurrentExecutors;

public class ConsumerGroupExample {

private final ConsumerConnector consumer;

private final String topic;

private ExecutorService executor;

public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {

consumer = kafkaconsumerConsumercreateJavaConsumerConnector( // 创建Connector,注意下面对conf的配置

createConsumerConfig(a_zookeeper, a_groupId));

thistopic = a_topic;

}

public void shutdown() {

if (consumer != null) consumershutdown();

if (executor != null) executorshutdown();

}

public void run(int a_numThreads) { // 创建并发的consumers

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

topicCountMapput(topic, new Integer(a_numThreads)); // 描述读取哪个topic,需要几个线程读

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumercreateMessageStreams(topicCountMap); // 创建Streams

List<KafkaStream<byte[], byte[]>> streams = consumerMapget(topic); // 每个线程对应于一个KafkaStream

// now launch all the threads

//

executor = ExecutorsnewFixedThreadPool(a_numThreads);

// now create an object to consume the messages

//

int threadNumber = 0;

for (final KafkaStream stream : streams) {

executorsubmit(new ConsumerTest(stream, threadNumber)); // 启动consumer thread

threadNumber++;

}

}

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {

Properties props = new Properties();

propsput("zookeeperconnect", a_zookeeper);

propsput("groupid", a_groupId);

propsput("zookeepersessiontimeoutms", "400");

propsput("zookeepersynctimems", "200");

propsput("autocommitintervalms", "1000");

return new ConsumerConfig(props);

}

public static void main(String[] args) {

String zooKeeper = args[0];

String groupId = args[1];

String topic = args[2];

int threads = IntegerparseInt(args[3]);

ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);

examplerun(threads);

try {

Threadsleep(10000);

} catch (InterruptedException ie) {

}

exampleshutdown();

}

}

SimpleConsumer

另一种是SimpleConsumer,名字起的,以为是简单的接口,其实是low-level consumer,更复杂的接口

参考,>

查看Kafka某Topic的Partition详细信息时,使用如下哪个命令()

Abin/kafka-topicssh--create

Bbin/kafka-topicssh--delete

Cbin/kafka-topicssh--describe(正确答案)

Dbin/kafka-topicssh--1ist

输出

输出

注意如果你的kafka设置了zookeeper root,比如为/kafka,那么命令应该改为:

重启相关的应用程序,就可以从设置的offset开始读数据了。

手动更新Kafka存在Zookeeper中的偏移量。我们有时候需要手动将某个主题的偏移量设置成某个值,这时候我们就需要更新Zookeeper中的数据了。Kafka内置为我们提供了修改偏移量的类:kafkatoolsUpdateOffsetsInZK,我们可以通过它修改Zookeeper中某个主题的偏移量,具体 *** 作如下:

在不输入参数的情况下,我们可以得知kafkatoolsUpdateOffsetsInZK类需要输入的参数。我们的consumerproperties文件配置内容如下:

这个工具只能把Zookeeper中偏移量设置成earliest或者latest,如下:

以上就是关于kafka consumer重新连接后如何获取当前最新数据全部的内容,包括:kafka consumer重新连接后如何获取当前最新数据、Kafka 源码解析之 Topic 的新建/扩容/删除、查看Kafka某Topic的Partition详细信息时,使用如下哪个命令()等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/9511887.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-29
下一篇 2023-04-29

发表评论

登录后才能评论

评论列表(0条)

保存