消息可靠性的保证基本上我们都要从3个方面来阐述(这样才比较全面,无懈可击)
- 生产者发送消息丢失
- kafka自身消息丢失
- 消费者消息丢失
kafka支持3种方式发送消息,这也是常规的3种方式,发送后不管结果、同步发送、异步发送,基本上所有的消息队列都是这样玩的。
- 发送并忘记,直接调用发送send方法,不管结果,虽然可以开启自动重试,但是肯定会有消息丢失的可能
- 同步发送,同步发送返回Future对象,我们可以知道发送结果,然后进行处理
- 异步发送,发送消息,同时指定一个回调函数,根据结果进行相应的处理
为了保险起见,一般我们都会使用异步发送带有回调的方式进行发送消息,再设置参数为发送消息失败不停地重试。
acks=all,这个参数有可以配置0|1|all。
0表示生产者写入消息不管服务器的响应,可能消息还在网络缓冲区,服务器根本没有收到消息,当然会丢失消息。
1表示至少有一个副本收到消息才认为成功,一个副本那肯定就是集群的Leader副本了,但是如果刚好Leader副本所在的节点挂了,Follower没有同步这条消息,消息仍然丢失了。
配置all的话表示所有ISR都写入成功才算成功,那除非所有ISR里的副本全挂了,消息才会丢失。
retries=N,设置一个非常大的值,可以让生产者发送消息失败后不停重试
kafka自身消息丢失kafka因为消息写入是通过PageCache异步写入磁盘的,因此仍然存在丢失消息的可能。
因此针对kafka自身丢失的可能设置参数:
replication.factor=N,设置一个比较大的值,保证至少有2个或者以上的副本。
min.insync.replicas=N,代表消息如何才能被认为是写入成功,设置大于1的数,保证至少写入1个或者以上的副本才算写入消息成功。
unclean.leader.election.enable=false,这个设置意味着没有完全同步的分区副本不能成为Leader副本,如果是true的话,那些没有完全同步Leader的副本成为Leader之后,就会有消息丢失的风险。
消费者消息丢失消费者丢失的可能就比较简单,关闭自动提交位移即可,改为业务处理成功手动提交。
因为重平衡发生的时候,消费者会去读取上一次提交的偏移量,自动提交默认是每5秒一次,这会导致重复消费或者丢失消息。
enable.auto.commit=false,设置为手动提交。
还有一个参数我们可能也需要考虑进去的:
auto.offset.reset=earliest,这个参数代表没有偏移量可以提交或者broker上不存在偏移量的时候,消费者如何处理。earliest代表从分区的开始位置读取,可能会重复读取消息,但是不会丢失,消费方一般我们肯定要自己保证幂等,另外一种latest表示从分区末尾读取,那就会有概率丢失消息。
综合这几个参数设置,我们就能保证消息不会丢失,保证了可靠性。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)