如何让Hadoop读取以gz结尾的文本格式的文件

如何让Hadoop读取以gz结尾的文本格式的文件,第1张

####背景: 搜索引擎在build全量时,会产生数G的xml的中间文件,我需要去查询这些中间文件中,是否有某个特殊的字符。xml文件有很多,每个都有几百M,存储在hdfs上,而且是以gz结尾的文本格式的文件。 查找时,我是写了一个实现Tool接口,继承自Configured类的MapReduce,这样就可以传入自定义的参数给我的MapReduce程序了。需要在文件里Grep的内容,就是以参数的形式传入的。 写完代码调试时,问题来了银激雹,会报这个异常: 14/10/17 12:06:33 INFO mapred.JobClient: Task Id : attempt_201405252001_273614_m_000013_0, Status : FAILED java.io.IOException: incorrect header check at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method) at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:221) at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:81) at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:75) at java.io.InputStream.read(InputStream.java:85) at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38) at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:208) at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:193) at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48) at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:390) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:324) at org.apache.hadoop.mapred.Child$4.run(Child.java:268) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:396) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1115) at org.apache.hadoop.mapred.Child.main(Child.java:262) ###分析过程: 通过上面的异常,立马猜想到是由于我的文件是gz结尾,所以hadoop把它当作了压缩文件,然后尝试解压缩后读取,所以解压失败了。于是去问google,没有搜到能够直接解决我问题的锋帆答案,但是搜到了此铅乱处相关的源代码:[LineRecordReader.java](http://grepcode.com/file/repository.cloudera.com/content/repositories/releases/com.cloudera.hadoop/hadoop-core/0.20.2-737/org/apache/hadoop/mapreduce/lib/input/)于是尝试着去阅读代码来解决问题,这个类很简单,继承自RecordReader,没有看到next函数和readLine函数,那就应该是基类实现的。很快发现了看名字是跟压缩解码相关的代码: private CompressionCodecFactory compressionCodecs = null... compressionCodecs = new CompressionCodecFactory(job)final CompressionCodec codec = compressionCodecs.getCodec(file)... if (codec != null) { in = new LineReader(codec.createInputStream(fileIn), job)} else{ ... in = new LineReader(fileIn, job)} 此处file就是拿到的文件路径,可以看到,应该就是通过CompressionCode.getCode(file)函数,拿到的codec类,然后读取的时候出异常了。那怎么让MapReduce程序把这个.gz文件当作普通的文本文件呢?再点进去看[CompressionCodeFactory.java](http://grepcode.com/file/repository.cloudera.com/content/repositories/releases/com.cloudera.hadoop/hadoop-core/0.20.2-737/org/apache/hadoop/io/compress/CompressionCodecFactory.java#CompressionCodecFactory.%3Cinit%3E%28org.apache.hadoop.conf.Configuration%29)的代码。getCodec函数的代码如下: /** * Find the relevant compression codec for the given file based on its * filename suffix. * @param file the filename to check * @return the codec object */ public CompressionCodec getCodec(Path file) { CompressionCodec result = nullif (codecs != null) { String filename = file.getName()String reversedFilename = new StringBuffer(filename).reverse().toString()SortedMap subMap = codecs.headMap(reversedFilename)if (!subMap.isEmpty()) { String potentialSuffix = subMap.lastKey()if (reversedFilename.startsWith(potentialSuffix)) { result = codecs.get(potentialSuffix)} } } return result} 就是根据文件名称匹配来得到对应的解压缩类。咋们按图索骥,去看看codecs是在哪里赋值的: /** * Find the codecs specified in the config value io.compression.codecs * and register them. Defaults to gzip and zip. */ public CompressionCodecFactory(Configuration conf) { codecs = new TreeMap ()List >codecClasses = getCodecClasses(conf)if (codecClasses == null) { addCodec(new GzipCodec())addCodec(new DefaultCodec())} else { Iterator >itr = codecClasses.iterator()while (itr.hasNext()) { CompressionCodec codec = ReflectionUtils.newInstance(itr.next(), conf)addCodec(codec)} } } 看样子从配置文件里,拿不到编码相关的配置,就会默认把GzipCodec,DefaultCodec加进去。再跟到getCodecClasses(conf)函数里去: /** * Get the list of codecs listed in the configuration * @param conf the configuration to look in * @return a list of the Configuration classes or null if the attribute * was not set */ public static List >getCodecClasses(Configuration conf) { String codecsString = conf.get("io.compression.codecs")if (codecsString != null) { List >result = new ArrayList >()StringTokenizer codecSplit = new StringTokenizer(codecsString, ",")while (codecSplit.hasMoreElements()) { String codecSubstring = codecSplit.nextToken()if (codecSubstring.length() != 0) { try { Classcls = conf.getClassByName(codecSubstring)if (!CompressionCodec.class.isAssignableFrom(cls)) { throw new IllegalArgumentException("Class " + codecSubstring + " is not a CompressionCodec")} result.add(cls.asSubclass(CompressionCodec.class))} catch (ClassNotFoundException ex) { throw new IllegalArgumentException("Compression codec " + codecSubstring + " not found.", ex)} } } return result} else { return null} } 从这个函数里能够看到编码的配置是 **io.compression.codecs** 。可以看到,我们必须返回非null的result,那么直接让io.compression.codecs配置成空,应该就可以了,此时返回的result里面没有任何元素。 ###问题解决方案: 试了一把,执行这个MapReduce程序时,加上 **-Dio.compression.codecs=,** 的参数,就可以了: hadoop jar ./dumptools-0.1.jar ddump.tools.mr.Grep -Dio.compression.codecs=, "adgroupId=319356697" doc val

