此前对大数据的知识了解的很少,于是晚上回去花时间突击大数据知识,白天就开始上手干,一边学一边做,总算在部门规定的时间,跟系统一起上线了。
后来的维护迭代就交给大数据去了,虽然接触大数据的时间不长,但是对我来说,确是很有意思的一段经历,觉得把当时匆匆学的知识点,再仔细回顾回顾,整理下。
01 大数据概述大数据: 就是对海量数据进行分析处理,得到一些有价值的信息,然后帮助企业做出判断和决策.处理流程: 1:获取数据 2:处理数据 3:展示结果02 Hadoop介绍Hadoop是一个分布式系基础框架,它允许使用简单的编程模型跨大型计算机的大型数据集进行分布式处理.它主要解决两个问题 大数据存储问题: HDFS 大数据计算问题:MapReduce2.1 问题一: 大文件怎么存储?假设一个文件非常非常大,大小为1PB/a.txt, 大到世界上所有的高级计算机都存储不下, 怎么办?为了保存大文件, 需要把文件放在多个机器上文件要分块 block(128M)不同的块放在不同的 HDFS 节点同时为了对外提供统一的访问, 让外部可以像是访问本机一样访问分布式文件系统有一个统一的 HDFS Master它保存整个系统的文件信息所有的文件元数据的修改都从 Master 开始2. 2 问题二: 大数据怎么计算?从一个网络日志文件中计算独立 IP, 以及其出现的次数如果数据量特别大,我们可以将,整个任务拆开, 划分为比较小的任务, 从而进行计算呢。
2.3 问题三: 如何将这些计算任务跑在集群中?如果能够在不同的节点上并行执行, 更有更大的提升, 如何把这些任务跑在集群中?可以设置一个集群的管理者, 这个地方叫做 Yarn这个集群管理者有一个 Master, 用于接收和分配任务这个集群管理者有多个 Slave, 用于运行任务2.4 Hadoop 的组成Hadoop分布式文件系统(HDFS) 提供对应用程序数据的高吞吐量访问的分布式文件系统Hadoop Common 其他Hadoop模块所需的Java库和实用程序。
这些库提供文件系统和 *** 作系统级抽象,并包含启动Hadoop所需的必要Java文件和脚本Hadoop MapReduce 基于YARN的大型数据集并行处理系统Hadoop YARN 作业调度和集群资源管理的框架2.5 Hadoop前生今世Hadoop最早起源于Nutch。
Nutch的设计目标是构建一个大型的全网搜索引擎,包括网页抓取、索引、查询等功能,但随着抓取网页数量的增加,遇到了严重的可扩展性问题——如何解决数十亿网页的存储和索引问题。
2003年、2004年谷歌发表的两篇论文为该问题提供了可行的解决方案。
——分布式文件系统(GFS),可用于处理海量网页的存储——分布式计算框架MAPREDUCE,可用于处理海量网页的索引计算问题。
Nutch的开发人员完成了相应的开源实现HDFS和MAPREDUCE,并从Nutch中剥离成为独立项目HADOOP,到2008年1月,HADOOP成为Apache顶级项目.狭义上来说,hadoop就是单独指代hadoop这个软件,HDFS :分布式文件系统MapReduce : 分布式计算系统广义上来说,hadoop指代大数据的一个生态圈,包括很多其他的软件2.6 hadoop的架构模型1.x的版本架构模型介绍文件系统核心模块:NameNode:集群当中的主节点,管理元数据(文件的大小,文件的位置,文件的权限),主要用于管理集群当中的各种数据secondaryNameNode:主要能用于hadoop当中元数据信息的辅助管理DataNode:集群当中的从节点,主要用于存储集群当中的各种数据数据计算核心模块:JobTracker:接收用户的计算请求任务,并分配任务给从节点TaskTracker:负责执行主节点JobTracker分配的任务2.x的版本架构模型介绍第一种:NameNode与ResourceManager单节点架构模型文件系统核心模块:NameNode:集群当中的主节点,主要用于管理集群当中的各种数据secondaryNameNode:主要能用于hadoop当中元数据信息的辅助管理DataNode:集群当中的从节点,主要用于存储集群当中的各种数据数据计算核心模块:ResourceManager:接收用户的计算请求任务,并负责集群的资源分配NodeManager:负责执行主节点APPmaster分配的任务第二种:NameNode单节点与ResourceManager高可用架构模型文件系统核心模块:NameNode:集群当中的主节点,主要用于管理集群当中的各种数据secondaryNameNode:主要能用于hadoop当中元数据信息的辅助管理DataNode:集群当中的从节点,主要用于存储集群当中的各种数据数据计算核心模块:ResourceManager:接收用户的计算请求任务,并负责集群的资源分配,以及计算任务的划分,通过zookeeper实现ResourceManager的高可用NodeManager:负责执行主节点ResourceManager分配的任务第三种:NameNode高可用与ResourceManager单节点架构模型文件系统核心模块:NameNode:集群当中的主节点,主要用于管理集群当中的各种数据,其中nameNode可以有两个,形成高可用状态DataNode:集群当中的从节点,主要用于存储集群当中的各种数据JournalNode:文件系统元数据信息管理数据计算核心模块:ResourceManager:接收用户的计算请求任务,并负责集群的资源分配,以及计算任务的划分NodeManager:负责执行主节点ResourceManager分配的任务第四种:NameNode与ResourceManager高可用架构模型文件系统核心模块:NameNode:集群当中的主节点,主要用于管理集群当中的各种数据,一般都是使用两个,实现HA高可用JournalNode:元数据信息管理进程,一般都是奇数个DataNode:从节点,用于数据的存储数据计算核心模块:ResourceManager:Yarn平台的主节点,主要用于接收各种任务,通过两个,构建成高可用NodeManager:Yarn平台的从节点,主要用于处理ResourceManager分配的任务03 Hadoop 核心介绍3.1 HDFSHDFS(Hadoop Distributed File System) 是一个 Apache Software Foundation 项目, 是 Apache Hadoop 项目的一个子项目. Hadoop 非常适于存储大型数据 (比如 TB 和 PB), 其就是使用 HDFS 作为存储系统. HDFS 使用多台计算机存储文件, 并且提供统一的访问接口, 像是访问一个普通文件系统一样使用分布式文件系统. HDFS 对数据文件的访问通过流的方式进行处理, 这意味着通过命令和 MapReduce 程序的方式可以直接使用 HDFS. HDFS 是容错的, 且提供对大数据集的高吞吐量访问.HDFS 的一个非常重要的特点就是一次写入、多次读取, 该模型降低了对并发控制的要求, 简化了数据聚合性, 支持高吞吐量访问. 而吞吐量是大数据系统的一个非常重要的指标, 吞吐量高意味着能处理的数据量就大.3.1.1 设计目标通过跨多个廉价计算机集群分布数据和处理来节约成本通过自动维护多个数据副本和在故障发生时来实现可靠性它们为存储和处理超大规模数据提供所需的扩展能力。
3.1.2 HDFS 的历史Doug Cutting 在做 Lucene 的时候, 需要编写一个爬虫服务, 这个爬虫写的并不顺利, 遇到了一些问题, 诸如: 如何存储大规模的数据, 如何保证集群的可伸缩性, 如何动态容错等2013年的时候, Google 发布了三篇论文, 被称作为三驾马车, 其中有一篇叫做 GFS, 是描述了 Google 内部的一个叫做 GFS 的分布式大规模文件系统, 具有强大的可伸缩性和容错性Doug Cutting 后来根据 GFS 的论文, 创造了一个新的文件系统, 叫做 HDFS3.1.3 HDFS 的架构NameNode 是一个中心服务器, 单一节点(简化系统的设计和实现), 负责管理文件系统的名字空间(NameSpace)以及客户端对文件的访问文件 *** 作, NameNode 是负责文件元数据的 *** 作, DataNode 负责处理文件内容的读写请求, 跟文件内容相关的数据流不经过 NameNode, 只询问它跟哪个 DataNode联系, 否则 NameNode 会成为系统的瓶颈副本存放在哪些 DataNode 上由 NameNode 来控制, 根据全局情况作出块放置决定, 读取文件时 NameNode 尽量让用户先读取最近的副本, 降低读取网络开销和读取延时NameNode 全权管理数据库的复制, 它周期性的从集群中的每个DataNode 接收心跳信合和状态报告, 接收到心跳信号意味着 DataNode 节点工作正常, 块状态报告包含了一个该 DataNode 上所有的数据列表3.1.4 HDFS文件副本和Block块存储所有的文件都是以 block 块的方式存放在 HDFS 文件系统当中, 在 Hadoop1 当中, 文件的 block 块默认大小是64M, hadoop2 当中, 文件的 block 块大小默认是 128M, block 块的大小可以通过 hdfs-site.xml 当中的配置文件进行指定<property> <name>dfs.block.size</name> <value>块大小 以字节为单位</value></property>(1)引入块机制的好处一个文件有可能大于集群中任意一个磁盘使用块抽象而不是文件可以简化存储子系统块非常适合用于数据备份进而提供数据容错能力和可用性(2)块缓存通常 DataNode 从磁盘中读取块, 但对于访问频繁的文件, 其对应的块可能被显式的缓存在 DataNode 的内存中, 以堆外块缓存的形式存在. 默认情况下,一个块仅缓存在一个 DataNode 的内存中,当然可以针对每个文件配置 DataNode 的数量. 作业调度器通过在缓存块的 DataNode 上运行任务, 可以利用块缓存的优势提高读 *** 作的性能.例如:连接(join) *** 作中使用的一个小的查询表就是块缓存的一个很好的候选用户或应用通过在缓存池中增加一个 Cache Directive 来告诉 NameNode 需要缓存哪些文件及存多久. 缓存池(Cache Pool) 是一个拥有管理缓存权限和资源使用的管理性分组.例如一个文件 130M, 会被切分成 2 个 block 块, 保存在两个 block 块里面, 实际占用磁盘 130M 空间, 而不是占用256M的磁盘空间(3)HDFS 文件权限验证HDFS 的文件权限机制与 Linux 系统的文件权限机制类似r:read w:write x:execute权限 x 对于文件表示忽略, 对于文件夹表示是否有权限访问其内容 如果 Linux 系统用户 zhangsan 使用 Hadoop 命令创建一个文件, 那么这个文件在 HDFS 当中的 Owner 就是 zhangsan HDFS 文件权限的目的, 防止好人做错事, 而不是阻止坏人做坏事. HDFS相信你告诉我你是谁, 你就是谁3.1.5 HDFS 的元信息和 SecondaryNameNode当 Hadoop 的集群当中, 只有一个 NameNode 的时候, 所有的元数据信息都保存在了 FsImage 与 Eidts 文件当中, 这两个文件就记录了所有的数据的元数据信息, 元数据信息的保存目录配置在了 hdfs-site.xml 当中<property> <name>dfs.namenode.name.dir</name> <value>file:///export/servers/hadoop-3.1.1/datas/namenode/namenodedatas</value></property><property> <name>dfs.namenode.edits.dir</name> <value>file:///export/servers/hadoop-3.1.1/datas/dfs/nn/edits</value></property>(1) FsImage 和 Edits 详解editsedits 存放了客户端最近一段时间的 *** 作日志客户端对 HDFS 进行写文件时会首先被记录在 edits 文件中edits 修改时元数据也会更新每次 HDFS 更新时 edits 先更新后客户端才会看到最新信息fsimageNameNode 中关于元数据的镜像, 一般称为检查点, fsimage 存放了一份比较完整的元数据信息因为 fsimage 是 NameNode 的完整的镜像, 如果每次都加载到内存生成树状拓扑结构,这是非常耗内存和CPU, 所以一般开始时对 NameNode 的 *** 作都放在 edits 中fsimage 内容包含了 NameNode 管理下的所有 DataNode 文件及文件 block 及 block 所在的 DataNode 的元数据信息.随着 edits 内容增大, 就需要在一定时间点和 fsimage 合并(2)fsimage 中的文件信息查看官方查看文档使用命令 hdfs oivcd /export/servers/hadoop-3.1.1/datas/namenode/namenodedatashdfs oiv -i fsimage_0000000000000000864 -p XML -o hello.xml(3) edits 中的文件信息查看官方查看文档使用命令 hdfs oevcd /export/servers/hadoop-3.1.1/datas/dfs/nn/editshdfs oev -i edits_0000000000000000865-0000000000000000866 -o myedit.xml -p XML(4) SecondaryNameNode 如何辅助管理 fsimage 与 edits 文件?SecondaryNameNode 定期合并 fsimage 和 edits, 把 edits 控制在一个范围内配置 SecondaryNameNodeSecondaryNameNode 在 conf/masters 中指定在 masters 指定的机器上, 修改 hdfs-site.xml<property> <name>dfs.http.address</name> <value>host:50070</value> </property> 修改 core-site.xml, 这一步不做配置保持默认也可以<!– 多久记录一次 HDFS 镜像, 默认 1小时 –> <property> <name>fs.checkpoint.period</name> <value>3600</value> </property> <!– 一次记录多大, 默认 64M –> <property> <name>fs.checkpoint.size</name> <value>67108864</value> </property>SecondaryNameNode 通知 NameNode 切换 editlogSecondaryNameNode 从 NameNode 中获得 fsimage 和 editlog(通过http方式)SecondaryNameNode 将 fsimage 载入内存, 然后开始合并 editlog, 合并之后成为新的 fsimageSecondaryNameNode 将新的 fsimage 发回给 NameNodeNameNode 用新的 fsimage 替换旧的 fsimage(5)特点完成合并的是 SecondaryNameNode, 会请求 NameNode 停止使用 edits, 暂时将新写 *** 作放入一个新的文件中 edits.newSecondaryNameNode 从 NameNode 中通过 Http GET 获得 edits, 因为要和 fsimage 合并, 所以也是通过 Http Get 的方式把 fsimage 加载到内存, 然后逐一执行具体对文件系统的 *** 作, 与 fsimage 合并, 生成新的 fsimage, 然后通过 Http POST 的方式把 fsimage 发送给 NameNode. NameNode 从 SecondaryNameNode 获得了 fsimage 后会把原有的 fsimage 替换为新的 fsimage, 把 edits.new 变成 edits. 同时会更新 fstimeHadoop 进入安全模式时需要管理员使用 dfsadmin 的 save namespace 来创建新的检查点SecondaryNameNode 在合并 edits 和 fsimage 时需要消耗的内存和 NameNode 差不多, 所以一般把 NameNode 和 SecondaryNameNode 放在不同的机器上3.1.6 HDFS 文件写入过程Client 发起文件上传请求, 通过 RPC 与 NameNode 建立通讯, NameNode 检查目标文件是否已存在, 父目录是否存在, 返回是否可以上传Client 请求第一个 block 该传输到哪些 DataNode 服务器上NameNode 根据配置文件中指定的备份数量及机架感知原理进行文件分配, 返回可用的 DataNode 的地址如: A, B, CHadoop 在设计时考虑到数据的安全与高效, 数据文件默认在 HDFS 上存放三份, 存储策略为本地一份, 同机架内其它某一节点上一份, 不同机架的某一节点上一份。
Client 请求 3 台 DataNode 中的一台 A 上传数据(本质上是一个 RPC 调用,建立 pipeline ), A 收到请求会继续调用 B, 然后 B 调用 C, 将整个 pipeline 建立完成, 后逐级返回 clientClient 开始往 A 上传第一个 block(先从磁盘读取数据放到一个本地内存缓存), 以 packet 为单位(默认64K), A 收到一个 packet 就会传给 B, B 传给 C. A 每传一个 packet 会放入一个应答队列等待应答数据被分割成一个个 packet 数据包在 pipeline 上依次传输, 在 pipeline 反方向上, 逐个发送 ack(命令正确应答), 最终由 pipeline 中第一个 DataNode 节点 A 将 pipelineack 发送给 Client当一个 block 传输完成之后, Client 再次请求 NameNode 上传第二个 block 到服务 13.1.7. HDFS 文件读取过程Client向NameNode发起RPC请求,来确定请求文件block所在的位置;NameNode会视情况返回文件的部分或者全部block列表,对于每个block,NameNode 都会返回含有该 block 副本的 DataNode 地址; 这些返回的 DN 地址,会按照集群拓扑结构得出 DataNode 与客户端的距离,然后进行排序,排序两个规则:网络拓扑结构中距离 Client 近的排靠前;心跳机制中超时汇报的 DN 状态为 STALE,这样的排靠后;Client 选取排序靠前的 DataNode 来读取 block,如果客户端本身就是DataNode,那么将从本地直接获取数据(短路读取特性);底层上本质是建立 Socket Stream(FSDataInputStream),重复的调用父类 DataInputStream 的 read 方法,直到这个块上的数据读取完毕;当读完列表的 block 后,若文件读取还没有结束,客户端会继续向NameNode 获取下一批的 block 列表;读取完一个 block 都会进行 checksum 验证,如果读取 DataNode 时出现错误,客户端会通知 NameNode,然后再从下一个拥有该 block 副本的DataNode 继续读。
read 方法是并行的读取 block 信息,不是一块一块的读取;NameNode 只是返回Client请求包含块的DataNode地址,并不是返回请求块的数据;最终读取来所有的 block 会合并成一个完整的最终文件。
3.1.8. HDFS 的 API *** 作(1)导入 Maven 依赖<repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository></repositories><dependencies> <dependency> <groupId>jdk.tools</groupId> <artifactId>jdk.tools</artifactId> <version>1.8</version> <scope>system</scope> <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.0.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs-client</artifactId> <version>3.0.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency></dependencies>(2)概述在 Java 中 *** 作 HDFS, 主要涉及以下 Class:Configuration该类的对象封转了客户端或者服务器的配置FileSystem该类的对象是一个文件系统对象, 可以用该对象的一些方法来对文件进行 *** 作, 通过 FileSystem 的静态方法 get 获得该对象FileSystem fs = FileSystem.get(conf) get 方法从 conf 中的一个参数 fs.defaultFS 的配置值判断具体是什么类型的文件系统如果我们的代码中没有指定 fs.defaultFS, 并且工程 ClassPath 下也没有给定相应的配置, conf 中的默认值就来自于 Hadoop 的 Jar 包中的 core-default.xml默认值为 file:///, 则获取的不是一个 DistributedFileSystem 的实例, 而是一个本地文件系统的客户端对象(3)获取 FileSystem 的几种方式第一种方式@Testpublic void getFileSystem() throws URISyntaxException, IOException { Configuration configuration = new Configuration(); FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), configuration); System.out.println(fileSystem.toString());}第二种方式@Testpublic void getFileSystem2() throws URISyntaxException, IOException { Configuration configuration = new Configuration(); configuration.set("fs.defaultFS","hdfs://192.168.52.250:8020"); FileSystem fileSystem = FileSystem.get(new URI("/"), configuration); System.out.println(fileSystem.toString());}第三种方式@Testpublic void getFileSystem3() throws URISyntaxException, IOException { Configuration configuration = new Configuration(); FileSystem fileSystem = FileSystem.newInstance(new URI("hdfs://192.168.52.250:8020"), configuration); System.out.println(fileSystem.toString());}第四种方式@Testpublic void getFileSystem4() throws Exception{ Configuration configuration = new Configuration(); configuration.set("fs.defaultFS","hdfs://192.168.52.250:8020"); FileSystem fileSystem = FileSystem.newInstance(configuration); System.out.println(fileSystem.toString());}(4)遍历 HDFS 中所有文件递归遍历@Testpublic void listFile() throws Exception{ FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.100:8020"), new Configuration()); FileStatus[] fileStatuses = fileSystem.listStatus(new Path("/")); for (FileStatus fileStatus : fileStatuses) { if(fileStatus.isDirectory()){ Path path = fileStatus.getPath(); listAllFiles(fileSystem,path); }else{ System.out.println("文件路径为"+fileStatus.getPath().toString()); } }}public void listAllFiles(FileSystem fileSystem,Path path) throws Exception{ FileStatus[] fileStatuses = fileSystem.listStatus(path); for (FileStatus fileStatus : fileStatuses) { if(fileStatus.isDirectory()){ listAllFiles(fileSystem,fileStatus.getPath()); }else{ Path path1 = fileStatus.getPath(); System.out.println("文件路径为"+path1); } }}使用 API 遍历@Testpublic void listMyFiles()throws Exception{ //获取fileSystem类 FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration()); //获取RemoteIterator 得到所有的文件或者文件夹,第一个参数指定遍历的路径,第二个参数表示是否要递归遍历 RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fileSystem.listFiles(new Path("/"), true); while (locatedFileStatusRemoteIterator.hasNext()){ LocatedFileStatus next = locatedFileStatusRemoteIterator.next(); System.out.println(next.getPath().toString()); } fileSystem.close();}(5)下载文件到本地@Testpublic void getFileToLocal()throws Exception{ FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration()); FSDataInputStream open = fileSystem.open(new Path("/test/input/install.log")); FileOutputStream fileOutputStream = new FileOutputStream(new File("c:\install.log")); IOUtils.copy(open,fileOutputStream ); IOUtils.closeQuietly(open); IOUtils.closeQuietly(fileOutputStream); fileSystem.close();}(6)HDFS 上创建文件夹@Testpublic void mkdirs() throws Exception{ FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration()); boolean mkdirs = fileSystem.mkdirs(new Path("/hello/mydir/test")); fileSystem.close();}(7)HDFS 文件上传@Testpublic void putData() throws Exception{ FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration()); fileSystem.copyFromLocalFile(new Path("file:///c:\install.log"),new Path("/hello/mydir/test")); fileSystem.close();}(8)伪造用户停止hdfs集群,在node01机器上执行以下命令cd /export/servers/hadoop-3.1.1sbin/stop-dfs.sh修改node01机器上的hdfs-site.xml当中的配置文件cd /export/servers/hadoop-3.1.1/etc/hadoopvim hdfs-site.xml<property> <name>dfs.permissions.enabled</name> <value>true</value></property>修改完成之后配置文件发送到其他机器上面去scp hdfs-site.xml node02:$PWDscp hdfs-site.xml node03:$PWD重启hdfs集群cd /export/servers/hadoop-3.1.1sbin/start-dfs.sh随意上传一些文件到我们hadoop集群当中准备测试使用cd /export/servers/hadoop-3.1.1/etc/hadoophdfs dfs -mkdir /confighdfs dfs -put *.xml /confighdfs dfs -chmod 600 /config/core-site.xml使用代码准备下载文件@Testpublic void getConfig()throws Exception{ FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration(),"hadoop"); fileSystem.copyToLocalFile(new Path("/config/core-site.xml"),new Path("file:///c:/core-site.xml")); fileSystem.close();}(9)小文件合并由于 Hadoop 擅长存储大文件,因为大文件的元数据信息比较少,如果 Hadoop 集群当中有大量的小文件,那么每个小文件都需要维护一份元数据信息,会大大的增加集群管理元数据的内存压力,所以在实际工作当中,如果有必要一定要将小文件合并成大文件进行一起处理在我们的 HDFS 的 Shell 命令模式下,可以通过命令行将很多的 hdfs 文件合并成一个大文件下载到本地cd /export/servershdfs dfs -getmerge /config/*.xml ./hello.xml既然可以在下载的时候将这些小文件合并成一个大文件一起下载,那么肯定就可以在上传的时候将小文件合并到一个大文件里面去@Testpublic void mergeFile() throws Exception{ //获取分布式文件系统 FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.52.250:8020"), new Configuration(),"hadoop"); FSDataOutputStream outputStream = fileSystem.create(new Path("/bigfile.xml")); //获取本地文件系统 LocalFileSystem local = FileSystem.getLocal(new Configuration()); //通过本地文件系统获取文件列表,为一个集合 FileStatus[] fileStatuses = local.listStatus(new Path("file:///F:\传智播客大数据离线阶段课程资料\3、大数据离线第三天\上传小文件合并")); for (FileStatus fileStatus : fileStatuses) { FSDataInputStream inputStream = local.open(fileStatus.getPath()); IOUtils.copy(inputStream,outputStream); IOUtils.closeQuietly(inputStream); } IOUtils.closeQuietly(outputStream); local.close(); fileSystem.close();}04 MapReduce介绍MapReduce思想在生活中处处可见。
或多或少都曾接触过这种思想。
MapReduce的思想核心是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景)。
Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。
可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
Reduce负责“合”,即对map阶段的结果进行全局汇总。
MapReduce运行在yarn集群ResourceManagerNodeManager这两个阶段合起来正是MapReduce思想的体现。
4.1 MapReduce设计思想和架构MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上。
Hadoop MapReduce构思:分而治之对相互间不具有计算依赖关系的大数据,实现并行最自然的办法就是采取分而治之的策略。
并行计算的第一个重要问题是如何划分计算任务或者计算数据以便对划分的子任务或数据块同时进行计算。
不可分拆的计算任务或相互间有依赖关系的数据无法进行并行计算!统一构架,隐藏系统层细节如何提供统一的计算框架,如果没有统一封装底层细节,那么程序员则需要考虑诸如数据存储、划分、分发、结果收集、错误恢复等诸多细节;为此,MapReduce设计并提供了统一的计算框架,为程序员隐藏了绝大多数系统层面的处理细节。
MapReduce最大的亮点在于通过抽象模型和计算框架把需要做什么(what need to do)与具体怎么做(how to do)分开了,为程序员提供一个抽象和高层的编程接口和框架。
程序员仅需要关心其应用层的具体计算问题,仅需编写少量的处理应用本身计算问题的程序代码。
如何具体完成这个并行计算任务所相关的诸多系统层细节被隐藏起来,交给计算框架去处理:从分布代码的执行,到大到数千小到单个节点集群的自动调度使用。
构建抽象模型:Map和ReduceMapReduce借鉴了函数式语言中的思想,用Map和Reduce两个函数提供了高层的并行编程抽象模型Map: 对一组数据元素进行某种重复式的处理;Reduce: 对Map的中间结果进行某种进一步的结果整理。
Map和Reduce为程序员提供了一个清晰的 *** 作接口抽象描述。
MapReduce处理的数据类型是键值对。
MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现:Map: (k1; v1) → [(k2; v2)]Reduce: (k2; [v2]) → [(k3; v3)]MapReduce 框架结构一个完整的mapreduce程序在分布式运行时有三类实例进程:MRAppMaster 负责整个程序的过程调度及状态协调MapTask 负责map阶段的整个数据处理流程ReduceTask 负责reduce阶段的整个数据处理流程4.2 MapReduce编程规范MapReduce 的开发一共有八个步骤, 其中 Map 阶段分为 2 个步骤,Shuffle 阶段 4个步骤,Reduce 阶段分为 2 个步骤Map 阶段 2 个步骤设置 InputFormat 类, 将数据切分为 Key-Value(K1和V1) 对, 输入到第二步自定义 Map 逻辑, 将第一步的结果转换成另外的 Key-Value(K2和V2) 对, 输出结果Shuffle 阶段 4 个步骤对输出的 Key-Value 对进行分区对不同分区的数据按照相同的 Key 排序(可选) 对分组过的数据初步规约, 降低数据的网络拷贝对数据进行分组, 相同 Key 的 Value 放入一个集合中Reduce 阶段 2 个步骤对多个 Map 任务的结果进行排序以及合并, 编写 Reduce 函数实现自己的逻辑, 对输入的 Key-Value 进行处理, 转为新的 Key-Value(K3和V3)输出设置 OutputFormat 处理并保存 Reduce 输出的 Key-Value 数据转换为代码,例子如下Map阶段public class WordCountMapper extends Mapper<Text,Text,Text, LongWritable> { /** * K1-----V1 * A -----A * B -----B * C -----C * * K2-----V2 * A -----1 * B -----1 * C -----1 * * @param key * @param value * @param context * @throws IOException * @throws InterruptedException */ @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { context.write(key,new LongWritable(1)); }}Reduce阶段public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long count = 0L; for (LongWritable value : values) { count += value.get(); } context.write(key, new LongWritable(count)); }}shuffle阶段,举一个分区的例子:public class WordCountPartitioner extends Partitioner<Text, LongWritable> { @Override public int getPartition(Text text, LongWritable longWritable, int i) { if (text.toString().length() > 5) { return 1; } return 0; }}主方法public class JobMain extends Configured implements Tool { public static void main(String[] args) throws Exception { ToolRunner.run(new Configuration(),new JobMain(),args); } @Override public int run(String[] strings) throws Exception { Job job = Job.getInstance(super.getConf(), "wordcout"); job.setJarByClass(JobMain.class); //输入 job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job,new Path("/")); //map job.setMapperClass(WordCountMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //shuffle阶段 job.setPartitionerClass(WordCountPartitioner.class); job.setNumReduceTasks(2); //reduce阶段 job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //输出 job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job,new Path("/")); return 0; }}4.4 MapTask运行机制具体步骤:读取数据组件 InputFormat(默认 TextInputFormat) 会通过 getSplits 方法对输入目录中文件进行逻辑切片规划得到 splits, 有多少个 split 就对应启动多少个MapTask . split 与 block 的对应关系默认是一对一将输入文件切分为 splits 之后, 由 RecordReader 对象 (默认是LineRecordReader)进行读取, 以 n 作为分隔符, 读取一行数据, 返回 <key,value> . Key 表示每行首字符偏移值,Value 表示这一行文本内容读取 split 返回 <key,value> , 进入用户自己继承的 Mapper 类中,执行用户重写的 map 函数, RecordReader 读取一行这里调用一次Mapper 逻辑结束之后, 将 Mapper 的每条结果通过 context.write 进行collect数据收集. 在 collect 中, 会先对其进行分区处理,默认使用 HashPartitioner。
MapReduce 提供 Partitioner 接口, 它的作用就是根据 Key 或 Value 及Reducer 的数量来决定当前的这对输出数据最终应该交由哪个 Reduce task处理, 默认对 Key Hash 后再以 Reducer 数量取模. 默认的取模方式只是为了平均 Reducer 的处理能力, 如果用户自己对 Partitioner 有需求, 可以订制并设置到 Job 上。
接下来, 会将数据写入内存, 内存中这片区域叫做环形缓冲区, 缓冲区的作用是批量收集Mapper 结果, 减少磁盘 IO 的影响. 我们的 Key/Value 对以及 Partition 的结果都会被写入缓冲区. 当然, 写入之前,Key 与 Value 值都会被序列化成字节数组。
环形缓冲区其实是一个数组, 数组中存放着 Key, Value 的序列化数据和 Key,Value 的元数据信息, 包括 Partition, Key 的起始位置, Value 的起始位置以及Value 的长度. 环形结构是一个抽象概念缓冲区是有大小限制, 默认是 100MB. 当 Mapper 的输出结果很多时, 就可能会撑爆内存, 所以需要在一定条件下将缓冲区中的数据临时写入磁盘, 然后重新利用这块缓冲区. 这个从内存往磁盘写数据的过程被称为 Spill, 中文可译为溢写. 这个溢写是由单独线程来完成, 不影响往缓冲区写 Mapper 结果的线程.溢写线程启动时不应该阻止 Mapper 的结果输出, 所以整个缓冲区有个溢写的比例 spill.percent . 这个比例默认是 0.8, 也就是当缓冲区的数据已经达到阈值 buffer size * spill percent = 100MB * 0.8 = 80MB , 溢写线程启动,锁定这 80MB 的内存, 执行溢写过程. Mapper 的输出结果还可以往剩下的20MB 内存中写, 互不影响当溢写线程启动后, 需要对这 80MB 空间内的 Key 做排序 (Sort). 排序是 MapReduce模型默认的行为, 这里的排序也是对序列化的字节做的排序如果 Job 设置过 Combiner, 那么现在就是使用 Combiner 的时候了. 将有相同 Key 的 Key/Value 对的 Value 加起来, 减少溢写到磁盘的数据量.Combiner 会优化 MapReduce 的中间结果, 所以它在整个模型中会多次使用那哪些场景才能使用 Combiner 呢? 从这里分析, Combiner 的输出是Reducer 的输入, Combiner 绝不能改变最终的计算结果. Combiner 只应该用于那种 Reduce 的输入 Key/Value 与输出 Key/Value 类型完全一致, 且不影响最终结果的场景. 比如累加, 最大值等. Combiner 的使用一定得慎重, 如果用好, 它对 Job 执行效率有帮助, 反之会影响 Reducer 的最终结果合并溢写文件, 每次溢写会在磁盘上生成一个临时文件 (写之前判断是否有 Combiner),如果 Mapper 的输出结果真的很大, 有多次这样的溢写发生, 磁盘上相应的就会有多个临时文件存在. 当整个数据处理结束之后开始对磁盘中的临时文件进行 Merge 合并, 因为最终的文件只有一个, 写入磁盘, 并且为这个文件提供了一个索引文件, 以记录每个reduce对应数据的偏移量4.5 ReduceTask工作机制Reduce 大致分为 copy、sort、reduce 三个阶段,重点在前两个阶段。
copy 阶段包含一个 eventFetcher 来获取已完成的 map 列表,由 Fetcher 线程去 copy 数据,在此过程中会启动两个 merge 线程,分别为 inMemoryMerger 和 onDiskMerger,分别将内存中的数据 merge 到磁盘和将磁盘中的数据进行 merge。
待数据 copy 完成之后,copy 阶段就完成了,开始进行 sort 阶段,sort 阶段主要是执行 finalMerge *** 作,纯粹的 sort 阶段,完成之后就是 reduce 阶段,调用用户定义的 reduce 函数进行处理详细步骤:Copy阶段 ,简单地拉取数据。
Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件。
Merge阶段 。
这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。
Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。
merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。
默认情况下第一种形式不启用。
当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。
与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。
第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。
合并排序 。
把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。
对排序后的键值对调用reduce方法 ,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。
4.6 Shuffle具体流程map 阶段处理的数据如何传递给 reduce 阶段,是 MapReduce 框架中最关键的一个流程,这个流程就叫 shuffleshuffle: 洗牌、发牌 ——(核心机制:数据分区,排序,分组,规约,合并等过程)Collect阶段 :将 MapTask 的结果输出到默认大小为 100M 的环形缓冲区,保存的是key/value,Partition 分区信息等。
Spill阶段 :当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的 *** 作,如果配置了 combiner,还会将有相同分区号和 key 的数据进行排序。
Merge阶段 :把所有溢出的临时文件进行一次合并 *** 作,以确保一个 MapTask 最终只产生一个中间数据文件。
Copy阶段 :ReduceTask 启动 Fetcher 线程到已经完成 MapTask 的节点上复制一份属于自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上。
Merge阶段 :在 ReduceTask 远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并 *** 作。
Sort阶段 :在对数据进行合并的同时,会进行排序 *** 作,由于 MapTask 阶段已经对数据进行了局部的排序,ReduceTask 只需保证 Copy 的数据的最终整体有效性即可。
Shuffle 中的缓冲区大小会影响到 mapreduce 程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快缓冲区的大小可以通过参数调整, 参数:mapreduce.task.io.sort.mb 默认100M
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)