How to compare the MD5 of a file in Hadoop with that of a local file


Requirements

A file exists both locally and on HDFS: if it is the same file, do not sync it; otherwise, sync it.

If a file exists locally but not on HDFS, upload it.

If a file does not exist locally but does on HDFS, delete it.

Refactored version: hdfs sync refactoring

Approach

To decide whether two files are identical, compute their MD5 values.

How to compute it

I originally wanted to use HDFS's built-in checksum, but that is CRC based: a checksum is computed for each block as it is written, and the result is a set of checksums, whereas the local file system does not compute such values by default.
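For reference, the checksum route would have looked roughly like the sketch below (assuming a Hadoop 2.x client; the helper name is hypothetical). The value HDFS returns is an MD5 of per-block MD5s of chunk CRCs, so it cannot be compared with a plain MD5 computed over a local file:

import org.apache.commons.codec.binary.Hex
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

def hdfsChecksumHex(path: Path, configuration: Configuration): String = {
  val fs = FileSystem.get(configuration)
  // the algorithm name looks like "MD5-of-0MD5-of-512CRC32C", not a plain file MD5
  val checksum = fs.getFileChecksum(path)
  s"${checksum.getAlgorithmName}:${Hex.encodeHexString(checksum.getBytes)}"
}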

So in the end I just computed the MD5 directly from the input stream:

import java.io.{File, FileInputStream}

import org.apache.commons.codec.digest.DigestUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import scala.util.{Failure, Success, Try}

def getHdfsFileMd5(path: Path, configuration: Configuration): String = {
  val dfs = FileSystem.get(configuration)
  val in = dfs.open(path)
  // DigestUtils.md5Hex streams the InputStream, so the whole file is never held in memory
  Try {
    DigestUtils.md5Hex(in)
  } match {
    case Success(s) ⇒ in.close(); dfs.close(); s
    case Failure(e) ⇒ in.close(); dfs.close(); e.getMessage // on failure the error message is returned instead of an md5
  }
}

def getLocalFileMd5(file: File): String = {
  val in = new FileInputStream(file)
  Try {
    DigestUtils.md5Hex(in)
  } match {
    case Success(s) ⇒ in.close(); s
    case Failure(e) ⇒ in.close(); e.getMessage
  }
}

Design

Following the approach above, the cases break down as follows:

Local      | HDFS       | Same?
file       | file       | same
file       | file       | different
file       | directory  | no comparison needed
directory  | file       | no comparison needed
directory  | directory  | no comparison needed

So define a type:

trait Mode

object Mode {
  // (local path is a file, hdfs path is a file, both sides have the same md5)
  type isSameFile = (Boolean, Boolean, Boolean)
}

And this is how it is used in the pattern match:

def syncHelper(localFile: File, hdfsPath: Path, configuration: Configuration): Unit = {
  val fileSystem = FileSystem.get(configuration)
  val mode: Mode.isSameFile = (localFile.isFile, fileSystem.isFile(hdfsPath),
    sameMd5(localFile, hdfsPath, fileSystem))
  mode match {
    case (true, true, true) ⇒
      logger.info(s"the file :${localFile.getName} in local and hdfs are same one")
    case (true, true, false) ⇒
      logger.debug(s"the file: ${localFile.getName} in local and hdfs have same name,but they are different file")
      // copyFromLocal to hdfs with overwrite
      val fileSystem = FileSystem.get(configuration)
      fileSystem.copyFromLocalFile(false, true, new Path(localFile.getAbsolutePath), hdfsPath)
    case (true, false, _) ⇒
      logger.debug(s"the file: ${localFile.getName} in local is file and in hdfs is dir")
      // first delete the dir in hdfs, then copyFromLocal to hdfs
      fileSystem.delete(hdfsPath, true)
      fileSystem.copyFromLocalFile(false, true, new Path(localFile.getAbsolutePath), hdfsPath)
    case (false, true, _) ⇒
      val fileSystem = FileSystem.get(configuration)
      logger.debug(s"in local this is a dir and in hdfs is a file,upload ${localFile.getName} ")
      // first delete the file in hdfs, then copyFromLocal to hdfs
      composeAction(localFile, hdfsPath, fileSystem)
    case (false, false, _) ⇒
      logger.debug(s"both local and hdfs this is dir:${localFile.getName}")
      // three lists: which need upload, which need delete, which need update
      composeAction(localFile, hdfsPath, fileSystem)
      val childrenDir = localFile.listFiles().filter(_.isDirectory)
      val hdfsParent = hdfsPath.toString
      childrenDir.foreach(file ⇒ syncHelper(file, new Path(s"$hdfsParent/${file.getName}"), configuration))
  }
}
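The match above relies on a sameMd5 helper that the post never shows. A minimal sketch, assuming it simply compares the two MD5 functions defined earlier (it is only meaningful when both sides are regular files):

// hypothetical helper: true when the local file and the hdfs file hold identical content
def sameMd5(localFile: File, hdfsPath: Path, fileSystem: FileSystem): Boolean = {
  localFile.isFile && fileSystem.isFile(hdfsPath) &&
    getLocalFileMd5(localFile) == getHdfsFileMd5(hdfsPath, fileSystem.getConf)
}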

Helper functions

Files that need to be deleted:

// names that exist on hdfs but not locally
def needDelete(localFile: File, hdfsPath: Path, configuration: Configuration) = {
  FileSystem.get(configuration).listStatus(hdfsPath)
    .map(_.getPath.getName).diff(localFile.listFiles().map(_.getName)).toList
}

Files that need to be updated:

// names present on both sides whose local and hdfs md5 values differ
def needUpdate(localFile: File, hdfsPath: Path, configuration: Configuration) = {
  val tmpFile = FileSystem.get(configuration).listStatus(hdfsPath).map(_.getPath.getName)
    .intersect(localFile.listFiles().map(_.getName))
  val localMd5 = localFile.listFiles().filter(_.isFile)
    .filter(file ⇒ tmpFile.contains(file.getName))
    .map(file ⇒ (file.getName, getLocalFileMd5(file)))
  val fileSystem = FileSystem.get(configuration)
  val hdfsMd5 = FileSystem.get(configuration).listStatus(hdfsPath)
    .filter(path ⇒ fileSystem.isFile(path.getPath))
    .filter(path ⇒ tmpFile.contains(path.getPath.getName))
    .map(path ⇒ (path.getPath.getName, getHdfsFileMd5(path.getPath, configuration)))
  localMd5.diff(hdfsMd5).map(_._1).toList
}

Files that need to be uploaded:

// names that exist locally but not on hdfs
def needUpload(localFile: File, hdfsPath: Path, configuration: Configuration) = {
  localFile.listFiles().map(_.getName).diff(
    FileSystem.get(configuration).listStatus(hdfsPath).map(_.getPath.getName)).toList
}

Composing them:

def composeAction(localFile: File, hdfsPath: Path, fileSystem: FileSystem) = {
  val configuration = fileSystem.getConf
  val deleteList = needDelete(localFile, hdfsPath, configuration)
  val uploadList = needUpload(localFile, hdfsPath, configuration)
  val updateList = needUpdate(localFile, hdfsPath, configuration)
  val concatList = uploadList ++ updateList
  val localParent = localFile.getAbsolutePath
  val hdfsParent = hdfsPath.toString
  logger.debug("deleting which file need delete")
  val deleteFileSystem = FileSystem.get(configuration)
  deleteList.foreach(name ⇒ deleteFileSystem.delete(new Path(s"$hdfsParent/$name"), true))
  logger.debug("deleted")
  logger.debug("uploading which file need upload or update")
  val concatFileSystem = FileSystem.get(configuration)
  concatList.foreach(name ⇒ concatFileSystem.copyFromLocalFile(false, true,
    new Path(s"$localParent/$name"), new Path(s"$hdfsParent/$name")))
  logger.debug("uploaded")
}

Tests

package object:

import java.io.File

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

package object sync {

  lazy val sync = PathSyncer
  lazy val fileMd5Spec = new File(new File("src/test/scala/com/ximalaya/data/sync/Md5Spec.scala").getAbsolutePath)
  lazy val pathHDFS = new Path("/tmp/todd/a")
  lazy val pathLcoal = new File("/Users/cjuexuan/data/testfs/a")
  lazy val resources = Seq("/Users/cjuexuan/conf/hadoop/hadoop/core-site.xml",
    "/Users/cjuexuan/conf/hadoop/hadoop/hdfs-site.xml")

  implicit def getHadoopConf(resources: Seq[String]): Configuration = {
    resources.foldLeft(new Configuration()) { case (conf, path) ⇒
      conf.addResource(new Path(path))
      conf
    }
  }
}

pathSpec:

import java.io.File

import org.apache.hadoop.fs.Path
import org.scalatest.{FlatSpec, Matchers}

/**
  * Created by todd.chen on 16/3/15.
  * email : [email protected]
  */
class PathSpec extends FlatSpec with Matchers {

  val path = new Path("/tmp/todd")
  val file = new File("/Users/cjuexuan/data/testfs")

  "File in hdfs and not in local" should "delete" in {
    sync.needDelete(file, path, resources).length should be(1)
    sync.needDelete(file, path, resources).head should be("user_info")
  }

  "File in local and not in hdfs" should "upload" in {
    sync.needUpload(file, path, resources).length should be(1)
    sync.needUpload(file, path, resources).head should be("b")
  }

  "File in local and hdfs have diff md5" should "update" in {
    sync.needUpdate(file, path, resources).length should be(1)
    sync.needUpdate(file, path, resources).head should be("c")
  }
}

md5Spec:

import org.scalatest.{FlatSpec, Matchers}

import scala.com.ximalaya.data.sync._

/**
  * Created by todd.chen on 16/3/14.
  * email : [email protected]
  */
class Md5Spec extends FlatSpec with Matchers {

  "file md5Spec" should "get md5 with String type" in {
    assert(sync.getLocalFileMd5(fileMd5Spec).isInstanceOf[String])
  }

  "file md5Spec's md5" should "format to int type with hex" in {
    assert(BigInt(sync.getLocalFileMd5(fileMd5Spec), 16).isInstanceOf[BigInt])
  }

  "hdfs path" should "get md5 with String type" in {
    assert(sync.getHdfsFileMd5(pathHDFS, getHadoopConf(resources)).isInstanceOf[String])
  }

  "Same file in hdfs and localSystem" should "have same md5" in {
    val localMd5 = sync.getLocalFileMd5(pathLcoal)
    val hdfsMd5 = sync.getHdfsFileMd5(pathHDFS, getHadoopConf(resources))
    localMd5 should be(hdfsMd5)
  }
}

syncSpec:

import java.io.File

import org.apache.hadoop.fs.Path
import org.scalatest.{FlatSpec, Matchers}

/**
  * Created by todd.chen on 16/3/15.
  * email : [email protected]
  */
class SyncerSpec extends FlatSpec with Matchers {

  val helper = PathSyncer

  "file in local and in hdfs is same file" should "do nothing" in {
    val local = new File("/Users/cjuexuan/data/testfs/a")
    val hdfs = new Path("/tmp/todd/a")
    val localMd5 = helper.getLocalFileMd5(local)
    val oldHDFSMd5 = helper.getHdfsFileMd5(hdfs, resources)
    localMd5 should be(oldHDFSMd5)
    helper.syncHelper(local, hdfs, resources)
    val newHDFSMd5 = helper.getHdfsFileMd5(hdfs, resources)
    localMd5 should be(newHDFSMd5)
  }

  "file in local and in hdfs with same name and diff file" should "be update" in {
    val local = new File("/Users/cjuexuan/data/testfs/b")
    val hdfs = new Path("/tmp/todd/b")
    val localMd5 = helper.getLocalFileMd5(local)
    val oldHDFSMd5 = helper.getHdfsFileMd5(hdfs, resources)
    localMd5 should not be oldHDFSMd5
    helper.syncHelper(local, hdfs, resources)
    val newHDFSMd5 = helper.getHdfsFileMd5(hdfs, resources)
    localMd5 should be(newHDFSMd5)
  }

  "in local is file and same path in hdfs is dir" should "be remove hdfs dir and upload local file" in {
    val local = new File("/Users/cjuexuan/data/testfs/b")
    val hdfs = new Path("/tmp/todd/b")
    val localMd5 = helper.getLocalFileMd5(local)
    helper.syncHelper(local, hdfs, resources)
    val HDFSMd5 = helper.getHdfsFileMd5(hdfs, resources)
    localMd5 should be(HDFSMd5)
  }

  "in local is dir and in hdfs is file" should "be remove hdfs file and upload local dir" in {
    val local = new File("/Users/cjuexuan/data/testfs")
    val hdfs = new Path("/tmp/todd/testfs")
    helper.syncHelper(local, hdfs, resources)
  }

  "both in local and hdfs is dir" should "sync to same" in {
    val local = new File("/Users/cjuexuan/data/testfs")
    val hdfs = new Path("/tmp/todd/testfs")
    helper.syncHelper(local, hdfs, resources)
  }
}

File formats in Hadoop fall roughly into two categories: row-oriented and column-oriented.

Row-oriented: TextFile, SequenceFile, MapFile, Avro Datafile

TextFile: commonly used in production as the storage format for raw tables. It takes up more disk space, and parsing it generally costs dozens of times more than parsing a binary format.

SequenceFile: a binary file provided by the Hadoop API that serializes data into the file as <key,value> pairs (as a binary format, the file is larger than its plain-text equivalent). Internally it uses Hadoop's standard Writable interface for serialization and deserialization, and it is mutually compatible with MapFile in the Hadoop API.
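As an illustration of that <key,value> Writable contract, here is a minimal sketch of writing a SequenceFile with the Hadoop 2.x API (the output path and the record are hypothetical):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, SequenceFile, Text}

def writeDemoSequenceFile(configuration: Configuration): Unit = {
  val writer = SequenceFile.createWriter(
    configuration,
    SequenceFile.Writer.file(new Path("/tmp/todd/demo.seq")), // hypothetical output path
    SequenceFile.Writer.keyClass(classOf[Text]),
    SequenceFile.Writer.valueClass(classOf[IntWritable]))
  try {
    // each record is serialized through the Writable interface as a <key,value> pair
    writer.append(new Text("user_info"), new IntWritable(1))
  } finally {
    writer.close()
  }
}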

MapFile: a sorted SequenceFile that additionally generates an index file to support lookup by key. The file does not support overwriting: records cannot be appended to an existing SequenceFile (MapFile), and the file cannot be read while it is being written.

Avro Datafile: a binary format designed for data-intensive workloads. Its file format is more compact, and it offers better serialization and deserialization performance when reading large amounts of data. Avro data files carry their schema by design, so developers do not need to implement their own Writable objects at the API level. Many Hadoop subprojects now support the Avro data format, including Pig, Hive, Flume, Sqoop, and HCatalog.

Column-oriented: Parquet, RCFile, ORCFile

RCFile is a column-oriented data format introduced by Hive. It follows the design principle of "partition horizontally first, then vertically". During a query, columns the query does not touch are skipped at the I/O level.

ORCFile (Optimized Row Columnar file) is a more efficient file format than RCFile. Internally it divides the data into stripes with a default size of 250 MB. Each stripe contains an index, the row data, and a footer; the index stores the minimum and maximum values of each column as well as the positions of the rows in each column.

Parquet is a columnar storage format that supports nested structures. Parquet's storage model is built from row groups, column chunks, and pages.

1. Row group: Parquet partitions the data horizontally into row groups. The default row group size is aligned with the HDFS block size, and Parquet guarantees that a row group is processed by a single Mapper.

2. Column chunk: each column of a row group is stored in its own column chunk. A column chunk holds a single data type, and different column chunks may use different compression.

3. Page: Parquet stores data in pages, and each column chunk contains multiple pages. A page is the smallest encoding unit, and different pages in the same column chunk may use different encodings (see the configuration sketch after this list).
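A small sketch of the knobs behind these concepts when writing through parquet-mr on Hadoop; the property names come from ParquetOutputFormat, and the sizes are only illustrative assumptions:

import org.apache.hadoop.conf.Configuration

val parquetConf = new Configuration()
// row group size ("block size" in parquet-mr), usually aligned with the HDFS block size
parquetConf.setInt("parquet.block.size", 128 * 1024 * 1024)
// page size: the smallest encoding unit inside a column chunk
parquetConf.setInt("parquet.page.size", 1024 * 1024)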

Raw table data is generally stored in a text format; everything else uses columnar storage.

The compression formats commonly used in Hadoop at the moment are lzo, gzip, snappy, and bzip2.


2.1 lzo

lzo is the most popular compression format in Hadoop. Compression and decompression are fairly fast, the compression ratio is reasonable, and it supports splitting, which makes it suitable for processing large text files.

For lzo compression, the commonly used codecs are LzoCodec and LzopCodec, and both SequenceFile and TextFile can be compressed with them. After a TextFile has been compressed, MapReduce cannot split the compressed file by default; the lzo file first has to be indexed, producing a .lzo.index file, before map tasks can split it. With LzopCodec the output is a file with the .lzo suffix, which LzoIndexer can index to make it splittable; with LzoCodec the output has the .lzo_deflate suffix and cannot be indexed.
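A hedged sketch of wiring this up for a MapReduce job, assuming the hadoop-lzo package is installed; LzopCodec is chosen here precisely because its .lzo output can later be indexed for splitting:

import org.apache.hadoop.conf.Configuration

val lzoConf = new Configuration()
lzoConf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
// com.hadoop.compression.lzo.LzopCodec writes .lzo files that LzoIndexer can index;
// com.hadoop.compression.lzo.LzoCodec would write non-indexable .lzo_deflate files
lzoConf.set("mapreduce.output.fileoutputformat.compress.codec",
  "com.hadoop.compression.lzo.LzopCodec")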