Hadoop支持的文件系统知培由很多(见下图),HDFS只是其中一种实现。Java抽象类 org.apache.hadoop.fs.FileSystem 定义了Hadoop中一个文件系统的客户端接口,并且该抽象类有几个具体实现。Hadoop一般使用URI(下图)方案来选取合适的文件系统实例进行交互。

特别的,HDFS文件系统的 *** 作可以使用 FsSystem shell 、客户端(http rest api、Java api、C api等)。

FsSystem shell 的用法基本同本地shell类似,命令可参考 FsSystem shell

Hadoop是用Java写的,通过Java Api( FileSystem 类)可以调用大部分Hadoop文件系统的交互 *** 作。更详细的介绍可参考 hadoop Filesystem 。

非Java开发的应用可以使用由WebHDFS协议提供的HTTP REST API,但是HTTP比原生的Java客户端要慢,所以不到万不得已尽量不要使用HTTP传输特大数据。通过HTTP来访问HDFS有两种方法:

两种如图

在第一种情况中,namenode和datanode内嵌的web服务作为WebHDFS的端节点运行(是否启用WebHDFS可通过dfs.webhdfs.enabled设置,默认为true)。文件元数据在namenode上,文件读写 *** 作首先被发往namenode,有namenode发送一个HTTP重定向至某个客户端,指示以流的方式传输文件数据的目的或源datanode。

第二种方法依靠一个或多个独立代理服务器通过HTTP访问HDFS。所有集群的网络通信都需要通过代理,因此客户端从来不直接访问namenode或datanode。使用代理后可以使用更严格的防火墙策略和带宽策略。

HttpFs代理提供和WebHDFS相同的HTTP接口,这样客户端能够通过webhdfs URI访问搭晌唯接口。HttpFS代理启动独立于namenode和datanode的守护进程,使用httpfs.sh 脚本,默认在一个不同的端口上监听(14000)。

下图描述了

读文件时客户端与 HDFS 中的 namenode, datanode 之间的数据流动。

对上图的解释如下:

在读取过程中, 如果 FSDataInputStream 在和一个 datanode 进行交流时出现了一个错误,他就去试一试下一个最接近的块,他当然也会记住刚才发生错误的 datanode 以至于之后不会再在这个 datanode 上进行没必要的尝试。 DFSInputStream 也会在 datanode 上传输出的数据上核查检查数(checknums).如果损坏的块被发现了, DFSInputStream 就试图从另一个拥有备份的 datanode 中去读取备份块中的数据。

