MD具体代码和hdfs数据格式详解

MD具体代码和hdfs数据格式详解,第1张

M/D具体代码和hdfs数据格式详解

WordCount类

一、drver 类

  • Configuration:与HDFS中的Configuration一致,负责参数的加载和传递
    Job:作业,是对一轮MapReduce任务的抽象,即一个MapReduce的执行全过程的管理类
    FileInputFormat:指定输入数据的工具类,用于指定任务的输入数据路径
    FileOutputFormat:指定输出数据的工具类,用于指定任务的输出数据路径
  • 实现
  1. 先得到集群的配置参数,用 Configuration类
  2. 将集群参数设置到本次的job实例中,Job类
  3. 指定本次执行的主类,也就是本drver类
  4. 指定map类、combiner类、reducer类
  5. 指定job输出的key和value的类型,如果map和reduce输出类型不完全相同,需要重新设置map的output的key和value的class类型
  6. 指定输入数据的路径、输出路径,并要求该输出路径一定是不存在的(他会读取指定目录下的所有文件,不能读目录,然后生成并输出在指定的输出目录下),FileInputFormat,FileOutputFormat
  7. 指定job执行模式,等待任务执行完成后,提交任务的客户端才会退出!

二、map类编写

  • Mapper:是MapReduce计算框架中Map过程的封装
    Text:Hadoop对Java String类的封装,适用于Hadoop对文本字符串的处理
    IntWritable:Hadoop对JavaInteger类的封装,适用于Hadoop整型的处理
    Context:Hadoop环境基于上下文的 *** 作对象,如Map中key/value的输出、分布式缓存数据、分布式参数传递等
    StringTokenizer:对String对象字符串的 *** 作类,做基于空白字符的切分 *** 作工具类
  • 代码编写
    • 类继承Mapper泛型里Object是偏移量,Text是传入文本,另一个是输出文本,IntWritable是词频
    • 暂存每个词和它的的词频(可以避免下面重复申请耗费空间)
      1. 编写核心map方法 *** 作(Object Text)
      2. 引入一个Context对象,可以创建也可以在入参列表里加一个
      3. 用传进来的Text类型的每行文本,初始化StringTokenizer相当于分割成字符数组
      4. 循环取得每个分割出来的元素放到刚才定义过的词对象中
      5. 通过Context对象将map输出(词 , 词频)

三、reduce类编写

  • Reducer:是MapReduce计算框架中Reduce过程的封装
  • 实现
    • 类继承Reducer,接收shuffle过来的k,v对,它有一个GroupingComparator分组,把k相同的合并一起,把v做成一个集合,成形式,传给下面的reduce方法,GroupingComparator在分组问题中可以通过重写方法实现.
    • 将IntWritable 类型value和在mapper一样在类里定义
    • reduce方法编写
      1. 它的入参是k,数字类型的转换器接收的value集合和Context
      2. 计算value的和,可用加强型for,依次获取迭代器中的每个元素值,即为一个一个的词频数值,求出和sum
      3. 将sum赋值给上面IntWritable 类型value
      4. 用 context.write(key, value);将计算结果逐条输出

执行

  • 打包,上传
  • 运行
    yarn jar 包名(加全类名可以指定运行哪个类) 要分析的文件夹 想要创建并输出分析结果的文件夹(目录)

三合一标准代码实现

把三个类放一块就行了,公用的东西可以提出来进一步优化

MapReduce Shell应用

查看

  • mapred称为一级命令,直接输入mapred回车,即可查看二级命令
  • 输入一级命令mapred后,再任意输入一个二级命令,即可查看三级命令

  • 查看当前任务列表
    mapred job -list

  • 终止(kill)一个任务的执行

     job -kill job-id
    
  • 查看日志

    mapred job -logs job-id
    

MapReduce技术特征

  1. 向“外”横向扩展,而非向“上”纵向扩展
  2. 移动计算,把处理向数据迁移(数据本地性)
  3. 顺序处理数据、避免随机访问数据
  4. 为应用开发隐藏系统底层细节
  5. 平滑无缝的可扩展性
  6. 推测执行
    • 采用推测执行机制,发现某个任务的运行速度远低于任务平均速度,会为慢的任务启动一个备份任务,同时运行。哪个先运行完,采用哪个结果
  7. 失效被认为是常态

相关技术问题

1.判断一个输入串的类型

  1. 正则表达式
    • Java实现正则表达式的核心类
      • Pattern
        获取pattern实现,Pattern.compile

      • Matcher
        Pattern.matcher可以得到matcher实例
        其有3个匹配方法
        Matches:全部匹配
        lookingAt: 前向匹配
        find:任意匹配
        String digitalRegex="[u4e00-u9fa5]+";
        String input=“中国123”;
        Pattern pattern= Pattern.compile(digitalRegex);

                Matcher matcher=pattern.matcher(input);
        

        matcher 返回布尔类型true为是

  2. 强制转化
    强制转化,报错就不是
  3. 现成的api
    StringUtils.isNumbers();//判断是不是数字
    2.hdfs当中数据可分block块存储和可切分计算的问题
  4. 是不是所有的文件都可以分块存储
    是,都是二进制文件,可以切分
  5. 是不是所有的文件都可以进行切分
    不是因为涉及到计算,需要针对数据的格式进行区分对待。

