Hadoop's logs come mainly from MapReduce jobs. Besides the logs the system itself produces while a job runs, they also include output we write ourselves during testing or in a production environment. These logs usually end up under the userlogs directory, and the output directory for job runtime logs can be configured in mapred-site.xml. The test configuration file looks like this:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
<configuration>
    <!-- Address of the JobTracker master -->
    <property>
        <name>mapred.job.tracker</name>
        <value>192.168.75.130:9001</value>
    </property>
    <!-- Directory where Hadoop writes its runtime logs -->
    <property>
        <name>mapred.local.dir</name>
        <value>/root/hadoop1.2/mylogs</value>
    </property>
</configuration>
Once the log directory is configured, distribute this configuration file to every node and then start Hadoop, as sketched below.
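A minimal sketch of that distribution step, assuming a small Hadoop 1.2 cluster; the slave hostnames (slave1, slave2) and the ${HADOOP_HOME} location are placeholders for your own environment:
# copy the updated mapred-site.xml to each node (hostnames are assumptions)
scp ${HADOOP_HOME}/conf/mapred-site.xml root@slave1:${HADOOP_HOME}/conf/
scp ${HADOOP_HOME}/conf/mapred-site.xml root@slave2:${HADOOP_HOME}/conf/
# restart the cluster so the new setting takes effect
${HADOOP_HOME}/bin/stop-all.sh
${HADOOP_HOME}/bin/start-all.sh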
Now let's look at how to debug in the Eclipse environment. Sanxian prints some data with System.out in the setup, map, and reduce methods. When we run the MR program in local mode, these logs are not written to log files; they are printed straight to the console. Sanxian's test code is as follows:
package com.qin.testdistributed;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.Scanner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.pattern.LogEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.qin.operadb.WriteMapDB;
/**
 * Tests Hadoop's globally shared files
 * using DistributedCache.
 *
 * Big data technology discussion QQ group: 37693216
 * @author qindongliang
 *
 */
public class TestDistributed {

    private static Logger logger = LoggerFactory.getLogger(TestDistributed.class);

    private static class FileMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        Path path[] = null;

        /**
         * Called once before the map function
         * */
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            logger.info("setup is starting, hahaha");
            // System.out.println("running.........");
            Configuration conf = context.getConfiguration();
            path = DistributedCache.getLocalCacheFiles(conf);
            System.out.println("Retrieved path: " + path[0].toString());
            // FileSystem fs = FileSystem.get(conf);
            FileSystem fsopen = FileSystem.getLocal(conf);
            // FSDataInputStream in = fsopen.open(path[0]);
            // System.out.println(in.readLine());
            // for(Path tmpRefPath : path) {
            //     if(tmpRefPath.toString().indexOf("ref.png") != -1) {
            //         in = reffs.open(tmpRefPath);
            //         break;
            //     }
            // }
            // FileReader reader = new FileReader("file://" + path[0].toString());
            // File f = new File("file://" + path[0].toString());
            // FSDataInputStream in = fs.open(new Path(path[0].toString()));
            // Scanner scan = new Scanner(in);
            // while(scan.hasNext()){
            //     System.out.println(Thread.currentThread().getName() + " scanned content: " + scan.next());
            // }
            // scan.close();
            //
            // System.out.println("size: " + path.length);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // System.out.println("map aaa");
            // logger.info("task inside map");
            System.out.println("output from inside map");
            // logger.info();
            context.write(new Text(""), new IntWritable(0));
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            logger.info("cleanup task running......");
        }
    }
    private static class FileReduce extends Reducer<Object, Object, Object, Object> {

        @Override
        protected void reduce(Object arg0, Iterable<Object> arg1,
                Context arg2) throws IOException, InterruptedException {
            System.out.println("this is output from inside reduce");
        }
    }
    public static void main(String[] args) throws Exception {

        JobConf conf = new JobConf(TestDistributed.class);
        // conf.set("mapred.local.dir", "/root/hadoop");
        // Configuration conf = new Configuration();
        // conf.set("mapred.job.tracker", "192.168.75.130:9001");
        // Read the data fields from person
        // conf.setJar("tt.jar");
        // Note: keep the JobConf line first so initialization happens there, otherwise an error is thrown
        String inputPath = "hdfs://192.168.75.130:9000/root/input";
        String outputPath = "hdfs://192.168.75.130:9000/root/outputsort";

        Job job = new Job(conf, "a");
        DistributedCache.addCacheFile(new URI("hdfs://192.168.75.130:9000/root/input/f1.txt"), job.getConfiguration());
        job.setJarByClass(TestDistributed.class);
        System.out.println("Run mode: " + conf.get("mapred.job.tracker"));
        /** Set the output table info: first parameter is the job, second the table name, third the field list **/
        FileSystem fs = FileSystem.get(job.getConfiguration());
        Path pout = new Path(outputPath);
        if (fs.exists(pout)) {
            fs.delete(pout, true);
            System.out.println("Path already exists, deleted......");
        }
        /** Set the Mapper class **/
        // job.setOutputKeyClass(Text.class);
        // job.setOutputKeyClass(IntWritable.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setMapperClass(FileMapper.class);
        job.setReducerClass(FileReduce.class);
        FileInputFormat.setInputPaths(job, new Path(inputPath));   // input path
        FileOutputFormat.setOutputPath(job, new Path(outputPath)); // output path

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
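When the same job is submitted to the cluster instead of run in local mode, the System.out and SLF4J output lands in the per-attempt task logs rather than the Eclipse console. A rough sketch of how to look at them on a TaskTracker node, assuming Hadoop 1.2's default log layout under ${HADOOP_HOME}/logs; the job and attempt IDs below are placeholders, not real values:
# task logs live under the userlogs directory on each TaskTracker (IDs are placeholders)
cd ${HADOOP_HOME}/logs/userlogs/job_201404010001_0001/attempt_201404010001_0001_m_000000_0
cat stdout    # System.out.println output
cat syslog    # log4j / SLF4J output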
In production, HA is set up with ZooKeeper, so a rolling restart is possible: use the following commands to first restart the standby NameNode, then switch the standby to active, and finally restart the other NameNode.
# restart the standby NameNode first
cd ${HADOOP_HOME}/sbin
./hadoop-daemon.sh stop namenode
./hadoop-daemon.sh start namenode
# switch the active node; here nn2 becomes the active node
hdfs haadmin -failover <nn1> <nn2>
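To confirm the switch worked, you can query each NameNode's state with the standard haadmin command; nn1 and nn2 stand for your configured NameNode service IDs:
# check which NameNode is currently active (service IDs are placeholders)
hdfs haadmin -getServiceState nn1
hdfs haadmin -getServiceState nn2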