那么我们进行抽象,大致可以得到这两个类。
另外Kafka为了表现以下封装的特性,把准备生产者的参数配成了一个Properties类,
以这个类为KafkaProducer构造函数入参。
那么KafkaProducer的参数具体可以配置什么呢?
由123步可知,可以配置拦截器,序列化器,分区器。
这些都可以自己实现特定接口(ProducerInterceptor,Serializer,Partioner),
然后放到Properties里面,最后给KafkaProducer
拦截器就是对ProducerRecord做一些处理,然后返回处理过的新的ProducerRecord(自定义拦截策略)
序列化器是要讲java对象转成byte[]数组然后进行网络传输(自己定义序列化策略)
分区器就是为消息选择分区(这里自己可以设计分区策略)
再次回到这张图
可以看到,有两个线程在完成消息的发送,一个是主线程,一个是Sender线程。
主线程经过123步后,会将同一个partition的多个Record封装(压缩)到一个ProducerBatch对象中,
这样的目的是方便传输,提高效率,RecordAccumulator里面维持着一个双端ProducerBatch队列数组,
然后Sender线程从队头取ProducerBatch封装成Request,这里设计到一个逻辑到物理的转换。
分区是逻辑的,而broker才是物理的,一个区对应一个broker,所以要转换。
另外Sender线程里面维持了一个以Nodeid(就是对应broker)为Key,Deque<Request>为值的Map,
这里面的Request指的是那种没有Response的Request。一旦有了Response就会清理掉的。
这个是由通过leastLoadedNode节点实现的,不多说了。
其实除了123步中的参数,还有其它参数,这里就说一个
ack
acks=1。默认值即为1。生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。
acks=0。生产者发送消息之后不需要等待任何服务端的响应。
acks=-1或acks=all。生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。
首先明确说明Kafka不是数据库,它没有schema,也没有表,更没有索引。
1.它仅仅是生产消息流、消费消息流而已。从这个角度来说Kafka的确不像数据库,至少不像我们熟知的关系型数据库。
那么到底什么是数据库呢?或者说什么特性使得一个系统可以被称为数据库?经典的教科书是这么说的:数据库是提供 ACID 特性的,我们依次讨论下ACID。
1、持久性(durability)
我们先从最容易的持久性开始说起,因为持久性最容易理解。在80年代持久性指的是把数据写入到磁带中,这是一种很古老的存储设备,现在应该已经绝迹了。目前实现持久性更常见的做法是将数据写入到物理磁盘上,而这也只能实现单机的持久性。当演进到分布式系统时代后,持久性指的是将数据通过备份机制拷贝到多台机器的磁盘上。很多数据库厂商都有自己的分布式系统解决方案,如GreenPlum和Oracle RAC。它们都提供了这种多机备份的持久性。和它们类似,Apache Kafka天然也是支持这种持久性的,它提供的副本机制在实现原理上几乎和数据库厂商的方案是一样的。
2、原子性(atomicity)
数据库中的原子性和多线程领域内的原子性不是一回事。我们知道在Java中有AtomicInteger这样的类能够提供线程安全的整数 *** 作服务,这里的atomicity关心的是在多个线程并发的情况下如何保证正确性的问题。而在数据库领域,原子性关心的是如何应对错误或异常情况,特别是对于事务的处理。如果服务发生故障,之前提交的事务要保证已经持久化,而当前运行的事务要终止(abort),它执行的所有 *** 作都要回滚,最终的状态就好像该事务从未运行过那样。举个实际的例子,
第三个方法是采用基于日志结构的消息队列来实现,比如使用Kafka来做,如下图所示:
在这个架构中app仅仅是向Kafka写入消息,而下面的数据库、cache和index作为独立的consumer消费这个日志——Kafka分区的顺序性保证了app端更新 *** 作的顺序性。如果某个consumer消费速度慢于其他consumer也没关系,毕竟消息依然在Kafka中保存着。总而言之,有了Kafka所有的异质系统都能以相同的顺序应用app端的更新 *** 作,
3、隔离性(isolation)
在传统的关系型数据库中最强的隔离级别通常是指serializability,国内一般翻译成可串行化或串行化。表达的思想就是连接数据库的每个客户端在执行各自的事务时数据库会给它们一个假象:仿佛每个客户端的事务都顺序执行的,即执行完一个事务之后再开始执行下一个事务。其实数据库端同时会处理多个事务,但serializability保证了它们就像单独执行一样。举个例子,在一个论坛系统中,每个新用户都需要注册一个唯一的用户名。一个简单的app实现逻辑大概是这样的:
4、一致性(consistency)
最后说说一致性。按照Kelppmann大神的原话,这是一个很奇怪的属性:在所有ACID特性中,其他三项特性的确属于数据库层面需要实现或保证的,但只有一致性是由用户来保证的。严格来说,它不属于数据库的特性,而应该属于使用数据库的一种方式。坦率说第一次听到这句话时我本人还是有点震惊的,因为从没有往这个方面考虑过,但仔细想想还真是这么回事。比如刚才的注册用户名的例子中我们要求每个用户名是唯一的。这种一致性约束是由我们用户做出的,而不是数据库本身。数据库本身并不关心或并不知道用户名是否应该是唯一的。针对Kafka而言,这种一致性又意味着什么呢?Kelppmann没有具体展开,
希望能帮到你,谢谢!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)