Broker丢失消息是由于Kafka本身的原因造成的,kafka为了得到更高的性能和吞吐量,将数据异步批量的存储在磁盘中。消息的刷盘过程,为了提高性能,减少刷盘次数,kafka采用了批量刷盘的做法。即,按照一定的消息量,和时间间隔进行刷盘。这种机制也是由于linux *** 作系统决定的。将数据存储到linux *** 作系统种,会先存储到页缓存(Page cache)中,按照时间或者其他条件进行刷盘(从page cache到file),或者通过fsync命令强制刷盘。数据在page cache中时,如果系统挂掉,数据会丢失。
Broker在linux服务器上高速读写以及同步到Replica
上图简述了broker写数据以及同步的一个过程。broker写数据只写到PageCache中,而pageCache位于内存。这部分数据在断电后是会丢失的。pageCache的数据通过linux的flusher程序进行刷盘。刷盘触发条件有三:
Broker配置刷盘机制,是通过调用fsync函数接管了刷盘动作。从单个Broker来看,pageCache的数据会丢失。
Kafka没有提供同步刷盘的方式。同步刷盘在RocketMQ中有实现,实现原理是将异步刷盘的流程进行阻塞,等待响应,类似ajax的callback或者是java的future。下面是一段rocketmq的源码。
也就是说,理论上,要完全让kafka保证单个broker不丢失消息是做不到的,只能通过调整刷盘机制的参数缓解该情况。比如,减少刷盘间隔,减少刷盘数据量大小。时间越短,性能越差,可靠性越好(尽可能可靠)。这是一个选择题。
为了解决该问题,kafka通过producer和broker协同处理单个broker丢失参数的情况。一旦producer发现broker消息丢失,即可自动进行retry。除非retry次数超过阀值(可配置),消息才会丢失。此时需要生产者客户端手动处理该情况。那么producer是如何检测到数据丢失的呢?是通过ack机制,类似于http的三次握手的方式。
以上的引用是kafka官方对于参数 acks 的解释(在老版本中,该参数是 request.required.acks )。
上面第三点提到了ISR的列表的follower,需要配合另一个参数才能更好的保证ack的有效性。ISR是Broker维护的一个“可靠的follower列表”,in-sync Replica列表,broker的配置包含一个参数: min.insync.replicas 。该参数表示ISR中最少的副本数。如果不设置该值,ISR中的follower列表可能为空。此时相当于acks=1。
如上图中:
性能依次递减,可靠性依次升高。
Producer丢失消息,发生在生产者客户端。
为了提升效率,减少IO,producer在发送数据时可以将多个请求进行合并后发送。被合并的请求咋发送一线缓存在本地buffer中。缓存的方式和前文提到的刷盘类似,producer可以将请求打包成“块”或者按照时间间隔,将buffer中的数据发出。通过buffer我们可以将生产者改造为异步的方式,而这可以提升我们的发送效率。
但是,buffer中的数据就是危险的。在正常情况下,客户端的异步调用可以通过callback来处理消息发送失败或者超时的情况,但是,一旦producer被非法的停止了,那么buffer中的数据将丢失,broker将无法收到该部分数据。又或者,当Producer客户端内存不够时,如果采取的策略是丢弃消息(另一种策略是block阻塞),消息也会被丢失。抑或,消息产生(异步产生)过快,导致挂起线程过多,内存不足,导致程序崩溃,消息丢失。
producer
根据上图,可以想到几个解决的思路:
Consumer消费消息有下面几个步骤:
Consumer的消费方式主要分为两种:
上面的示例是自动提交的例子。如果此时,insertIntoDB(record)发生异常,消息将会出现丢失。接下来是手动提交的例子:
将提交类型改为手动以后,可以保证消息“至少被消费一次”(at least once)。但此时可能出现重复消费的情况,重复消费不属于本篇讨论范围。
上面两个例子,是直接使用Consumer的High level API,客户端对于offset等控制是透明的。也可以采用Low level API的方式,手动控制offset,也可以保证消息不丢,不过会更加复杂。
publicstaticvoidconsumer(){Propertiesprops=newProperties()props.put("zk.connect","hadoop-2:2181")props.put("zk.connectiontimeout.ms","1000000")props.put("groupid","fans_group")//CreatetheconnectiontotheclusterConsumerConfigconsumerConfig=newConsumerConfig(props)ConsumerConnectorconsumerConnector=Consumer.createJavaConsumerConnector(consumerConfig)Mapmap=newHashMap()map.put("fans",1)//create4partitionsofthestreamfortopic“test”,toallow4threadstoconsumeMap>>topicMessageStreams=consumerConnector.createMessageStreams(map)List>streams=topicMessageStreams.get("fans")//createlistof4threadstoconsumefromeachofthepartitionsExecutorServiceexecutor=Executors.newFixedThreadPool(1)longstartTime=System.currentTimeMillis()//consumethemessagesinthethreadsfor(finalKafkaStreamstream:streams){executor.submit(newRunnable(){publicvoidrun(){ConsumerIteratorit=stream.iterator()while(it.hasNext()){log.debug(byteBufferToString(it.next().message().payload()))}}})log.debug("usetime="+(System.currentTimeMillis()-startTime))}}欢迎分享,转载请注明来源:内存溢出
评论列表(0条)