Yarn中的资源调度

Yarn中的资源调度,第1张

Yarn中的资源调度

文章目录

三种调度策略YARN多资源队列配置和使用资源队列配置Java代码参考文献

三种调度策略


从左到右依次为FIFO Scheduler、Capacity Scheduler、Fair Scheduler策略,下面对这三种策略进行分别介绍

FIFO Scheduler:先进先出(first in, first out)调度策略
任务依次进行,前面的任务执行结束后才能释放资源,这种在有些时候是不合理的,因为有一些任务的优先级比较高,我们希望任务提交上去立刻就开始执行,这个就实现不了了。

Capacity Scheduler是FifoScheduler的多队列版本。我们先把集群中的整块资源划分成多份,我们可以人为的给这些资源定义使用场景,例如图里面的queue A里面运行普通的任务,queueB中运行优先级比较高的任务。这两个队列的资源是相互对立的。
【注意】队列内部还是按照先进先出的规则

FairScheduler:多队列,多用户共享资源。每个队列可以配置一定的资源,每个队列中的任务共享其所在队列的所有资源,不需要排队等待资源。假设我们向一个队列中提交了一个任务1,这个任务1刚开始会占用整个队列的资源,但是当你提交任务2的时候,任务1会把它的资源释放出来一部分给任务2使用

在实际工作中我们一般都是使用CapacityScheduler,从hadoop2开始,CapacityScheduler是集群中的默认调度器了

YARN多资源队列配置和使用

需求:希望增加2个队列,一个是online队列(运行实时任务),一个是offline队列(运行离线任务)
然后向offline队列中提交一个MapReduce任务。
第一步
修改集群中etc/hadoop目录下的capacity-scheduler.xml配置文件。修改和增加以下参数,针对已有的参数,修改value中的值,针对没有的参数,则直接增加。这里的default是需要保留的,增加online,offline,这三个队列的资源比例为7:1:2,具体的比例需要根据实际的业务需求来,看你们那些类型的任务比较多,对应的队列中资源比例就调高一些。

[root@bigdata01 hadoop]# vi capacity-scheduler.xml
  
    yarn.scheduler.capacity.root.queues
    default,online,offline
    队列列表,多个队列之间使用逗号分割
  
  
    yarn.scheduler.capacity.root.default.capacity
    70
    default队列70%
  
  
    yarn.scheduler.capacity.root.online.capacity
    10
    online队列10%
  
  
    yarn.scheduler.capacity.root.offline.capacity
    20
    offline队列20%
  
  
    yarn.scheduler.capacity.root.default.maximum-capacity
    70
    Default队列可使用的资源上限.
  
  
    yarn.scheduler.capacity.root.online.maximum-capacity
    10
    online队列可使用的资源上限.
  
  
    yarn.scheduler.capacity.root.offline.maximum-capacity
    20
    offline队列可使用的资源上限.
  

同步集群节点

[root@bigdata01 hadoop]# scp -rq capacity-scheduler.xml bigdata02:/data/soft/hadoop-3.2.0/etc/hadoop/
[root@bigdata01 hadoop]# scp -rq capacity-scheduler.xml bigdata03:/data/soft/hadoop-3.2.0/etc/hadoop/

重启集群

[root@bigdata01 hadoop-3.2.0]# sbin/stop-all.sh 
[root@bigdata01 hadoop-3.2.0]# sbin/start-all.sh 

第二步
在job中添加一行代码

   //解析命令行中-D后面传递过来的参数,添加到conf中
 String[] remainingArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

执行

[root@bigdata01 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.bigdata.mr.WordCountJobQueue -Dmapreduce.job.queuename=offline /test/hello.txt /outqueue
资源队列配置Java代码
public class WordCountJobQueue {
    
    public static class MyMapper extends Mapper{
        Logger logger = LoggerFactory.getLogger(MyMapper.class);
        
        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            //输出k1,v1的值          
            //k1 代表的是每一行数据的行首偏移量,v1代表的是每一行内容
            //对获取到的每一行数据进行切割,把单词切割出来
            String[] words = v1.toString().split(" ");
            //迭代切割出来的单词数据
            for (String word : words) {
                //把迭代出来的单词封装成的形式
                Text k2 = new Text(word);
                LongWritable v2 = new LongWritable(1L);
                //把写出去
                context.write(k2,v2);
            }
        }
    }


    
    public static class MyReducer extends Reducer{
        Logger logger = LoggerFactory.getLogger(MyReducer.class);
        
        @Override
        protected void reduce(Text k2, Iterable v2s, Context context)
                throws IOException, InterruptedException {
            //创建一个sum变量,保存v2s的和
            long sum = 0L;
            //对v2s中的数据进行累加求和
            for(LongWritable v2: v2s){
                //输出k2,v2的值               
                sum += v2.get();
            }

            //组装k3,v3
            Text k3 = k2;
            LongWritable v3 = new LongWritable(sum);
            //输出k3,v3的值          
            // 把结果写出去
            context.write(k3,v3);
        }
    }

    
    public static void main(String[] args) {
        try{

            //指定Job需要的配置参数
            Configuration conf = new Configuration();
            //解析命令行中-D后面传递过来的参数,添加到conf中
            String[] remainingArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            //创建一个Job
            Job job = Job.getInstance(conf);
            //注意,这一行必须设置,否则在集群中执行的时候是找不到WordCountJob这个类的
            job.setJarByClass(WordCountJobQueue.class);            
            //指定输入路径(可以是文件,也可以是目录)
            FileInputFormat.setInputPaths(job,new Path(remainingArgs[0]));
            //指定输出路径(只能指定一个不存在的目录)
            FileOutputFormat.setOutputPath(job,new Path(remainingArgs[1]));
            
            //指定map相关的代码
            job.setMapperClass(MyMapper.class);
            //指定k2的类型
            job.setMapOutputKeyClass(Text.class);
            //指定v2的类型
            job.setMapOutputValueClass(LongWritable.class);            

            //指定reduce相关的代码
            job.setReducerClass(MyReducer.class);
            //指定k3的类型
            job.setOutputKeyClass(Text.class);
            //指定v3的类型
            job.setOutputValueClass(LongWritable.class);

            //提交job
            job.waitForCompletion(true);
        }catch(Exception e){
            e.printStackTrace();
        }

    }
}
参考文献

https://www.imooc.com/wiki/BigData:慕课网《大数据开发工程师体系课程》

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5700703.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存