文件channel用法

文件channel用法,第1张

当我们注释掉 buffer.clear() 后,发现会无限循环下去,并且也会将 input.txt 中的内容无限写入 output.txt

原因是什么呢?

参考自 ReadableByteChannel 接口的 read() 文档

NioTest.txt文件就由

变成了

虽然在idea中没有变化,但是用notepad++打开查看后,其实已经发生了变化。

客户端用 输入以下命令,建立连接

关闭不再需要使用的 channel 并不是必须的。跟其他资源比如打开的文件、socket 连接不一样,这类资源使用完后不关闭后会造成句柄泄露,channel 使用完后不关闭也没有关系,channel 没有被任何协程用到后最终会被 GC 回收。关闭 channel 一般是用来通知其他协程某个任务已经完成了。golang 也没有直接提供判断 channel 是否已经关闭的接口,虽然可以用其他不太优雅的方式自己实现一个:

不过实现一个这样的接口也没什么必要。因为就算通过 isClosed() 得到当前 channel 当前还未关闭,如果试图往 channel 里写数据,仍然可能会发生 panic ,因为在调用 isClosed() 后,其他协程可能已经把 channel 关闭了。

关闭 channel 时应该注意以下准则:

1)不要在读取端关闭 channel ,因为写入端无法知道 channel 是否已经关闭,往已关闭的 channel 写数据会 panic ;

2)有多个写入端时,不要在写入端关闭 channle ,因为其他写入端无法知道 channel 是否已经关闭,关闭已经关闭的 channel 会发生 panic ;

3)如果只有一个写入端,可以在这个写入端放心关闭 channel 。

关闭 channel 粗暴一点的做法是随意关闭,如果产生了 panic 就用 recover 避免进程挂掉。稍好一点的方案是使用标准库的 sync 包来做关闭 channel 时的协程同步,不过使用起来也稍微复杂些

Channel被设计为Event中转临时缓冲区,存储Source收集并且没有被Sink读取的Event,为平衡Source收集和Sink读取数据的速度,可视为Flume内部的消息队列。Channel线程安全并且具有事务性,支持source写失败重复写和sink读失败重复读等 *** 作。

常用的Channel类型有Memory Channel、File Channel、KafkaChannel等。

对比Channel, Memory Channel读写速度快,但是存储数据量小,Flume进程挂掉、服务器停机或者重启都会导致数据丢失。部署Flume Agent的线上服务器内存资源充足、不关心数据丢失的场景下可以使用。

将 event 写入磁盘文件,与 Memory Channel 相比存储容量大,无数据丢失风险。File Channle 数据存储路径可以配置多磁盘文件路径,通过磁盘并行写入提高FileChannel 性能。Flume 将 Event 顺序写入到 File Channel 文件的末尾,在配置文件中通过设置 maxFileSize 参数配置数据文件大小,当被写入的文件大小达到上限时 Flume 会重新创建新的文件存储写入的 Event。当然数据文件数量也不会无限增长,当一个已关闭的只读数据文件中的 Event 被读取完成,并且 Sink 已经提交读取完成的事务,则 Flume 将删除存储该数据的文件。Flume 通过设置检查点和备份检查点实现在 Agent 重启之后快速将 File Channle 中的数据按顺序回放到内存中,保证在 Agent 失败重启后仍然能够快速安全地提供服务。

将Kafka作为Channel存储,Kafka是分布式、可扩展、高容错、高吞吐的分布式系统,Kafka通过优秀的架构设计充分利用磁盘顺序特性,在廉价的硬件条件下完成高效的消息发布和订阅。

Memory Channel在使用的过程中受内存容量的限制不能缓存大量的消息,并且如果Memory Channel中的消息没来得及写入Sink,此时Agent出现故障就会造成数据丢失。File Channel虽然能够缓存更多的消息,但如果缓存下来的消息还没有写入Sink,此时Agent出现故障则File Channel中的消息不能被继续使用,直到该Agent重新恢复才能够继续使用File Channel中的消息。Kafka Channel相对于Memory Channel和File Channel存储容量更大、容错能力更强,弥补了其他两种Channel的短板,如果合理利用Kafka的性能,能够达到事半功倍的效果。

有了Kafka Channel可以在日志收集层只配置Source组件和Kafka Channel组件,不需要再配置Sink组件,减少了日志收集层启动的进程数并且有效降低服务器内存、磁盘等资源使用率,日志汇聚层可以只配置Kafka Channel和Sink,不需要再配置Source,减少日志汇聚层的进程数,这样的配置既能降低服务器的资源使用率又能减少Event在网络之间的传输,有效提高日志采集系统的性能。

Kafka Channel相关 *** 作在org.apache.flume.channel.kafka包的KafkaChannel类定义,

kafka相关参数的默认值在org.apache.kafka.clients.CommonClientConfigs包中的KafkaChannel-Configuration中。

Kafka的通用配置参数在配置文件中都以“kafka.”为前缀,针对Producer或者Consumer的相关配置以“kafka.producer. ”或者“kafka.consumer. ”为前缀,

源码 KafkaChannelConfiguration 中相关默认配置参数定义如下:

说明:agent_name 没有配置Source,只配置了Channel和Sink,使用的Channel类型为Kafka Channel,主题名称为“test_channel”, consumer组id为“test-consumer”, Sink类型为 hdfs 滚动生成文件,对接的Channel为KafkaChannel channel_name。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/tougao/12016599.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-20
下一篇 2023-05-20

发表评论

登录后才能评论

评论列表(0条)

保存