如何记录hadoop mapreduce运行日志

如何记录hadoop mapreduce运行日志,第1张

hadoop的日志主要是MapReduce程序,运行过程中,产生的一些数据日志,除了系统的日志外,还包含一些自己在测试时候,或者线上环境输出的日志,这部分日志通常会被放在userlogs这个文件夹下面,可以在mapred-site.xml里面配置运行日志的输出目录,测试文件内容如下:

<?xml version="1.0"?>

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>

<!-- jobtracker的master地址-->

<property>

<name>mapred.job.tracker</name>

<value>192.168.75.130:9001</value>

</property>

<property>

<!-- hadoop的日志输出指定目录-->

<name>mapred.local.dir</name>

<value>/root/hadoop1.2/mylogs</value>

</property>

</configuration>

配置好,日志目录后,我们就可以把这个配置文件,分发到各个节点上,然后启动hadoop。

下面我们看来下在eclipse环境中如何调试,散仙在setup,map和reduce方法中,分别使用System打印了一些数据,当我们使用local方式跑MR程序时候,日志并不会被记录下来,而是直接会在控制台打印,散仙的测试代码如下:

package com.qin.testdistributed

import java.io.File

import java.io.FileReader

import java.io.IOException

import java.net.URI

import java.util.Scanner

import org.apache.hadoop.conf.Configuration

import org.apache.hadoop.filecache.DistributedCache

import org.apache.hadoop.fs.FSDataInputStream

import org.apache.hadoop.fs.FileSystem

import org.apache.hadoop.fs.Path

import org.apache.hadoop.io.IntWritable

import org.apache.hadoop.io.LongWritable

import org.apache.hadoop.io.Text

import org.apache.hadoop.mapred.JobConf

import org.apache.hadoop.mapreduce.Job

import org.apache.hadoop.mapreduce.Mapper

import org.apache.hadoop.mapreduce.Reducer

import org.apache.hadoop.mapreduce.lib.db.DBConfiguration

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

import org.apache.log4j.pattern.LogEvent

import org.slf4j.Logger

import org.slf4j.LoggerFactory

import com.qin.operadb.WriteMapDB

/**

* 测试hadoop的全局共享文件

* 使用DistributedCached

*

* 大数据技术交流群: 37693216

* @author qindongliang

*

* ***/

public class TestDistributed {

private static Logger logger=LoggerFactory.getLogger(TestDistributed.class)

private static class FileMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

Path path[]=null

/**

* Map函数前调用

*

* */

@Override

protected void setup(Context context)

throws IOException, InterruptedException {

logger.info("开始启动setup了哈哈哈哈")

// System.out.println("运行了.........")

Configuration conf=context.getConfiguration()

path=DistributedCache.getLocalCacheFiles(conf)

System.out.println("获取的路径是:  "+path[0].toString())

//  FileSystem fs = FileSystem.get(conf)

FileSystem fsopen= FileSystem.getLocal(conf)

// FSDataInputStream in = fsopen.open(path[0])

// System.out.println(in.readLine())

//       for(Path tmpRefPath : path) {

//           if(tmpRefPath.toString().indexOf("ref.png") != -1) {

//               in = reffs.open(tmpRefPath)

//               break

//           }

//       }

// FileReader reader=new FileReader("file://"+path[0].toString())

//      File f=new File("file://"+path[0].toString())

// FSDataInputStream in=fs.open(new Path(path[0].toString()))

//     Scanner scan=new Scanner(in)

//       while(scan.hasNext()){

//       System.out.println(Thread.currentThread().getName()+"扫描的内容:  "+scan.next())

//       }

//       scan.close()

//

// System.out.println("size: "+path.length)

}

@Override

protected void map(LongWritable key, Text value,Context context)

throws IOException, InterruptedException {

// System.out.println("map    aaa")

//logger.info("Map里的任务")

System.out.println("map里输出了")

// logger.info()

context.write(new Text(""), new IntWritable(0))

}

@Override

protected void cleanup(Context context)

throws IOException, InterruptedException {

logger.info("清空任务了。。。。。。")

}

}

private static class  FileReduce extends Reducer<Object, Object, Object, Object>{

@Override

protected void reduce(Object arg0, Iterable<Object> arg1,

Context arg2)throws IOException, InterruptedException {

System.out.println("我是reduce里面的东西")

}

}

public static void main(String[] args)throws Exception {

JobConf conf=new JobConf(TestDistributed.class)

//conf.set("mapred.local.dir", "/root/hadoop")

//Configuration conf=new Configuration()

// conf.set("mapred.job.tracker","192.168.75.130:9001")

//读取person中的数据字段

//conf.setJar("tt.jar")

//注意这行代码放在最前面,进行初始化,否则会报

String inputPath="hdfs://192.168.75.130:9000/root/input"

String outputPath="hdfs://192.168.75.130:9000/root/outputsort"

Job job=new Job(conf, "a")

DistributedCache.addCacheFile(new URI("hdfs://192.168.75.130:9000/root/input/f1.txt"), job.getConfiguration())

job.setJarByClass(TestDistributed.class)

System.out.println("运行模式:  "+conf.get("mapred.job.tracker"))

/**设置输出表的的信息  第一个参数是job任务,第二个参数是表名,第三个参数字段项**/

FileSystem fs=FileSystem.get(job.getConfiguration())

Path pout=new Path(outputPath)

if(fs.exists(pout)){

fs.delete(pout, true)

System.out.println("存在此路径, 已经删除......")

}

/**设置Map类**/

// job.setOutputKeyClass(Text.class)

//job.setOutputKeyClass(IntWritable.class)

job.setMapOutputKeyClass(Text.class)

job.setMapOutputValueClass(IntWritable.class)

job.setMapperClass(FileMapper.class)

job.setReducerClass(FileReduce.class)

FileInputFormat.setInputPaths(job, new Path(inputPath))  //输入路径

FileOutputFormat.setOutputPath(job, new Path(outputPath))//输出路径

System.exit(job.waitForCompletion(true) ? 0 : 1)

}

}

线上配合Zookeeper做了HA,所以可以平滑重启,直接使用以下命令先重启standby的NameNode,在将standby切换为active,在重启另一台NameNode就可以了。

# 先重启standby

cd ${HADOOP_HOME}/sbin

./hadoop-daemon.sh stop namenode

./hadoop-daemon.sh start namenode

# 切换active节点,这里的nn2将被切换为active节点

hdfs haadmin -failover <nn1><nn2>


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/tougao/7880145.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-11
下一篇 2023-04-11

发表评论

登录后才能评论

评论列表(0条)

保存