不过要注意一些注意事项,对于多个partition和多个consumer
1 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数
2 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀
最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目
3 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
4 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化
5 High-level接口中获取不到数据的时候是会block的
简单版,
简单的坑,如果测试流程是,先produce一些数据,然后再用consumer读的话,记得加上第一句设置
因为初始的offset默认是非法的,然后这个设置的意思是,当offset非法时,如何修正offset,默认是largest,即最新,所以不加这个配置,你是读不到你之前produce的数据的,而且这个时候你再加上smallest配置也没用了,因为此时offset是合法的,不会再被修正了,需要手工或用工具改重置offset
Properties props = new Properties();
propsput("autooffsetreset", "smallest"); //必须要加,如果要读旧数据
propsput("zookeeperconnect", "localhost:2181");
propsput("groupid", "pv");
propsput("zookeepersessiontimeoutms", "400");
propsput("zookeepersynctimems", "200");
propsput("autocommitintervalms", "1000");
ConsumerConfig conf = new ConsumerConfig(props);
ConsumerConnector consumer = kafkaconsumerConsumercreateJavaConsumerConnector(conf);
String topic = "page_visits";
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMapput(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumercreateMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMapget(topic);
KafkaStream<byte[], byte[]> stream = streamsget(0);
ConsumerIterator<byte[], byte[]> it = streamiterator();
while (ithasNext()){
Systemoutprintln("message: " + new String(itnext()message()));
}
if (consumer != null) consumershutdown(); //其实执行不到,因为上面的hasNext会block
在用high-level的consumer时,两个给力的工具,
1 bin/kafka-run-classsh kafkatoolsConsumerOffsetChecker --group pv
可以看到当前group offset的状况,比如这里看pv的状况,3个partition
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 21 21 0 none
pv page_visits 1 19 19 0 none
pv page_visits 2 20 20 0 none
关键就是offset,logSize和Lag
这里以前读完了,所以offset=logSize,并且Lag=0
2 bin/kafka-run-classsh kafkatoolsUpdateOffsetsInZK earliest config/consumerproperties page_visits
3个参数,
[earliest | latest],表示将offset置到哪里
consumerproperties ,这里是配置文件的路径
topic,topic名,这里是page_visits
我们对上面的pv group执行完这个 *** 作后,再去check group offset状况,结果如下,
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 0 21 21 none
pv page_visits 1 0 19 19 none
pv page_visits 2 0 20 20 none
可以看到offset已经被清0,Lag=logSize
底下给出原文中多线程consumer的完整代码
import kafkaconsumerConsumerConfig;
import kafkaconsumerKafkaStream;
import kafkajavaapiconsumerConsumerConnector;
import javautilHashMap;
import javautilList;
import javautilMap;
import javautilProperties;
import javautilconcurrentExecutorService;
import javautilconcurrentExecutors;
public class ConsumerGroupExample {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
consumer = kafkaconsumerConsumercreateJavaConsumerConnector( // 创建Connector,注意下面对conf的配置
createConsumerConfig(a_zookeeper, a_groupId));
thistopic = a_topic;
}
public void shutdown() {
if (consumer != null) consumershutdown();
if (executor != null) executorshutdown();
}
public void run(int a_numThreads) { // 创建并发的consumers
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMapput(topic, new Integer(a_numThreads)); // 描述读取哪个topic,需要几个线程读
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumercreateMessageStreams(topicCountMap); // 创建Streams
List<KafkaStream<byte[], byte[]>> streams = consumerMapget(topic); // 每个线程对应于一个KafkaStream
// now launch all the threads
//
executor = ExecutorsnewFixedThreadPool(a_numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executorsubmit(new ConsumerTest(stream, threadNumber)); // 启动consumer thread
threadNumber++;
}
}
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
propsput("zookeeperconnect", a_zookeeper);
propsput("groupid", a_groupId);
propsput("zookeepersessiontimeoutms", "400");
propsput("zookeepersynctimems", "200");
propsput("autocommitintervalms", "1000");
return new ConsumerConfig(props);
}
public static void main(String[] args) {
String zooKeeper = args[0];
String groupId = args[1];
String topic = args[2];
int threads = IntegerparseInt(args[3]);
ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
examplerun(threads);
try {
Threadsleep(10000);
} catch (InterruptedException ie) {
}
exampleshutdown();
}
}
SimpleConsumer
另一种是SimpleConsumer,名字起的,以为是简单的接口,其实是low-level consumer,更复杂的接口
参考,>
查看Kafka某Topic的Partition详细信息时,使用如下哪个命令()
Abin/kafka-topicssh--create
Bbin/kafka-topicssh--delete
Cbin/kafka-topicssh--describe(正确答案)
Dbin/kafka-topicssh--1ist
输出
输出
注意如果你的kafka设置了zookeeper root,比如为/kafka,那么命令应该改为:
重启相关的应用程序,就可以从设置的offset开始读数据了。
手动更新Kafka存在Zookeeper中的偏移量。我们有时候需要手动将某个主题的偏移量设置成某个值,这时候我们就需要更新Zookeeper中的数据了。Kafka内置为我们提供了修改偏移量的类:kafkatoolsUpdateOffsetsInZK,我们可以通过它修改Zookeeper中某个主题的偏移量,具体 *** 作如下:
在不输入参数的情况下,我们可以得知kafkatoolsUpdateOffsetsInZK类需要输入的参数。我们的consumerproperties文件配置内容如下:
这个工具只能把Zookeeper中偏移量设置成earliest或者latest,如下:
以上就是关于kafka consumer重新连接后如何获取当前最新数据全部的内容,包括:kafka consumer重新连接后如何获取当前最新数据、Kafka 源码解析之 Topic 的新建/扩容/删除、查看Kafka某Topic的Partition详细信息时,使用如下哪个命令()等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)