在这个设计中一个重要的方面就是客户端直接从 datanode 上检索数据,并通过 namenode 指导来得到每一个块的最佳 datanode。这种设计允许 HDFS 扩展大量的并发客户谨液端,因为数据传输只是集群上的所有 datanode 展开的。期间,namenode 仅仅只需要服务于获取块位置的请求(块位置信息是存放在内存中,所以效率很高)。如果不这样设计,随着客户端数据量的增长,数据服务就会很快成为一个瓶颈。

我们知道,相对于客户端(之后就是 mapreduce task 了),块的位置有以下可能性:

我们认为他们对于客户端的带宽递减,距离递增(括号中表示距离)。示意图如下:

如果集群中的机器都在同一个机架上,我们无需其他配置,若集群比较复杂,由于hadoop无法自动发现网络拓扑,所以需要额外配置网络拓扑。

基本读取程序,将文件内容输出到console

FileSystemCat

随机读取

展开原码

下图描述了写文件时客户端与 HDFS 中的 namenode, datanode 之间的数据流动。

对上图的解释如下:

如果在任何一个 datanode 在写入数据的时候失败了,接下来所做的一切对客户端都是透明的:首先, pipeline 被关闭,在确认队列中的剩下的包会被添加进数据队列的起始位置上,以至于在失败的节点下游的任 何节点都不会丢失任何的包。然后与 namenode 联系后,当前在一个好的 datanode 会联系 namenode, 给失败节点上还未写完的块生成一个新的标识ID, 以至于如果这个失败的 datanode 不久后恢复了,这个不完整的块将会被删除。失败节点会从 pipeline 中移除,然后剩下两个好的 datanode 会组成一个的新的 pipeline ,剩下的 这些块的包(也就是刚才放在数据队列队首的包)会继续写进 pipeline 中好的 datanode 中。最后,namenode 注意到块备份数小于规定的备份数,他就安排在另一个节点上创建完成备份,直接从已有的块中复制就可以。然后一直到满足了备份数( dfs.replication )。如果有多个节点的写入失败了,如果满足了最小备份数的设置( dfs.namenode.repliction.min ),写入也将会成功,然后剩下的备份会被集群异步的执行备份,直到满足了备份数( dfs.replication )。

创建目录

文件压缩有两大好处:

Hadoop 对于压缩格式的是自动识别。如果我们压缩的文件有相应压缩格式的扩展名(比如 lzo,gz,bzip2 等)。Hadoop 会根据压缩格式的扩展名自动选择相对应的解码器来解压数据,此过程完全是 Hadoop 自动处理,我们只需要确保输入的压缩文件有扩展名。

Hadoop中有多种压缩格式、算法和工具,下图列出了常用的压缩方法。

表中的“是否可切分”表示对应的压缩算法是否支持切分,也就是说是否可以搜索数据流的任意位置并进一步往下读取数据,可切分的压缩格式尤其适合MapReduce。

所有的压缩算法都需要权衡空间/时间:压缩和解压缩速度更快,其代价通常是只能节省少量的空间。不同的压缩工具有不同的特性:

更详细的比较如下

1.压缩性能比较

2.优缺点

另外使用hadoop原生(native)类库比其他java实现有更快的压缩和解压缩速度。特征比较如下:

使用容器文件格式结合压缩算法也能更好的提高效率。顺序文件、Arvo文件、ORCFiles、Parqurt文件同时支持压缩和切分。

压缩举例(Java)

压缩

解压缩

六、文件序列化

序列化是指将结构化数据转换为字节流以便在网络上传输或写到磁盘进行永久存储。反序列化狮子将字节流转换回结构化对象的逆过程。

序列化用于分布式数据处理的两大领域:进程间通信和永久存储。

对序列化的要求时是格式紧凑(高效使用存储空间)、快速(读写效率高)、可扩展(可以透明地读取老格式数据)且可以互 *** 作(可以使用不同的语言读写数据)。

Hadoop使用的是自己的序列化格式 Writable ,它绝对紧凑、速度快,但不太容易用java以外的语言进行扩展或使用。

当然,用户也可以使用其他序列化框架或者自定义序列化方式,如 Avro 框架。

Hadoop内部还使用了 Apache Thrift 和 Protocal Buffers 来实现RPC和数据交换。


欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/tougao/12200942.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-21
下一篇 2023-05-21

发表评论

登录后才能评论

评论列表(0条)

保存