第五章 YARN

第五章 YARN,第1张

第五章 YARN 5.1 概念

YARN主要负责集群资源的管理和调度。支持主从结构。主节点最多可以有2个,从节点可以有多个。YARN主要管理内存和CPU两种资源类型。

ResourceManager

主要负责集群资源的分配和管理

NodeManager

主要负责当前机器的资源管理。当NodeManager节点启动的时候会自动向ResourceManage注册,将当前节点的可用CPU和内存注册到ResourceManager。这样所有的NodeManager注册完成之后,ResourceManager就知道集群的资源总量。默认为每个子节点分配8G内存和8个CPU

  # 默认每个节点8G内存
  
    Amount of physical memory, in MB, that can be allocated 
    for containers. If set to -1 and
    yarn.nodemanager.resource.detect-hardware-capabilities is true, it is
    automatically calculated(in case of Windows and Linux).
    In other cases, the default is 8192MB.
    
    yarn.nodemanager.resource.memory-mb
    -1
  
# CPU 分配个数

    Number of vcores that can be allocated
    for containers. This is used by the RM scheduler when allocating
    resources for containers. This is not used to limit the number of
    CPUs used by YARN containers. If it is set to -1 and
    yarn.nodemanager.resource.detect-hardware-capabilities is true, it is
    automatically determined from the hardware in case of Windows and Linux.
    In other cases, number of vcores is 8 by default.
    yarn.nodemanager.resource.cpu-vcores
    -1
  

在yarn-default.xml 中定义了默认值,源码位置 hadoop-3.2.0-srchadoop-yarn-projecthadoop-yarnhadoop-yarn-commonsrcmainresourcesyarn-default.xml

5.2 常见调度器

FIFO(先进先出调度策略)

是先进先出的,大家都是排队的,如果你的任务申请不到足够的资源,那你就等着,等前面的任务执行结束释放了资源之后你再执行。这种在有些时候是不合理的,因为我们有一些任务的优先级比较高,我们希望任务提交上去立刻就开始执行,这个就实现不了了。

Capacity Scheduler(FIFP Scheduler的多队列版本)

它是FifoScheduler的多队列版本,就是我们先把集群中的整块资源划分成多份,我们可以人为的给这些资源定义使用场景,例如图里面的queue A里面运行普通的任务,queueB中运行优先级比较高的任务。这两个队列的资源是相互对立的

FailScheduler(多队列,多用户共享资源)

支持多个队列,每个队列可以配置一定的资源,每个队列中的任务共享其所在队列的所有资源,不需要排队等待资源
具体是这样的,假设我们向一个队列中提交了一个任务,这个任务刚开始会占用整个队列的资源,当你再提交第二个任务的时候,第一个任务会把他的资源释放出来一部分给第二个任务使用

在实际工作中使用Capacity Scheduler,在Hadoop2以后多队列版本就是集群中的默认调度器了。

5.3 配置

需求

配置三个队列 default(默认任务) offline(离线任务) online(实时任务),三个任务的占比是7:2:1

更改配置文件

修改配置文件 capacity-scheduler.xml

vi/data/soft/hadoop-3.2.0/etc/hadoop/capacity-scheduler.xml

 
    yarn.scheduler.capacity.root.queues 
    default,online,offline 
    队列列表,多个队列之间使用逗号分割 
 
 
    yarn.scheduler.capacity.root.default.capacity 
    70 
    default队列70% 
 
 
    yarn.scheduler.capacity.root.online.capacity 
    10 
    online队列10% 
 
 
    yarn.scheduler.capacity.root.offline.capacity 
    20 
    offline队列20% 
 
 
    yarn.scheduler.capacity.root.default.maximum-capacity 
    70 
    Default队列可使用的资源上限. 
 
 
    yarn.scheduler.capacity.root.online.maximum-capacity 
    10 
    online队列可使用的资源上限. 
 
 
   yarn.scheduler.capacity.root.offline.maximum-capacity 
   20 
   offline队列可使用的资源上限. 

同步到其他节点

scp -rq etc/hadoop/capacity-scheduler.xml bigdata02:/data/soft/hadoop-3.2.0/etc/hadoop/
scp -rq etc/hadoop/capacity-scheduler.xml bigdata03:/data/soft/hadoop-3.2.0/etc/hadoop/

重启集群

sbin/stop-all.sh
sbin/start-all.sh

代码实现

package com.imooc.mr;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;



public class WordCountJobQueue {
    
    public static class MyMapper extends Mapper{
        // 使用Logger进行日志输出
        //Logger logger = LoggerFactory.getLogger(MyMapper.class);
        
        @Override
        protected void map(LongWritable k1,Text v1,Context context) throws IOException, InterruptedException {
            // 输出 k1 ,v1 的值
            //System.out.println("=<"+k1.get()+","+v1.toString()+">");
            //使用log 输出
           // logger.info("=<"+k1.get()+","+v1.toString()+">");
            String[] words = v1.toString().split(" ");
            // 迭代切割出来的单词数据
            for(String word:words){
                Text k2 = new Text(word);
                LongWritable v2 = new LongWritable(1L);
                //System.out.println("");
                //logger.info("");
                context.write(k2,v2);
            }
        }
    }
    
    public static class MyReduce extends Reducer{
        //Logger logger = LoggerFactory.getLogger(MyReduce.class);
        @Override
        protected void reduce(Text k2, Iterable v2s, Context context) throws IOException, InterruptedException {
            //创建一个sum变量,保存v2s的值
            long sum = 0L;
            for(LongWritable v2:v2s){
                sum +=v2.get();
            }
            // 组装k3,v3
            Text k3 = k2;
            LongWritable v3 = new LongWritable(sum);
            // 输出k3,v3的值
            //System.out.println("=<"+k3.toString()+","+v3.get()+">");
            // 使用Logger 进行日志输出
            //logger.info("=<"+k3.toString()+","+v3.get()+">");
            // 把结果写出去
            context.write(k3,v3);


        }
    }
    
    public static void main(String[] args) {
        try {
            // 指定Job需要配置的参数
            Configuration conf = new Configuration();
            //解析命令行中通过-D传递过来的参数,添加到conf中
            String[] remainingArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            // 创建一个Job
            Job job = Job.getInstance(conf);
            //
            job.setJarByClass(WordCountJobQueue.class);
            //指定输入路径
            FileInputFormat.setInputPaths(job,new Path(remainingArgs[0]));
            // 指定输出路径
            FileOutputFormat.setOutputPath(job,new Path(remainingArgs[1]));
            // 指定map 相关的代码
            job.setMapperClass(MyMapper.class);
            // 指定 K2的类型
            job.setMapOutputKeyClass(Text.class);
            // 指定v2的类型
            job.setMapOutputValueClass(LongWritable.class);
            // 指定reduce相关的代码
            job.setReducerClass(MyReduce.class);
            // 指定K3的类型
            job.setMapOutputKeyClass(Text.class);
            // 指定v3的类型
            job.setOutputValueClass(LongWritable.class);
            // 提交Job
            job.waitForCompletion(true);

        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

重新编译打包 执行命令 hadoop_demo-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJobQueue -Dmapreduce.job.queuename=offline /test/hello.txt /out_queue_offline

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5717499.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存