- 前言
- 一、Kafka基础
- 1.消息队列
- 2、kafka特点
- 3、kafka存储结构
- 4、kafka集群架构
- 5.kafka基础 *** 作
- 6.kafka Api
- 二、Kafka原理
- 1.kafka储存原理
- 2.kafka写入数据
- 3.kafka读取数据
- 4.kafka数据清洗
- 5.kafka分区副本和数据同步
- 6.kafka生产者数据安全
- 7.kafka消费者数据精确
- 总结
前言
Kafka基础
一、Kafka基础
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
- 发布订阅:消息的发布者不会将消息直接发送给特定订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。
- Kafka 最新定义:Kafka 是一个开源的分布式事件流式平台(Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键人物应用。
消息队列(Message Queue),经常缩写为MQ,从字面上来理解,消息队列是一种用来存储消息的队列。
先进先出的数据结构;
将传输的数据放进队列中;
把数据放到消息队列叫做生产者(Producer);
从消息队列里边取数据叫做消费者(Consumer);
-
好处
系统解耦
异步处理
流量削峰
日志处理 -
消息传递模式
点对点- 数据只能被一个消费者使用,消费成功以后数据就会被删除,无法实现消费数据的共享
订阅发布模式 - 多个发布者将消息发送到队列,系统将这些消息传递给多个订阅者,类似于微信公众号
- 数据只能被一个消费者使用,消费成功以后数据就会被删除,无法实现消费数据的共享
- 高性能:对数据进行实时读写
- 高并发:分布式并行读写
- 高吞吐:使用分布式磁盘存储
- 高可靠:分布式主从架构
- 高安全性:数据安全保障机制
- 高灵活性:根据需求,随意添加生产者和消费者
Kafka在大数据中专门用于实现实时的数据缓冲存储,实现大数据实时流式计算
3、kafka存储结构- Broker:
- Kafka是一个分布式集群,多台机器构成,每台Kafka的节点就是一个Broker
- 类比为HDFS中从节点:DataNode,认为Zookeeper每个节点
Producer:生产者
- 负责将数据写入Kafka中,Kafka写入数据的客户端
- Kafka的每条数据格式:KV格式,其中V才是真正写入的数据,K决定数据写到队列中哪个位置
Consumer:消费者
-
负责从Kafka中消费数据,Kafka读取数据的客户端
-
消费数据:主要消费的数据是V,在实际项目中V的数据类型为字符串,往往是JSON字符串。
-
Consumer Group:Kafka中必须以消费者组的形式从Kafka中消费数据
-
消费者组(group id)到Kafka消费数据
- 任何一个消费者必须属于某一个消费者组
- 一个消费者组中可以有多个消费者:多个消费者共同并行消费数据,提高消费性能
- 消费者组中多个消费者消费的数据是不一样的
- 整个消费者组中所有消费者消费的数据加在一起是一份完整的数据
Topic:缓存队列,数据主题,用于区分不同的数据,对数据进行分类
- 类似于MySQL中会将数据划分到不同的表:不同的数据存储在不同的表中
- 一个Topic可以划分多个分区Partition,每个不同分区存储在不同的Kafka节点上
- 写入Topic的数据实现分布式存储
- 生产者写入一条KV结构数据,这条数据写入这个Topic的哪个分区由分区规则来决定
- 有多种分区规则:不同场景对应的分区规则不一样
Partition:数据分区,用于实现Topic的分布式存储,对Topic的数据进行划分
- 每个分区存储在不同的Kafka节点Broker上
- 例如上图中:Topic名称为T1,T1有三个分区:P0、P1、P2
- 写入Topic:根据一定的规则决定写入哪个具体的分区
Replication:数据副本,保证数据的安全性
- Kafka每一个分区都可以有多个副本,类似于HDFS的副本机制,一个块构建多个副本
- 注意:Kafka中一个分区的副本个数最多只能等于机器的个数,相同分区的副本不允许放在同一台机器
- Kafka将一个分区的多个副本,划分为两种角色:Leader副本和Follower副本
- Leader副本:负责对外提供读写,可以认为:Master 副本,生产者和消费者只对leader副本进行读写
- Follower副本:与Leader同步数据,如果leader故障,从follower新的leader副本对外提供读写
Kafka | 解释 | HDFS |
---|---|---|
Producer | 生产者,写入数据到Kafka的Topic | 写入数据客户端 |
Consumer | 消费者,消费Kafka的Topic的Partition数据 | 读取数据客户端 |
ConsumerGroup | 消费者组,消费Kafka的Topic | - |
Broker | Kafka节点 | NameNode + DataNode |
Topic | 逻辑数据分类的对象,类似于数据库或者表的概念,Topic是分布式的,一个Topic可以有多个分区 | 文件 |
Partition | 分区结构,物理概念,数据按照写入先后顺序写入分区,一个Topic有多个分区,每个分区有多个副本 | Block |
Replication | 副本机制,通过副本来保证分区数据安全,相同分区的副本不能再同一台机器 | 副本机制 |
-
架构角色**
- Kafka 集群:分布式主从架构,实现消息队列的构建
- Zookeeper 集群:辅助选举Controller、元数据存储
-
Kafka中的每个角色以及对应的功能
-
分布式主从架构,节点:Broker,进程:Kafka
-
主节点:Kafka Controller
- 一种特殊的Broker,从所有Broker中选举出来的
- 负责普通Broker的工作
- 负责管理所有从节点:Topic、分区和副本
- 每次启动集群,会从所有Broker中选举一个Controller,由Zookeeper实现
-
从节点:Kafka Broker
- 对外提供读写请求
- 如果Controller故障,会重新从Broker选举一个新的Controller
-
-
Zookeeper 的功能
- 辅助选举Controller节点
- 存储元数据,比如Brokers信息、topic名称、分区及副本等等
创建topic
/export/server/kafka/bin/kafka-topics.sh --create \
--topic test-topic \
--partitions 3 \
--replication-factor 2 \
--bootstrap-server node1:9092,node2:9092,node3:9092
- –create:创建
- –topic:指定名称
- –partitions :分区个数
- –replication-factor:分区的副本个数
- –bootstrap-server:指定Kafka服务端地址
- –list:列举
列举topic
/export/server/kafka/bin/kafka-topics.sh --list \
--bootstrap-server node1:9092,node2:9092,node3:9092
查看Topic信息
/export/server/kafka/bin/kafka-topics.sh --describe \
--topic test-topic \
--bootstrap-server node1:9092,node2:9092,node3:9092
删除Topic
/export/server/kafka/bin/kafka-topics.sh --delete \
--topic test-topic \
--bootstrap-server node1:9092,node2:9092,node3:9092
生产者与消费者模拟:
# 生产者
/export/server/kafka/bin/kafka-console-producer.sh \
--topic test-topic \
--broker-list node1:9092,node2:9092,node3:9092
# 消费者
/export/server/kafka/bin/kafka-console-consumer.sh \
--topic test-topic \
--bootstrap-server node1:9092,node2:9092,node3:9092 \
--from-beginning
- –from-beginning:从每个分区的最初开始消费,默认从最新的offset进行消费
- 如果不指定【–from-beginning】,默认从最新位置开始消费
生产者压力测试
/export/server/kafka/bin/kafka-producer-perf-test.sh \
--topic logs-data \
--num-records 1000000 \
--throughput -1 \
--record-size 1000 \
--producer-props \
bootstrap.servers=node1:9092,node2:9092,node3:9092 \
acks=1
- –num-records:写入数据的条数
- –throughput:是否做限制,-1表示不限制
- –record-size:每条数据的字节大小
消费者压力测试
/export/server/kafka/bin/kafka-consumer-perf-test.sh \
--topic logs-data \
--broker-list node1:9092,node2:9092,node3:9092 \
--fetch-size 1048576 \
--messages 1000000
6.kafka Api
引入依赖
<dependencies>
<!-- Kafka的依赖 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
</dependencies>
- 生产者
public class KafkaWriteTest {
public static void main(String[] args) {
// TODO: 1.构建KafkaProducer连接对象
// 1-1. 设置Producer属性
Properties props = new Properties();
// Kafka Brokers地址信息
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
// 写入数据时序列化和反序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 1-2. 传递配置,创建Producer实例
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// TODO: 2. 构建ProducerRecord记录实例
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test-topic", "hello world");
// TODO: 3. 调用send方法,发送数据至Topic
producer.send(record) ;
// TODO: 4. 关闭资源
producer.close();
}
}
- 消费者
public class KafkaReadTest {
public static void main(String[] args){
// TODO: 1. 构建KafkaConsumer连接对象
// 1-1. 构建Consumer配置信息
Properties props = new Properties();
// 指定服务端地址
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
// 指定当前消费者属于哪个组
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "gid-10001");
// 读取数据对KV进行反序列化
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 1-2. 构建消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// TODO: 2. 消费数据
// 2-1. 设置订阅topic
consumer.subscribe(Collections.singletonList("test-topic"));
// 2-2. 拉取数据
while (true) {
// 拉取订阅topic中数据,设置超时时间
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 循环遍历拉取的数据
for (ConsumerRecord<String, String> record : records) {
String topic = record.topic();
int part = record.partition();
String key = record.key();
String value = record.value();
//模拟处理:输出
System.out.println(topic + "\t" + part + "\t" + key + "\t" + value);
}
}
}
}
二、Kafka原理
1.kafka储存原理
-
kafka集群结构
-
Broker:物理存储节点,用于存储Kafka中Topic队列的每个分区数据
-
Producer:生产者,向Topic队列中写入数据
-
Consumer:消费者,从Topic队列中读取数据
-
Topic:缓冲队列,用于区分不同数据的的存储
-
Partition:Topic 对应子队列,一个topic可以设置为多个partition
- 每个分区存储在Broker节点
- 名称构成:Topic名称+partition分区编号
-
Segment:分区段,每个分区的数据存储在1个或者多个Segment中,每个Segment由一对文件组成
- .log:数据文件,以log日志形式存储
- .index:索引文件,方便快速查询检索
- .timeindex:时间索引文件
step1:生产者发送每一条数据,将数据放入一个batch批次中,如果batch满了或者达到一定的时间,提交写入请求
- 批次大小:batch.size
- 发送消息前等待时间:linger.ms
step2:生产者根据规则确定写入分区Partition,获取对应的元数据,将请求提交给leader副本所在的Broker
-
一个Topic的分区Partition会有多个副本,只向写leader副本
-
从元数据中获取当前这个分区对应的leader副本的位置,提交写入请求
-
Kafka 元数据存储在ZK中
step3:数据先写入Broker的PageCache 页缓存【 *** 作系统级别内存】
- Kafka使用内存机制来实现数据的快速的读写
- 选用 *** 作系统自带的缓存区域:PageCache
- 由 *** 作系统来管理所有内存,即使Kafka Broker故障,数据依旧存在PageCache中
step4: *** 作系统后台,自动将页缓存PageCache中的数据SYNC同步到磁盘文件中:最新Segment的中.log
- Kafka通过 *** 作系统内存刷新调用机制来实现:内存存储容量 + 时间
- 顺序写磁盘:不断将每一条数据追加到.log文件中
step5:其他的Follower 副本到Leader 副本中同步数据
- step1:消费者根据Topic、Partition、Offset提交给Kafka请求读取数据
- step2:Kafka根据元数据信息,找到对应分区Partition对应的Leader副本节点
- step3:请求Leader副本所在的Broker,先读PageCache,通过零拷贝机制【Zero Copy】读取PageCache
- 实现零磁盘读写
- 直接将内存数据发送到网络端口,实现传输
- step4:如果PageCache中没有,读取Segment文件段,先根据offset找到要读取的Segment
- 先根据offset和segment文件段名称定位这个offset在哪个segment文件段中
- step5:将.log文件对应的.index文件加载到内存中,根据.index中索引的信息找到Offset在.log文件中的最近位置
- step6:读取.log,根据索引读取对应Offset的数据
kafka用于消息队列,只对数据进行缓存,其中有自己的数据清理规格
#开启清理
log.cleaner.enable = true
#清理规则
log.cleanup.policy = delete | compact
-
delete
- 基于存活时间规则:
log.retention.ms
log.retention.minutes
log.retention.hours=168/7天 - 基于文件大小规则
删除文件阈值,如果整个数据文件大小,超过阈值的一个segment大小,将会删除最老的segment,-1表示不使用这种规则
log.retention.bytes = -1 - 基于offset消费规则
–offset-json-file
- 基于存活时间规则:
-
compact
也称为压缩,将重复的更新数据的老版本删除,保留新版本,要求每条数据必须要有Key,根据Key来判断是否重复
在kafka中为了保证数据的安全性,对每个分区进行副本机制,其中
- AR:所有副本
- ISR:可用副本
- OSR:不可用副本
- AR = ISR + OSR
其中可用副本和不可用副本的区分是通过副本直接数据的同步差异和同步超时时间来区分的。
不同副本之间的同步数据差异,导致数据的同步不同
-
HW:High Watermark的缩写,俗称高水位
标识一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。
HW = 当前这个分区所有副本同步的最低位置 + 1
消费者能消费到的最大位置 -
LEO:Log End Offset 缩写
标识当前日志文件中下一条待写入消息的offset
LEO = 当前每个副本已经写入数据的最新位置 + 1 -
LSO:LogStartOffset
标识分区的开始offset
推荐一个kafka集群监控工具Kafka Eagle
kafka生产者在生产数据时如何保证数据已经被接收到,这里利用一个ack检验机制,当kafka生产者发送数据后返回一个ack检验
ProducerConfig.ACKS_CONFIG
其有三个数值:
- 0:生产者发送一条数据写入Kafka, 不管Kafka有没有写入这条数据,都直接发送下一条【快,不安全,不用的】
- 1:中和性方案,生产者发送一条数据写入Kafka,Kafka将这条数据写入对应分区Leader副本,就返回一个ack,生产者收到ack,发送下一条【性能和安全性之间做了权衡】
- all/-1:生产者发送一条数据写入Kafka,Kafka将这条数据写入对应分区Leader副本,并且等待所有Follower同步成功,就返回一个ack,生产者收到ack,发送下一条【安全,慢】
同时,在生产者发送数据之后一直没有ack返回可通过重试机制进行数据重新发送
ProducerConfig.RETRIES_CONFIG
这里重试会有一个问题,是否会重复写入数据,在kafka中通过事务幂等性解决,即:在每条数据中增加一个数据id,下一条数据会比上一条数据id多1,Kafka会根据id进行判断是否写入过了。
- 如果没有写入:写入kafka
- 如果已经写入:直接返回ack
kafka生产者的数据往哪个分区进行写入,kafka提供了三种方式:
-
第1、如果发送数据指定partitionId,直接发送给分区
-
第2、如果没有指定,指定Key
默认情况下,依据key生成hash值,对topic分区数取模,确定发送分区 -
第3、如果没有指定key
2.4之前:轮询机制,循环分区发送消息数据- step1:先将数据放入一个批次中,判断是否达到条件,达到条件才将整个批次的数据写入kafka
- step2:根据数据属于哪个分区,就与分区构建一个连接,发送这个分区的批次的数据
2.4开始:粘性机制,先缓存中有没有分区连接,有的话直接使用发送数据,没有的随机分区连接,缓存其中,再发消息数据。
- 判断缓存中是否有这个topic的分区连接,如果有,直接使用;如果没有,随机写入一个分区,并且放入缓存
自定义分区实现Partitioner接口
public class RandomPartitioner implements Partitioner {
/**
* TODO: 依据数据信息,构建分区编号并返回
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取Topic中分区数目
Integer count = cluster.partitionCountForTopic(topic);
// 构建随机数
Random random = new Random() ;
// 随机生成一个分区编号
int partitionId = random.nextInt(count);
// 返回分区编号ID
return partitionId;
}
// 释放资源
@Override
public void close() {
}
// 获取配置
@Override
public void configure(Map<String, ?> configs) {
}
}
分区器的使用:
// 设置分区规则,轮询分区,根据数据,根据分区不断构建连接
// prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());
// 设置分区规则,粘性分区,判断缓存中是否有这个topic的分区连接,如果有,直接使用;如果没有,随机写入一个分区,并且放入缓存
// prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,StickyPartitionCache.class.getName());
// 自定义分区
prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RandomPartitioner.class.getName());
7.kafka消费者数据精确
kafka消费数据时有两条规范
规则一:一个分区的数据只能由一个消费组中的某一个消费者消费
规则二:一个消费者可以消费多个分区的数据
kafka提供了三种消费策略:
-
RangeAssignor:范围分配,默认的分配策略
org.apache.kafka.clients.consumer.RangeAssignor
-
RoundRobinAssignor:轮询分配,常见于Kafka2.0之前的版本
org.apache.kafka.clients.consumer.RoundRobinAssignor
-
StickyAssignor:黏性分配,Kafka2.0之后建议使用
org.apache.kafka.clients.consumer.StickyAssignor
-
范围分配策略
-
每个消费者消费一定范围的分区,尽量的实现将分区均分给不同的消费者,如果不能均分,优先将分区分配给编号小的消费者
1个topic7个分区,1个消费组3个消费者
c1: t0, t1, t2 c2: t3, t4 c3: t5, t6 -
优点:如果Topic的个数比较少,分配会相对比较均衡
-
缺点:如果Topic的个数比较多,而且不能均分,导致负载不均衡问题
-
应用:Topic个数少或者每个Topic都均分的场景
-
-
轮询分配策略
按照名称和分区编号自然升序排序1个topic7个分区,1个消费组3个消费者 c1: t0, t3, t6 c2: t1, t5 c3: t2, t5
- 优点:如果有多个消费者,消费的Topic都是一样的,实现将所有Topic的所有分区轮询分配给所有消费者,尽量的实现负载的均衡
- 缺点:遇到消费者订阅的Topic是不一致的,不同的消费者订阅不同Topic,只能基于订阅的消费者进行轮询分配,导致整体消费者负载不均衡的
- 应用:所有消费者都订阅共同的Topic,能实现让所有Topic的分区轮询分配所有的消费者
-
粘性分配策略: 随机性
1个topic7个分区,1个消费组3个消费者
c1: t0, t4, t5 c2: t1, t2 c3: t3, t6
当某个消费者挂掉,其消费分区平方其他消费者
c1: t0, t4, t5, t3 c2: t1, t2, t6- 黏性分配的规则:类似于轮询分配,尽量的将分区均衡的分配给消费者
- 黏性分配的特点
- 相对的保证的分配的均衡
- 如果某个消费者故障,尽量的避免网络传输
- 尽量保证原来的消费的分区不变,将多出来分区均衡给剩余的消费者
在消费者消费分区数据过程中可能出现两种问题,一个是数据的重复读,一种是数据的少读
kafka通过对offset最大偏移量的持久化解决问题,当消费者消费队列topic中数据时,需要记录每次消费偏移量,以便下一次继消费数据
- 第1、消费组CG第一次消费数据,依据属性:auto.offset.reset值,消费topic各个分区数据
- 默认属性值:latest,最大偏移量开始消费
- auto.offset.reset = latest | earliest | none
- 第2、不是第一次消费数据,每次消费偏移量存储在内存中
当再次消费数据时,依据内存中偏移量继续消费数据 - 第3、当程序宕机,重启以后,如果需要继续之前偏移量消费数据,需要保存每次消费偏移量数据
- a. 自动提交偏移量
Kafka默认机制,将消费偏移量数据存储到topic: __consumer_offsets
enable.auto.commit=true
auto.commit.interval.ms=5000- 问题:
出现数据被重复消费,数据丢失
- 问题:
- b. 手动提交topic偏移量
当消费topic中数据,并且处理完成后,手动提交偏移量信息
enable.auto.commit=false
kafkaConsumer.commitcommitSync()- 问题:
数据被重复消费,某些分区数据消费并处理成功,但是某个分区数据处理失败
- 问题:
- c. 手动提交Partition偏移量
每个分区数据消费并处理完成后,手动提交偏移量
获取每个分区数据,并且处理完成后,手动提交偏移量
enable.auto.commit=false
kafkaConsumer.commitSync(offsets)
- a. 自动提交偏移量
public class KafkaReadCommitPartitionOffsetTest {
public static void main(String[] args) {
// TODO: 1. 构建KafkaConsumer连接对象
Properties props = new Properties() ;
// 指定Kafka 服务集群地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
// 指定消费数据key和value类型,此处从topic文件中读数据,反序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 指定消费组ID
props.put(ConsumerConfig.GROUP_ID_CONFIG, "gid-1001") ;
// todo: 第一次消费起始位置
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") ;
// todo: 关闭自动提交偏移
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// 传递参数,创建连接
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props) ;
// TODO: 2. 指定topic名称,消费数据
// 设置消费topic名称,可以时多个队列
kafkaConsumer.subscribe(Collections.singletonList("test-topic"));
// 拉取数据,一直消费数据
while (true){
// 设置超时时间,可以理解为每隔多久拉取一次数据
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
// 第1步、获取拉取数据的分区信息
Set<TopicPartition> partitions = records.partitions();
// 第2步、获取每个分区数据,进行消费
for (TopicPartition partition: partitions){
// 分区数据
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
// 遍历消费
long consumerOffset = 0 ;
for (ConsumerRecord<String, String> record : partitionRecords) {
String topic = record.topic();
int part = record.partition();
long offset = record.offset();
String key = record.key();
String value = record.value();
//模拟处理:输出
System.out.println(topic + "\t" + part + "\t" + offset + "\t" + key + "\t" + value);
// 记录消费偏移量
consumerOffset = offset ;
}
// TODO: 每个分区数据处理完成,手动提交offset
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(partition, new OffsetAndMetadata(consumerOffset+1)) ;
kafkaConsumer.commitSync(offsets);
}
}
// TODO: 4. 关闭连接
//kafkaConsumer.close();
}
}
消费者消费Kafka队列数据,设置三种方式
- 第1种:订阅topic消费数据
- 第2种:设置topic和分区
- 第3中:依据topic、partition和offset拉取数据,指定具体偏移量
从Kafka队列Topic中消费数据
- 语义:精确一次性语义、至少一次性语义、最多一次性语义
可以将消费者的offset存储在外部数据库中,如:mysql、redis,重启之后从其中获取offset,通过依据topic、partition和offset拉取数据,继续拉取数据
Kafka基础内容。
时光如水,人生逆旅矣。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)