说起零拷贝之前,先来了解下服务器中文件数据通过网络传输到客户端的流程。作为应用服务器,其中会有很多从磁盘中读取数据,然后应用程序对加载到内存中的数据进行处理,然后通过网卡发送给客户端,传统数据处理通过以下两个函数实现:
知磨坦在这个过程中,数据流转的大致过程如下:
可以见到,在这个过程中发生了2次cpu copy和2次DMA copy,以及发生了数次cpu状态切换。 这个 *** 作对于应用服务器来说很频繁,因此带来的开销也是非常大。
因此所谓的零拷贝就是,让其中的2次cpu拷贝省略掉,因为这两次cpu拷贝的数据其实已经在内存中,没有必要再让cpu参与进来进行数据的拷贝,浪费cpu。在大量文件读写的时候,这个优化带来的收益还是比较可观的。
零拷贝的实现方式有两种:
mmap通过虚拟内存映射,让多个虚拟地址指向同一个物理内存地址,用户空间的虚拟地址和内核空间的虚拟地址指游中向同一个物理内存地址,这样用户空间和内核空间共享同一个内存数据。这样DMA引擎从磁盘上加载的数据不需要在内核空间和用户空间进行复制,减少了一次cpu拷贝。
sendfile通过系统调用,并且规定了in_fd文件描述符必须是可以mmap的,sendfile只能将文件数据发送到socket中,sendfile减少了一次cpu状态的切换
无论是mmap结合write方式还是sendfile方式都只是减少了一次cpu拷贝,而后DMA引擎还具有了收集功能,可以在内核缓存区发送到socket缓冲区的时候避免掉cpu复制,只是将缓冲区地址和数据长度发送给socket缓冲区,然后DMA引擎通过收集功能搭桐直接读取收集数据发送到网卡中。这里依赖DMA引擎的收集功能省略掉了最后一次cpu拷贝,到此才是真正的零拷贝。
所谓的零拷贝就是避免数据在内核空间缓存区和用户空间缓缓冲区之间的复制,避免掉2次cpu复制,释放cpu。
在RocketMq中采用的是mmap()结合write()方式来实现零拷贝。
在java中还可以通过FileChannel.transferTo()来实现数据从文件描述符传输到socket中,它的底层是通过sendfile系统调用来实现。
1、把旧目录下logs、store移动到新硬盘目录下,比如:/disk1/rocketmq-data
2、修改数据存储路径
打开conf/broker.conf,增加以下内容:
#存储路径
storePathRootDir=/disk1/rocketmq-data/store
#commitLog存储路径
storePathCommitLog=/disk1/rocketmq-data/store/commitlog
#消费队列存储路径
storePathConsumeQueue=/盯档disk1/rocketmq-data/store/consumequeue
#消息索引存储路径
storePathIndex=/disk1/rocketmq-data/store/index
#checkpoint 文件存储路径
storeCheckpoint=/disk1/rocketmq-data/store/checkpoint
#abort 文件存储路径
abortFile=/disk1/rocketmq-data/store/abort
3、修改凯李乱日志存储路径
进扰晌入到:conf,执行以下命令:
sudo sed -i 's#${user.home}#/disk1/rocketmq-data#g' *.xml
4、重启RocketMQ
• 消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分。• 业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的 Topic 进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分。
• 消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分。
• 消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将碧蠢知不同量级的消息进行拆分,使用不同的 Topic。
总的来说,针对消息分类,您可以选择创建多个 Topic,或者在同一个 Topic 下创建多个 Tag。但通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,例如全集和子集的关系、流程先后的关系。
通过合理的使用 Topic 和 Tag,可以让业务结构清晰,更可以提高效率。
RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的。RocketMQ这么做是在于其Producer端写入消息和Consumer端订阅消息采用分离存储的机制来实现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。其ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的Message Tag的哈希值,基于Tag的消息过滤正式基于这个字段值的。
Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用||分隔。其中,Consumer端会将这个订阅请求构建成一个 SubscriptionData,发送一个Pull消息的请求给Broker端。Broker端从RocketMQ的文件悔消存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行档谨消息消费。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)