package com.cn.demo_outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MyOutPutFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSystem fs = FileSystem.get(context.getConfiguration());
        // Open one output stream per category (good comments / bad comments).
        FSDataOutputStream fsDataOutputStream01 = fs.create(new Path(
                "file:///D:/dsj/baishi课件/hadoop/5、大数据离线第五天/5、大数据离线第五天/自定义outputformat/good_Comment/1.txt"));
        FSDataOutputStream fsDataOutputStream02 = fs.create(new Path(
                "file:///D:/dsj/baishi课件/hadoop/5、大数据离线第五天/5、大数据离线第五天/自定义outputformat/bad_Comment/1.txt"));
        // Hand both streams to the custom RecordWriter.
        return new MyRecordWriter(fsDataOutputStream01, fsDataOutputStream02);
    }
}
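Hard-coding the two absolute paths works for this local demo, but a more portable variant derives them from the job's configured output directory. A sketch (the getOutputPath call is inherited from FileOutputFormat; the relative file names are assumptions):

        Path outDir = FileOutputFormat.getOutputPath(context);
        // Place the two category files under the job output directory.
        FSDataOutputStream goodOut = fs.create(new Path(outDir, "good_Comment/1.txt"));
        FSDataOutputStream badOut  = fs.create(new Path(outDir, "bad_Comment/1.txt"));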
package com.cn.demo_outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class MyRecordWriter extends RecordWriter<Text, NullWritable> {
    private FSDataOutputStream fsDataOutputStream01;
    private FSDataOutputStream fsDataOutputStream02;

    public MyRecordWriter() {
    }

    // Parameterized constructor receives the two output streams.
    public MyRecordWriter(FSDataOutputStream fsDataOutputStream01, FSDataOutputStream fsDataOutputStream02) {
        this.fsDataOutputStream01 = fsDataOutputStream01;
        this.fsDataOutputStream02 = fsDataOutputStream02;
    }

    @Override
    public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {
        // Decide good vs. bad comment from K2/V2: field 9 holds the rating,
        // and 1 or below counts as a good comment.
        String[] splits = text.toString().split("\t");
        if (Integer.parseInt(splits[9]) <= 1) {
            // Write good comments to the first stream.
            fsDataOutputStream01.write(text.toString().getBytes());
            fsDataOutputStream01.write("\r\n".getBytes());
        } else {
            // Write bad comments to the second stream.
            fsDataOutputStream02.write(text.toString().getBytes());
            fsDataOutputStream02.write("\r\n".getBytes());
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // Close the streams once, after all records have been written;
        // closing them inside write() would fail on the second record.
        IOUtils.closeStream(fsDataOutputStream01);
        IOUtils.closeStream(fsDataOutputStream02);
    }
}
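The RecordWriter above expects each K2 to be one complete, tab-separated comment line. A minimal pass-through Mapper sketch that produces exactly that (the class name MyMapper is illustrative, not from the original post):

package com.cn.demo_outputformat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Forward the whole comment line unchanged; MyRecordWriter does the
        // good/bad split on the output side.
        context.write(value, NullWritable.get());
    }
}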
--------------------------------------------Main program--------------------------------------
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(MyOutPutFormat.class);
        MyOutPutFormat.setOutputPath(job, new Path(
                "file:///D:/dsj/baishi课件/hadoop/5、大数据离线第五天/5、大数据离线第五天/自定义outputformat/output"));
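For context, a complete driver around those four lines might look like the following sketch. The class name JobMain, the job name, and the input path are assumptions; the output-format wiring matches the snippet above. Note that FileOutputFormat still needs an output path even with a custom OutputFormat, because the framework writes job bookkeeping files (e.g. _SUCCESS) there.

package com.cn.demo_outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class JobMain {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "custom_outputformat");
        job.setJarByClass(JobMain.class);

        // Read the raw comment file; the input path is an assumption for this sketch.
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("file:///D:/input"));

        // Pass-through map stage (see the MyMapper sketch above).
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Wire in the custom OutputFormat.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(MyOutPutFormat.class);
        MyOutPutFormat.setOutputPath(job, new Path(
                "file:///D:/dsj/baishi课件/hadoop/5、大数据离线第五天/5、大数据离线第五天/自定义outputformat/output"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}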