3.企业当中的研发环境的分类

  1. 生产环境-线上环境-product
  2. 试验田环境-高度模拟生产环境
  3. 测试环境-test
  4. 开发环境-dev
  5. QA环境(质量监控,独立的环境)

hdfs文件格式

面向行/列 类型名称 是否可切分 优点 缺点 适用场景
面向行 .txt 是 查看,编辑简单 无压缩占空间大、传输压力大、数据解析开销大 学习练习
面向列 .seq 是 原生,k,v二进制存储 数据堆起来的,本地查看不方便 生产环境,map输出的默认格式
面向列 .rc 是 横着切,放一个块里,再按列存储,加载快、查询快、空间利用率高、可高负载 好,但不是最好,中庸 学习生产
面向列 .orc 是 和.rc一样优点,支持新的数据类型,提高了速度 同上 同上

可切分性 类型名称 是否原生 优点 缺点 适用于
可切分 lzo(.lzo) 否 压缩/解压速度快,合理的压缩率 压缩率比gzip低,非原生、需要native安装 压缩完成后>=200M为宜,越大越好
可切分 bzip2(.bz2) 是 高压缩率超过gzip,原生支持、不需要native安装,用linux bzip可解压 *** 作 压缩/解压速率慢 处理速度要求不高、压缩率要求高的情况
不可切分 gzip(.gz) 是 压缩/解压速度快,原生/native都支持,使用方便 不可切分,对cpu要求高 128M上下适宜
不可切分 snappy(.snappy) 否 高压缩/解压速度,合理的压缩率 压缩率比gzip低,非原生、需要native安装 适合作为map到reduce或是job数据流中间的数据传输格式

输出gzip格式

1.先在driver里使用参数解析器,就可以输入系统参数了

 //参数解析器
             GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
           String[] remainingArgs = optionParser.getRemainingArgs();//Remaining是除了系统命令之外的应用参数

输入和输出路径也要改成remainingArgs[0]``remainingArgs[1]

2.命令差别

通过shell命令改动,添加参数设置模板:
yarn jar jar_path main_class_path -Dk1=v1参数列表
具体应用:

yarn jar TlHadoopCore-jar-with-dependencies.jar   
com.tianliangedu.examples.WordCountV2 
-Dmapred.output.compress=true 
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec 
/tmp/tianliangedu/input /tmp/tianliangedu/output19

自定义Partition

原来划分Partition是通过哈希值对reduce求模,return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;其中Integer.MAX_VALUE是用来让哈希值不为负,一般不需要,但是有时候可能重写

1.reduce数量确认时机

  1. 在Job提交后,任务正式开始计算之前即已经确定
  2. Map数量的确定:由输入数据文件的总大小、数据格式、块大小综合确定,待冲刺环节详解
  3. Reduce数量确定:系统根据输入数据量的大小自动确定,有固定的计算公式,超过1g就是两个

2.自定义reduce数量

yarn jar TlHadoopCore-jar-with-dependencies.jar 
com.tianliangedu.examples.WordCountV2 
-Dmapred.output.compress=true 
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec 
-Dmapred.reduce.tasks=2 
/tmp/tianliangedu/input /tmp/tianliangedu/output38

这个可以写个脚本,建一个.sh文件,首行#! /bin/bash表明bash运行,写命令,用sh 文件名运行

3.自定义Partition实现

通过继承Partitioner类,自定义实现Partition

    public static class MyHashPartitioner extends Partitioner {

        
        public int getPartition(K key, V value, int numReduceTasks) {
            return (key.toString().charAt(0) < 'q' ? 0 : 1) % numReduceTasks;
            // return key.toString().charAt(0);
        }
}

只需要改它的判定函数,代码就是判断小于q的是0大于的是1再和reduce模

4.通过代码中指定partition来实现

在driver中指定map类下指定

job.setPartitionerClass(MyHashPartitioner.class);

通过配置参数实现

  • 不需要driver指定,在shell里指定,(不用改代码,增加了灵活性)
  • yarn jar TlHadoopCore-jar-with-dependencies.jar 
    com.tianliangedu.examples.SelfDefinePartitioner4ShellConfigure 
    -Dmapred.output.compress=true 
    -Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec 
    -Dmapred.reduce.tasks=2 
    -Dmapreduce.job.partitioner.class=com.tianliangedu.examples.MyHashPartitioner 
    /tmp/tianliangedu/input /tmp/tianliangedu/output44
    
    增加了-Dmapreduce.job.partitioner.class=com.tianliangedu.examples.MyHashPartitioner 一条系统命令来指定Partition类去掉combiner

因为有的计算比如平均数不能有combiner,所以可以在driver注释指定combiner类来去点combiner

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/4666284.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-06
下一篇 2022-11-06

发表评论

登录后才能评论

评论列表(0条)

保存