- 1. 生产者消息发送流程
- 1.1 发送原理
- 1.2 生产者重要参数列表
- 2. 生产者分区
- 2.1 分区的优点
- 2.2 生产者发送消息的分区策略
- 3. 生产者吞吐量与数据可靠性
- 3.1 吞吐量
- 3.2 数据可靠性
- 4. 生产者数据幂等性与事务
- 4.1 幂等性
- 4.2 事务
- 5. 生产者的数据有序与乱序
在消息发送的过程中,涉及到两个线程——main线程和sender线程。在main线程中创建了一个双端队列RecordAccumulator,main线程将消息发送给RecordAccumulator,sender线程不断从RecordAccumulator 中拉取消息发送到kafka broker中
1.2 生产者重要参数列表参数名称 | 描述 |
---|---|
bootstrap.servers | 生产者连接集群所需的broker地址清单。可以设置 1 个或者多个,中间用逗号隔开。 注意:这里并非需要所有的 broker 地址,因为生产者从给定的 broker里查找到其他 broker 信息 |
key.serializer 和 value.serializer | 指定发送消息的 key 和 value 的序列化类型。一定要写全类名 |
buffer.memory | RecordAccumulator 缓冲区总大小,默认 32m |
batch.size | 缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加 |
linger.ms | 如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间 |
acks | 0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据,Leader 收到数据后应答。 -1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和all 是等价的 |
max.in.flight.requests.per.connection | 允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字 |
retries | 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是 int 最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了 |
retry.backoff.ms | 两次重试之间的时间间隔,默认是 100ms |
enable.idempotence | 是否开启幂等性,默认 true,开启幂等性 |
compression.type | 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。 支持压缩类型:none、gzip、snappy、lz4 和 zstd |
- 便于合理的使用存储资源。每个partition在一个broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台broker上。合理控制分区的任务,可以实现负载均衡的效果。
- 提高并行度。生产者可以以分区为单位发送数据,消费者可以以分区为单位进行消费数据。
-
默认分区器DefaultPartitioner
(1)指明partition的情况下,直接将指明的值作为partition值;
(2)没有指明partition值,但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;
(3)既没有partition值,又没有key值的情况下,kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能使用该分区,等该分区的batch已满或已完成,kafka再随机一个分区进行使用(和上次的分区不同)
例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16K)或者linger.ms设置的时间到了,kafka再随机一个分区进行使用(如果还是0会继续随机) -
自定义分区器
研发人员可以根据企业需求,自己重新实现分区器
(1)定义类实现Partitioner接口
(2)重写partition()方法
/**
* 1. 实现接口 Partitioner
* 2. 实现 3 个方法:partition,close,configure
* 3. 编写 partition 方法,返回分区号
*/
public class MyPartitioner implements Partitioner {
/**
* 尚硅谷大数据技术之 Kafka
* —————————————————————————————
* 返回信息对应的分区
*
* @param topic 主题
* @param key 消息的 key
* @param keyBytes 消息的 key 序列化后的字节数组
* @param value 消息的 value
* @param valueBytes 消息的 value 序列化后的字节数组
* @param cluster 集群元数据可以查看分区信息
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取消息
String msgValue = value.toString();
// 创建 partition
int partition;
// 判断消息是否包含 atguigu
if (msgValue.contains("atguigu")) {
partition = 0;
} else {
partition = 1;
}
// 返回分区号
return partition;
}
// 关闭资源
@Override
public void close() {
}
// 配置方法
@Override
public void configure(Map<String, ?> configs) {
}
}
(3)在生产者的配置中,添加该分区器参数
3. 生产者吞吐量与数据可靠性 3.1 吞吐量(1)batch.size:批次大小,默认16K
(2)linger.ms:等待时间,修改为5~100ms。默认0ms
(3)compression.type:压缩snappy。默认none
(4)RecordAccumulator:缓冲区大小,修改为64M,默认32M
-
kafka如何处理某个follower故障,不能与leader进行数据同步的问题?
leader维护一个动态的in-sync replica set(ISR),意为和leader保持同步的folloer+leader集合(leader: 0,isr:0,1,2)
如果follower长时间没有向leader发送通信请求或者数据同步,则该follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s -
数据完全可靠的条件
ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2 -
可靠性总结
(1)acks = 0,生产者发送过来数据就不管了,可靠性差,效率高。生产环境中很少使用;
(2)acks = 1,生产者发送过来数据leader应答,可靠性中等,效率中等。生产环境中多用于传输普通日志,允许丢个别数量;
(3)ack = -1,生产者发送过来数据leader和ISR队列里面所有follower应答,可靠性高,效率低。生产环境一般用于传输和钱相关的数据,对可靠性要求比较高的场景。
(1)当ack = 0时:数据最多传递一次,可以保证数据不重复,但是不能保证数据不丢失;
(2)当ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2:数据至少传第一次,可以保证数据不丢失,但是不能保证数据不重复。
(3)对于非常重要的信息,例如钱,要求数据既不能重复也不能丢失。在kafka 0.11之后,引入了一项重大特性:幂等性和事务
- 幂等性就是指producer不论向Broker发送多少次重复数据,Broker都只会持久化一条数据,保证了不重复。
- 精确一次 = 幂等性 + 至少一次(ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2)
重复数据的判断标准:
具有有
所以幂等性只能保证的是在单分区单会话内不重复
- 幂等性开启参数 enable.idempotence 默认为 true,false 关闭
- 开启事务,就必须开启幂等性
- kafka事务一共有5个api
//1、初始化事务
void initTransactions();
//2、开启事务
void beginTransaction() throws ProducerFencedException;
//3、在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;
//4、提交事务
void commitTransaction() throws ProducerFencedException;
//5、放弃事务(类似于回滚事务的 *** 作)
void abortTransaction() throws ProducerFencedException;
5. 生产者的数据有序与乱序
- 单分区内有序
(1)在kafka 1.X版本之前保证单分区有序,条件如下
max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)
(2)在kafka 1.X及以后保证单分区有序,条件如下
未开启幂等性
max.in.flight.requests.per.connection需要设置为1
开启幂等性
max.in.flight.requests.per.connection需要设置小于等于5。
原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的
- 多分区内无序。如果要让多分区也有序,可以在消费者处添加中间件存储数据
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)