当consumer group的状态发生变化(如有consumer故障、增减consumer成员等)或consumer group消费的topic状态发生变化(如增加了partition,消费的topic发生变化),kafka集群会自动调整和重新分配consumer消费的partition,这个过程就叫做rebalance(再平衡)。
__consumer_offsets是kafka集群自己维护的一个特殊的topic,它里面存储的是每个consumer group已经消费了每个topic partition的offset。__consumer_offsets中offset消息的key由group id,topic name,partition id组成,格式为 {topic name}-${partition id},value值就是consumer提交的已消费的topic partition offset值。__consumer_offsets的分区数和副本数分别由offsetstopicnumpartitions(默认值为50)和offsetstopicreplicationfactor(默认值为1)参数配置。我们通过公式 hash(group id) % offsetstopicnumpartitions 就可以计算出指定consumer group的已提交offset存储的partition。由于consumer group提交的offset消息只有最后一条消息有意义,所以__consumer_offsets是一个compact topic,kafka集群会周期性的对__consumer_offsets执行compact *** 作,只保留最新的一次提交offset。
group coordinator运行在kafka某个broker上,负责consumer group内所有的consumer成员管理、所有的消费的topic的partition的消费关系分配、offset管理、触发rebalance等功能。group coordinator管理partition分配时,会指定consumer group内某个consumer作为group leader执行具体的partition分配任务。存储某个consumer group已提交offset的__consumer_offsets partition leader副本所在的broker就是该consumer group的协调器运行的broker。
跟大多数分布式系统一样,集群有一个master角色管理整个集群,协调集群中各个成员的行为。kafka集群中的controller就相当于其它分布式系统的master,用来负责集群topic的分区分配,分区leader选举以及维护集群的所有partition的ISR等集群协调功能。集群中哪个borker是controller也是通过一致性协议选举产生的,28版本之前通过zookeeper进行选主,28版本后通过kafka raft协议进行选举。如果controller崩溃,集群会重新选举一个broker作为新的controller,并增加controller epoch值(相当于zookeeper ZAB协议的epoch,raft协议的term值)
当kafka集群新建了topic或为一个topic新增了partition,controller需要为这些新增加的partition分配到具体的broker上,并把分配结果记录下来,供producer和consumer查询获取。
因为只有partition的leader副本才会处理producer和consumer的读写请求,而partition的其他follower副本需要从相应的leader副本同步消息,为了尽量保证集群中所有broker的负载是均衡的,controller在进行集群全局partition副本分配时需要使partition的分布情况是如下这样的:
在默认情况下,kafka采用轮询(round-robin)的方式分配partition副本。由于partition leader副本承担的流量比follower副本大,kafka会先分配所有topic的partition leader副本,使所有partition leader副本全局尽量平衡,然后再分配各个partition的follower副本。partition第一个follower副本的位置是相应leader副本的下一个可用broker,后面的副本位置依此类推。
举例来说,假设我们有两个topic,每个topic有两个partition,每个partition有两个副本,这些副本分别标记为1-1-1,1-1-2,1-2-1,1-2-2,2-1-1,2-1-2,2-2-1,2-2-2(编码格式为topic-partition-replia,编号均从1开始,第一个replica是leader replica,其他的是follower replica)。共有四个broker,编号是1-4。我们先对broker按broker id进行排序,然后分配leader副本,最后分配foller副本。
1)没有配置brokerrack的情况
现将副本1-1-1分配到broker 1,然后1-2-1分配到broker 2,依此类推,2-2-1会分配到broker 4。partition 1-1的leader副本分配在broker 1上,那么下一个可用节点是broker 2,所以将副本1-1-2分配到broker 2上。同理,partition 1-2的leader副本分配在broker 2上,那么下一个可用节点是broker 3,所以将副本1-1-2分配到broker 3上。依此类推分配其他的副本分片。最后分配的结果如下图所示:
2)配置了brokerrack的情况
假设配置了两个rack,broker 1和broker 2属于Rack 1,broker 3和broker 4属于Rack 2。我们对rack和rack内的broker分别排序。然后先将副本1-1-1分配到Rack 1的broker 1,然后将副本1-2-1分配到下一个Rack的第一个broker,即Rack 2的broker 3。其他的parttition leader副本依此类推。然后分配follower副本,partition 1-1的leader副本1-1-1分配在Rack 1的broker上,下一个可用的broker是Rack 2的broker 3,所以分配到broker 3上,其他依此类推。最后分配的结果如下图所示:
kafka除了按照集群情况自动分配副本,也提供了reassign工具人工分配和迁移副本到指定broker,这样用户可以根据集群实际的状态和各partition的流量情况分配副本
kafka集群controller的一项功能是在partition的副本中选择一个副本作为leader副本。在topic的partition创建时,controller首先分配的副本就是leader副本,这个副本又叫做preference leader副本。
当leader副本所在broker失效时(宕机或网络分区等),controller需要为在该broker上的有leader副本的所有partition重新选择一个leader,选择方法就是在该partition的ISR中选择第一个副本作为新的leader副本。但是,如果ISR成员只有一个,就是失效的leader自身,其余的副本都落后于leader怎么办?kafka提供了一个uncleanleaderelection配置参数,它的默认值为true。当uncleanleaderelection值为true时,controller还是会在非ISR副本中选择一个作为leader,但是这时候使用者需要承担数据丢失和数据不一致的风险。当uncleanleaderelection值为false时,则不会选择新的leader,该partition处于不可用状态,只能恢复失效的leader使partition重新变为可用。
当preference leader失效后,controller重新选择一个新的leader,但是preference leader又恢复了,而且同步上了新的leader,是ISR的成员,这时候preference leader仍然会成为实际的leader,原先的新leader变为follower。因为在partition leader初始分配时,使按照集群副本均衡规则进行分配的,这样做可以让集群尽量保持平衡。
为了保证topic的高可用,topic的partition往往有多个副本,所有的follower副本像普通的consumer一样不断地从相应的leader副本pull消息。每个partition的leader副本会维护一个ISR列表存储到集群信息库里,follower副本成为ISR成员或者说与leader是同步的,需要满足以下条件:
1)follower副本处于活跃状态,与zookeeper(28之前版本)或kafka raft master之间的心跳正常
2)follower副本最近replicalagtimemaxms(默认是10秒)时间内从leader同步过最新消息。需要注意的是,一定要拉取到最新消息,如果最近replicalagtimemaxms时间内拉取过消息,但不是最新的,比如落后follower在追赶leader过程中,也不会成为ISR。
follower在同步leader过程中,follower和leader都会维护几个参数,来表示他们之间的同步情况。leader和follower都会为自己的消息队列维护LEO(Last End Offset)和HW(High Watermark)。leader还会为每一个follower维护一个LEO。LEO表示leader或follower队列写入的最后一条消息的offset。HW表示的offset对应的消息写入了所有的ISR。当leader发现所有follower的LEO的最小值大于HW时,则会增加HW值到这个最小值LEO。follower拉取leader的消息时,同时能获取到leader维护的HW值,如果follower发现自己维护的HW值小于leader发送过来的HW值,也会增加本地的HW值到leader的HW值。这样我们可以得到一个不等式: follower HW <= leader HW <= follower LEO <= leader LEO 。HW对应的log又叫做committed log,consumer消费partititon的消息时,只能消费到offset值小于或等于HW值的消息的,由于这个原因,kafka系统又称为分布式committed log消息系统。
kafka的消息内容存储在logdirs参数配置的目录下。kafka每个partition的数据存放在本地磁盘logdirs目录下的一个单独的目录下,目录命名规范为 ${topicName}-${partitionId} ,每个partition由多个LogSegment组成,每个LogSegment由一个数据文件(命名规范为: {baseOffset}index)和一个时间戳索引文件(命名规范为:${baseOffset}timeindex)组成,文件名的baseOffset就是相应LogSegment中第一条消息的offset。index文件存储的是消息的offset到该消息在相应log文件中的偏移,便于快速在log文件中快速找到指定offset的消息。index是一个稀疏索引,每隔一定间隔大小的offset才会建立相应的索引(比如每间隔10条消息建立一个索引)。timeindex也是一个稀疏索引文件,这样可以根据消息的时间找到对应的消息。
可以考虑将消息日志存放到多个磁盘中,这样多个磁盘可以并发访问,增加消息读写的吞吐量。这种情况下,logdirs配置的是一个目录列表,kafka会根据每个目录下partition的数量,将新分配的partition放到partition数最少的目录下。如果我们新增了一个磁盘,你会发现新分配的partition都出现在新增的磁盘上。
kafka提供了两个参数logsegmentbytes和logsegmentms来控制LogSegment文件的大小。logsegmentbytes默认值是1GB,当LogSegment大小达到logsegmentbytes规定的阈值时,kafka会关闭当前LogSegment,生成一个新的LogSegment供消息写入,当前供消息写入的LogSegment称为活跃(Active)LogSegment。logsegmentms表示最大多长时间会生成一个新的LogSegment,logsegmentms没有默认值。当这两个参数都配置了值,kafka看哪个阈值先达到,触发生成新的LogSegment。
kafka还提供了logretentionms和logretentionbytes两个参数来控制消息的保留时间。当消息的时间超过了logretentionms配置的阈值(默认是168小时,也就是一周),则会被认为是过期的,会被kafka自动删除。或者是partition的总的消息大小超过了logretentionbytes配置的阈值时,最老的消息也会被kafka自动删除,使相应partition保留的总消息大小维持在logretentionbytes阈值以下。这个地方需要注意的是,kafka并不是以消息为粒度进行删除的,而是以LogSegment为粒度删除的。也就是说,只有当一个LogSegment的最后一条消息的时间超过logretentionms阈值时,该LogSegment才会被删除。这两个参数都配置了值时,也是只要有一个先达到阈值,就会执行相应的删除策略
当我们使用KafkaProducer向kafka发送消息时非常简单,只要构造一个包含消息key、value、接收topic信息的ProducerRecord对象就可以通过KafkaProducer的send()向kafka发送消息了,而且是线程安全的。KafkaProducer支持通过三种消息发送方式
KafkaProducer客户端虽然使用简单,但是一条消息从客户端到topic partition的日志文件,中间需要经历许多的处理过程。KafkaProducer的内部结构如下所示:
从图中可以看出,消息的发送涉及两类线程,一类是调用KafkaProducersend()方法的应用程序线程,因为KafkaProducersend()是多线程安全的,所以这样的线程可以有多个;另一类是与kafka集群通信,实际将消息发送给kafka集群的Sender线程,当我们创建一个KafkaProducer实例时,会创建一个Sender线程,通过该KafkaProducer实例发送的所有消息最终通过该Sender线程发送出去。RecordAccumulator则是一个消息队列,是应用程序线程与Sender线程之间消息传递的桥梁。当我们调用KafkaProducersend()方法时,消息并没有直接发送出去,只是写入了RecordAccumulator中相应的队列中,最终需要Sender线程在适当的时机将消息从RecordAccumulator队列取出来发送给kafka集群。
消息的发送过程如下:
在使用KafkaConsumer实例消费kafka消息时,有一个特性我们要特别注意,就是KafkaConsumer不是多线程安全的,KafkaConsumer方法都在调用KafkaConsumer的应用程序线程中运行(除了consumer向kafka集群发送的心跳,心跳在一个专门的单独线程中发送),所以我们调用KafkaConsumer的所有方法均需要保证在同一个线程中调用,除了KafkaConsumerwakeup()方法,它设计用来通过其它线程向consumer线程发送信号,从而终止consumer执行。
跟producer一样,consumer要与kafka集群通信,消费kafka消息,首先需要获取消费的topic partition leader replica所在的broker地址等信息,这些信息可以通过向kafka集群任意broker发送Metadata请求消息获取。
我们知道,一个consumer group有多个consumer,一个topic有多个partition,而且topic的partition在同一时刻只能被consumer group内的一个consumer消费,那么consumer在消费partition消息前需要先确定消费topic的哪个partition。partition的分配通过group coordinator来实现。基本过程如下:
我们可以通过实现接口orgapachekafkaclientsconsumerinternalsPartitionAssignor自定义partition分配策略,但是kafka已经提供了三种分配策略可以直接使用。
partition分配完后,每个consumer知道了自己消费的topic partition,通过metadata请求可以获取相应partition的leader副本所在的broker信息,然后就可以向broker poll消息了。但是consumer从哪个offset开始poll消息?所以consumer在第一次向broker发送FetchRequest poll消息之前需要向Group Coordinator发送OffsetFetchRequest获取消费消息的起始位置。Group Coordinator会通过key {topic}-${partition}查询 __consumer_offsets topic中是否有offset的有效记录,如果存在,则将consumer所属consumer group最近已提交的offset返回给consumer。如果没有(可能是该partition是第一次分配给该consumer group消费,也可能是该partition长时间没有被该consumer group消费),则根据consumer配置参数autooffsetreset值确定consumer消费的其实offset。如果autooffsetreset值为latest,表示从partition的末尾开始消费,如果值为earliest,则从partition的起始位置开始消费。当然,consumer也可以随时通过KafkaConsumerseek()方法人工设置消费的起始offset。
kafka broker在收到FetchRequest请求后,会使用请求中topic partition的offset查一个skiplist表(该表的节点key值是该partition每个LogSegment中第一条消息的offset值)确定消息所属的LogSegment,然后继续查LogSegment的稀疏索引表(存储在index文件中),确定offset对应的消息在LogSegment文件中的位置。为了提升消息消费的效率,consumer通过参数fetchminbytes和maxpartitionfetchbytes告诉broker每次拉取的消息总的最小值和每个partition的最大值(consumer一次会拉取多个partition的消息)。当kafka中消息较少时,为了让broker及时将消息返回给consumer,consumer通过参数fetchmaxwaitms告诉broker即使消息大小没有达到fetchminbytes值,在收到请求后最多等待fetchmaxwaitms时间后,也将当前消息返回给consumer。fetchminbytes默认值为1MB,待fetchmaxwaitms默认值为500ms。
为了提升消息的传输效率,kafka采用零拷贝技术让内核通过DMA把磁盘中的消息读出来直接发送到网络上。因为kafka写入消息时将消息写入内存中就返回了,如果consumer跟上了producer的写入速度,拉取消息时不需要读磁盘,直接从内存获取消息发送出去就可以了。
为了避免发生再平衡后,consumer重复拉取消息,consumer需要将已经消费完的消息的offset提交给group coordinator。这样发生再平衡后,consumer可以从上次已提交offset出继续拉取消息。
kafka提供了多种offset提交方式
partition offset提交和管理对kafka消息系统效率来说非常关键,它直接影响了再平衡后consumer是否会重复拉取消息以及重复拉取消息的数量。如果offset提交的比较频繁,会增加consumer和kafka broker的消息处理负载,降低消息处理效率;如果offset提交的间隔比较大,再平衡后重复拉取的消息就会比较多。还有比较重要的一点是,kafka只是简单的记录每次提交的offset值,把最后一次提交的offset值作为最新的已提交offset值,作为再平衡后消息的起始offset,而什么时候提交offset,每次提交的offset值具体是多少,kafka几乎不关心(这个offset对应的消息应该存储在kafka中,否则是无效的offset),所以应用程序可以先提交3000,然后提交2000,再平衡后从2000处开始消费,决定权完全在consumer这边。
kafka中的topic partition与consumer group中的consumer的消费关系其实是一种配对关系,当配对双方发生了变化时,kafka会进行再平衡,也就是重新确定这种配对关系,以提升系统效率、高可用性和伸缩性。当然,再平衡也会带来一些负面效果,比如在再平衡期间,consumer不能消费kafka消息,相当于这段时间内系统是不可用的。再平衡后,往往会出现消息的重复拉取和消费的现象。
触发再平衡的条件包括:
需要注意的是,kafka集群broker的增减或者topic partition leader重新选主这类集群状态的变化并不会触发在平衡
有两种情况与日常应用开发比较关系比较密切:
consumer在调用subscribe()方法时,支持传入一个ConsumerRebalanceListener监听器,ConsumerRebalanceListener提供了两个方法,onPartitionRevoked()方法在consumer停止消费之后,再平衡开始之前被执行。可以发现,这个地方是提交offset的好时机。onPartitonAssigned()方法则会在重新进行partition分配好了之后,但是新的consumer还未消费之前被执行。
我们在提到kafka时,首先想到的是它的吞吐量非常大,这也是很多人选择kafka作为消息传输组件的重要原因。
以下是保证kafka吞吐量大的一些设计考虑:
但是kafka是不是总是这么快?我们同时需要看到kafka为了追求快舍弃了一些特性:
所以,kafka在消息独立、允许少量消息丢失或重复、不关心消息顺序的场景下可以保证非常高的吞吐量,但是在需要考虑消息事务、严格保证消息顺序等场景下producer和consumer端需要进行复杂的考虑和处理,可能会比较大的降低kafka的吞吐量,例如对可靠性和保序要求比较高的控制类消息需要非常谨慎的权衡是否适合使用kafka。
我们通过producer向kafka集群发送消息,总是期望消息能被consumer成功消费到。最不能忍的是producer收到了kafka集群消息写入的正常响应,但是consumer仍然没有消费到消息。
kafka提供了一些机制来保证消息的可靠传递,但是有一些因素需要仔细权衡考虑,这些因素往往会影响kafka的吞吐量,需要在可靠性与吞吐量之间求得平衡:
kafka只保证partition消息顺序,不保证topic级别的顺序,而且保证的是partition写入顺序与读取顺序一致,不是业务端到端的保序。
如果对保序要求比较高,topic需要只设置一个partition。这时可以把参数maxinflightrequestsperconnection设置为1,而retries设置为大于1的数。这样即使发生了可恢复型错误,仍然能保证消息顺序,但是如果发生不可恢复错误,应用层进行重试的话,就无法保序了。也可以采用同步发送的方式,但是这样也极大的降低了吞吐量。如果消息携带了表示顺序的字段,可以在接收端对消息进行重新排序以保证最终的有序。ModbusPoll是一个用于测试和调试Modbus通信协议的软件,它可以读取和写入寄存器、线圈等数据。如果你想要设置数据曲线,需要按照以下步骤进行:
1 打开ModbusPoll软件,并连接到你的设备。
2 在左侧的树形菜单中选择“Data Views”选项卡。
3 点击“Add View”按钮创建一个新视图。
4 选择“Chart View”作为视图类型,并输入名称和描述信息。
5 在右侧的属性面板中,选择要显示在曲线上的寄存器或线圈地址,并设置相应的参数(如采样时间间隔、Y轴范围等)。
6 点击“Apply”按钮保存设置并关闭属性面板。
7 双击新建视图,在d出窗口中选择刚才配置好的曲线参数即可查看数据曲线。
异步的概念和同步相对。
(1)当一个同步调用发出后,调用者要一直等待返回消息(结果)通知后,才能进行后续的执行;
(2)当一个异步过程调用发出后,调用者不能立刻得到返回消息(结果)。实际处理这个调用的部件在完成后,通过 状态、通知和回调 来通知调用者。
这里提到执行部件和调用者通过三种途径返回结果:状态、通知和回调。使用哪一种通知机制,依赖于执行部件的实现,除非执行部件提供多种选择,否则不受调用者控制。
(A)阻塞调用是指调用结果返回之前,当前线程会被挂起,一直处于等待消息通知,不能够执行其他业务
(B)非阻塞调用是指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回
场景比喻:
举个例子,比如我去银行办理业务,可能会有两种方式:
在上面的场景中,如果:
a)如果选择排队(同步),且排队的时候什么都不干(线程被挂起,什么都干不了),是同步阻塞模型;
b)如果选择排队(同步),但是排队的同时做与办银行业务无关的事情,比如抽烟,(线程没有被挂起,还可以干一些其他的事),是同步非阻塞模型;
c)如果选择拿个小票,做在位置上等着叫号(通知),但是坐在位置上什么都不干(线程被挂起,什么都干不了),这是异步阻塞模型;
d)如果选择那个小票,坐在位置上等着叫号(通知),但是坐着的同时还打电话谈生意(线程没有被挂起,还可以干其他事情),这是异步非阻塞模型。
对这四种模型做一个总结:
1:同步阻塞模型,效率最低,即你专心排队,什么都不干。
2:异步阻塞,效率也非常低,即你拿着号等着被叫(通知),但是坐那什么都不干
3:同步非阻塞,效率其实也不高,因为涉及到线程的来回切换。即你在排队的同时打电话或者抽烟,但是你必须时不时得在队伍中挪动。程序需要在排队和打电话这两种动作之间来回切换,系统开销可想而知。
4:异步非阻塞,效率很高,你拿着小票在那坐着等叫号(通知)的同时,打电话谈你的生意。
linux下几个基本概念
1:用户控件和内核空间。 现代 *** 作系统都是采用虚拟存储器,在32位 *** 作系统下,它的寻址空间(虚拟存储空间)为4G(2的32次方)。为了保证用户进程补鞥呢直接 *** 作内核,保证内核的安全, *** 作系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。对linux *** 作系统而言,将最高的1G字节空间分给了内核使用,称为内核空间,将较低的3G字节的空间划分为用户空间。
2:进程切换很耗资源 ,为了控制进程的执行,内核必须有能力挂起正在cpu上运行的进程,并恢复以前挂起的某个进程的执行,这种行为叫进程的切换。每次切换,要保存上一个的上下文环境等等,总之记住进程切换很耗资源。
3:文件描述符 :文件描述符在形式上是一个非负整数。实际上,他是一个索引,指向内核为每个进程所维护的该进程打开文件的记录表。当程序打开一个文件时,内核就会向进程返回一个非负整数的文件描述符。但是文件描述符一般在unix,linux系统中才讲。
缓存IO ,大多数系统的默认IO *** 作都是缓存IO,在linux的缓存IO机制中, *** 作系统会将IO的数据缓存在系统的页缓存(page cache)中,也就是说,数据会先被拷贝到 *** 作系统内核的缓冲区,然后才会从 *** 作系统内核的缓冲区拷贝到应用程序的地址空间。 缓存IO的缺点: 数据在传输过程中需要在应用程序和地址空间和内核进行多次数据拷贝 *** 作,这种数据拷贝 *** 作锁带来的cpu以及内存消耗是很大的。
LINUX的IO模型
网络IO的本质是socket的读取。socket在linux系统被抽象为流,故对网络IO的 *** 作可以理解为对流的 *** 作。
对于一次IO访问,比如以read *** 作为例, 数据会先被拷贝到 *** 作系统内核的缓冲区,然后才会从内核缓冲区拷贝到进程的用户层,即应用程序的地址空间 。故当一个read *** 作发生时,其实是经历了两个阶段:
1:内核缓冲区的数据就位
2:数据从内核缓冲区拷贝到用户程序地址空间
那么具体到socket io的一次read *** 来说,这两步分别是:
1:等待网络上的数据分组到达,然后复制到内核缓冲区中
2:数据从内核缓冲区拷贝到用户程序的地址空间(缓冲区)
所以说 网络应用要处理的无非就两个问题:网络IO和数据计算 ,一般来说网络io带来的延迟影响比较大。
网络IO的模型大致有如下几种:
熟悉不? 我们常说的select,poll和epoll就是属于同步模型中多路复用IO的不同实现方法罢了。 下面分别对同步阻塞,同步不阻塞,同步io复用进行说明。
一:同步阻塞
它是最简单也最常用的网络IO模型。linux下默认的socket都是blocking的。
从图中可以看到,用户进程调用recvfrom这个系统调用后,就处于阻塞状态。然后kernel就开始了IO的第一个阶段:数据准备。等第一个阶段准备完成之后,kernel开始第二阶段,将数据从内核缓冲区拷贝到用户程序缓冲区(需要花费一定时间)。然后kernel返回结果(确切的说是recvfrom这个系统调用函数返回结果),用户进程才结束blocking,重新运行起来。
总结 : 同步阻塞模型下,用户程序在kernel执行io的两个阶段都被blocking住了 。但是优点也是因为这个,无延迟能及时返回数据,且程序模型简单。
二:同步非阻塞
同步非阻塞就是隔一会瞄一下的轮询方式。同步非阻塞模式其实是可以看做一小段一小段的同步阻塞模式。
三:IO多路复用
由于同步非阻塞方式需要不断的轮询,光轮询就占据了很大一部分过程,且消耗cpu资源。而这个用户进程可能不止对这个socket的read,可能还有对其他socket的read或者write *** 作,那人们就想到了一次轮询的时候,不光只查询询一个socket fd,而是在一次轮询下,查询多个任务的socket fd的完成状态,只要有任何一个任务完成,就去处理它。而且,轮询人不是进程的用户态,而是有人帮忙就好了。那么这就是所谓的 IO多路复用 。总所周知的linux下的select,poll和epoll就是这么干的。。。
selelct调用是内核级别的,selelct轮询相比较同步非阻塞模式下的轮询的区别为: 前者可以等待多个socket,能实现同时对多个IO端口的监听 ,当其中任何一个socket数据准备好了,就返回可读。 select或poll调用之后,会阻塞进程 ,与blocking IO 阻塞不用在于,此时的select不是等到所有socket数据达到再处理,而是某个socket数据就会返回给用户进程来处理。
其实select这种相比较同步non-blocking的效果在单个任务的情况下可能还更差一些 ,因为这里调用了select和recvfrom两个system call,而non-blocking只调用了一个recvfrom,但是 用select的优势在于它可以同时处理多个socket fd 。
在io复用模型下,对于每一个socket,一般都设置成non-blocking,但是其实 整个用户进程是一直被block的 ,只不过用户process不是被socket IO给block住,而是被select这个函数block住的。
与多进程多线程技术相比,IO多路复用的最大优势是系统开销小。
一:select
select函数监视多个socket fs,直到有描述符就绪或者超时,函数返回。当select函数返回后,可以通过遍历fdset,来找到就绪的描述符。select的基本流程为:
二:poll
poll本质上跟select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd的状态,如果某个fd的状态为就绪,则将此fd加入到等待队列中并继续遍历。如果遍历完所有的fd后发现没有就绪的,则挂起当前进程,直到设备就绪或者主动超时。被唤醒后它又要再次遍历fd。
特点:
1:poll没有最大连接数限制,因为它是用基于链表来存储的,跟selelct直接监听fd不一样。
2:同样的大量的fd的数组被整体复制与用户态和内核地址空间之间。
3:poll还有一个特点是水平触发:如果报告了fd后没有被处理,则下次poll时还会再次报告该fd。
4:跟select一样,在poll返回后,还是需要通过遍历fdset来获取已经就绪的socket。当fd很多时,效率会线性下降。
三:epoll
epoll支持水平触发和边缘触发,最大的特点在于边缘触发,它只告诉进程哪些fd刚刚变为就绪态,并且只会通知一次。还有一个特点是,epoll使用“事件”的就绪通知方式,通过epoll_ctl注册fd,一旦该fd就绪,内核就会采用类似callback的回调机制来激活该fd,epoll_wait便可以收到通知。
没有最大并发连接的限制,能打开的FD的上限远大于1024(1G的内存上能监听约10万个端口)。
效率提升,不是轮询的方式,不会随着FD数目的增加效率下降。只有活跃可用的FD才会调用callback函数;即Epoll最大的优点就在于它只管你“活跃”的连接,而跟连接总数无关,因此在实际的网络环境中,Epoll的效率就会远远高于select和poll。
内存拷贝,利用mmap()文件映射内存加速与内核空间的消息传递;即epoll使用mmap减少复制开销。
聊聊同步、异步、阻塞与非阻塞
聊聊Linux 五种IO模型
聊聊IO多路复用之select、poll、epoll详解
关于IO会涉及到阻塞、非阻塞、多路复用、同步、异步、BIO、NIO、AIO等几个知识点。知识点虽然不难但平常经常容易搞混,特此Mark下,与君共勉。
阻塞IO情况下,当用户调用 read 后,用户线程会被阻塞,等内核数据准备好并且数据从内核缓冲区拷贝到用户态缓存区后 read 才会返回。可以看到是阻塞的两个部分。
非阻塞IO发出read请求后发现数据没准备好,会继续往下执行,此时应用程序会不断轮询polling内核询问数据是否准备好,当数据没有准备好时,内核立即返回EWOULDBLOCK错误。直到数据被拷贝到应用程序缓冲区,read请求才获取到结果。并且你要注意!这里最后一次 read 调用获取数据的过程,是一个同步的过程,是需要等待的过程。这里的同步指的是 内核态的数据拷贝到用户程序的缓存区这个过程 。
非阻塞情况下无可用数据时,应用程序每次轮询内核看数据是否准备好了也耗费CPU,能否不让它轮询,当内核缓冲区数据准备好了,以事件通知当机制告知应用进程数据准备好了呢?应用进程在没有收到数据准备好的事件通知信号时可以忙写其他的工作。此时 IO多路复用 就派上用场了。
IO多路复用中文比较让人头大,IO多路复用的原文叫 I/O multiplexing,这里的 multiplexing 指的其实是在单个线程通过记录跟踪每一个Sock(I/O流)的状态来同时管理多个I/O流 发明它的目的是尽量多的提高服务器的吞吐能力。实现一个线程监控多个IO请求,哪个IO有请求就把数据从内核拷贝到进程缓冲区,拷贝期间是阻塞的!现在已经可以通过采用mmap地址映射的方法,达到内存共享效果,避免真复制,提高效率。
像 select、poll、epoll 都是I/O多路复用的具体的实现。
select是第一版IO复用,提出后暴漏了很多问题。
poll 修复了 select 的很多问题。
但是poll仍然不是线程安全的, 这就意味着不管服务器有多强悍,你也只能在一个线程里面处理一组 I/O 流。你当然可以拿多进程来配合了,不过然后你就有了多进程的各种问题。
epoll 可以说是 I/O 多路复用最新的一个实现,epoll 修复了poll 和select绝大部分问题, 比如:
横轴 Dead connections 是链接数的意思,叫这个名字只是它的测试工具叫deadcon。纵轴是每秒处理请求的数量,可看到epoll每秒处理请求的数量基本不会随着链接变多而下降的。poll 和/dev/poll 就很惨了。但 epoll 有个致命的缺点是只有 linux 支持。
比如平常Nginx为何可以支持4W的QPS是因为它会使用目标平台上面最高效的I/O多路复用模型。
然后你会发现上面的提到过的 *** 作都不是真正的异步,因为两个阶段总要等待会儿!而真正的异步 I/O 是内核数据准备好和数据从内核态拷贝到用户态这两个过程都不用等待。
很庆幸,Linux给我们准备了 aio_read 跟 aio_write 函数实现真实的异步,当用户发起aio_read请求后就会自动返回。内核会自动将数据从内核缓冲区拷贝到用户进程空间,应用进程啥都不用管。
我强力推荐C++后端开发免费学习地址:C/C++Linux服务器开发/后台架构师零声教育-学习视频教程-腾讯课堂
同步跟异步的区别在于 数据从内核空间拷贝到用户空间是否由用户线程完成 ,这里又分为同步阻塞跟同步非阻塞两种。
我们以同步非阻塞为例,如下可看到,在将数据从内核拷贝到用户空间这一过程,是由用户线程阻塞完成的。
可发现,用户在调用之后会立即返回,由内核完成数据的拷贝工作,并通知用户线程,进行回调。
在Java中,我们使用socket进行网络通信,IO主要有三种模式,主要看 内核支持 哪些。
同步阻塞IO ,每个客户端的Socket连接请求,服务端都会对应有个处理线程与之对应,对于没有分配到处理线程的连接就会被阻塞或者拒绝。相当于是 一个连接一个线程 。
BIO特点 :
常量:
主类:
服务端监听线程:
服务端处理线程:
客户端:
同步非阻塞IO之NIO :服务器端保存一个Socket连接列表,然后对这个列表进行轮询,如果发现某个Socket端口上有数据可读时说明读就绪,则调用该socket连接的相应读 *** 作。如果发现某个 Socket端口上有数据可写时说明写就绪,则调用该socket连接的相应写 *** 作。如果某个端口的Socket连接已经中断,则调用相应的析构方法关闭该端口。这样能充分利用服务器资源,效率得到了很大提高,在进行IO *** 作请求时候再用个线程去处理,是 一个请求一个线程 。Java中使用Selector、Channel、Buffer来实现上述效果。
每个线程中包含一个 Selector 对象,它相当于一个通道管理器,可以实现在一个线程中处理多个通道的目的,减少线程的创建数量。远程连接对应一个channel,数据的读写通过buffer均在同一个 channel 中完成,并且数据的读写是非阻塞的。通道创建后需要注册在 selector 中,同时需要为该通道注册感兴趣事件(客户端连接服务端事件、服务端接收客户端连接事件、读事件、写事件), selector 线程需要采用 轮训 的方式调用 selector 的 select 函数,直到所有注册通道中有兴趣的事件发生,则返回,否则一直阻塞。而后循环处理所有就绪的感兴趣事件。以上步骤解决BIO的两个瓶颈:
下面对以下三个概念做一个简单介绍,Java NIO由以下三个核心部分组成:
channel和buffer有好几种类型。下面是Java NIO中的一些主要channel的实现:
正如你所看到的,这些通道涵盖了UDP和TCP网络IO,以及文件IO。以下是Java NIO里关键的buffer实现:
在微服务阶段,一个请求可能涉及到多个不同服务之间的跨服务器调用,如果你想实现高性能的PRC框架来进行数据传输,那就可以基于Java NIO做个支持长连接、自定义协议、高并发的框架,比如Netty。Netty本身就是一个基于NIO的网络框架, 封装了Java NIO那些复杂的底层细节,给你提供简单好用的抽象概念来编程。比如Dubbo底层就是用的Netty。
AIO是异步非阻塞IO,相比NIO更进一步,进程读取数据时只负责发送跟接收指令,数据的准备工作完全由 *** 作系统来处理。
推荐一个零声教育C/C++后台开发的免费公开课程,个人觉得老师讲得不错,分享给大家:C/C++后台开发高级架构师,内容包括Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等技术内容,C/C++Linux服务器开发/后台架构师零声教育-学习视频教程-腾讯课堂 立即学习
原文:阻塞、非阻塞、多路复用、同步、异步、BIO、NIO、AIO 一锅端
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)