Custom OutputFormat


This example routes review records into separate good-comment and bad-comment files by subclassing FileOutputFormat and RecordWriter. First, the custom OutputFormat:
package com.cn.demo_outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MyOutPutFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        FileSystem fs = FileSystem.get(context.getConfiguration());
        // Open one output stream per category: good comments and bad comments go to separate files
        FSDataOutputStream fsDataOutputStream01 = fs.create(new Path("file:///D:/dsj/baishi课件/hadoop/5、大数据离线第五天/5、大数据离线第五天/自定义outputformat/good_Comment/1.txt"));
        FSDataOutputStream fsDataOutputStream02 = fs.create(new Path("file:///D:/dsj/baishi课件/hadoop/5、大数据离线第五天/5、大数据离线第五天/自定义outputformat/bad_Comment/1.txt"));

        return new MyRecordWriter(fsDataOutputStream01, fsDataOutputStream02);
    }
}
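Hard-coding the two absolute file paths works for this demo, but a more flexible variant (a sketch of mine, not part of the original post) derives both files from the directory passed to setOutputPath in the driver, using the real FileOutputFormat.getOutputPath helper:

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        FileSystem fs = FileSystem.get(context.getConfiguration());
        // Resolve both category files relative to the job's configured output directory
        Path outDir = FileOutputFormat.getOutputPath(context);
        FSDataOutputStream good = fs.create(new Path(outDir, "good_Comment/1.txt"));
        FSDataOutputStream bad = fs.create(new Path(outDir, "bad_Comment/1.txt"));
        return new MyRecordWriter(good, bad);
    }

Next, the custom RecordWriter that does the actual routing: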

package com.cn.demo_outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class MyRecordWriter extends RecordWriter<Text, NullWritable> {
    private FSDataOutputStream fsDataOutputStream01;
    private FSDataOutputStream fsDataOutputStream02;

    public MyRecordWriter() {
    }

    // Parameterized constructor receives the two output streams
    public MyRecordWriter(FSDataOutputStream fsDataOutputStream01, FSDataOutputStream fsDataOutputStream02) {
        this.fsDataOutputStream01 = fsDataOutputStream01;
        this.fsDataOutputStream02 = fsDataOutputStream02;
    }

    @Override
    public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {
        // K2/V2: decide whether this record is a good or a bad comment.
        // The status is the tenth tab-separated field (index 9); values <= 1 count as good.
        String[] splits = text.toString().split("\t");
        if (Integer.parseInt(splits[9]) <= 1) {
            // Write good comments to the first stream
            fsDataOutputStream01.write(text.toString().getBytes());
            fsDataOutputStream01.write("\r\n".getBytes());
        } else {
            // Write bad comments to the second stream
            fsDataOutputStream02.write(text.toString().getBytes());
            fsDataOutputStream02.write("\r\n".getBytes());
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // Close the streams only after all records have been written; closing them
        // inside write() would make every write after the first record fail
        IOUtils.closeStream(fsDataOutputStream01);
        IOUtils.closeStream(fsDataOutputStream02);
    }
}
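The original post does not show the Mapper. A minimal sketch that would produce the Text/NullWritable K2/V2 pairs this RecordWriter expects (the class name MyMapper is my assumption) simply passes each input line through:

package com.cn.demo_outputformat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Emit the whole line as the key; the custom RecordWriter does the routing
        context.write(value, NullWritable.get());
    }
}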

--------------------------------------------Main program--------------------------------------

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

job.setOutputFormatClass(MyOutPutFormat.class);
MyOutPutFormat.setOutputPath(job, new Path("file:///D:/dsj/baishi课件/hadoop/5、大数据离线第五天/5、大数据离线第五天/自定义outputformat/output"));
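
For context, here is a complete driver sketch that wires these pieces together. It assumes the MyMapper above; the class name JobMain, the job name, and the input path file:///D:/input are illustrative assumptions, not from the original post:

package com.cn.demo_outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class JobMain {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "custom_outputformat");

        // Input: plain text, one review record per line (path is illustrative)
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("file:///D:/input"));

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // No reduce stage is needed here: map output goes straight to the OutputFormat
        job.setNumReduceTasks(0);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(MyOutPutFormat.class);
        MyOutPutFormat.setOutputPath(job, new Path("file:///D:/dsj/baishi课件/hadoop/5、大数据离线第五天/5、大数据离线第五天/自定义outputformat/output"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Setting the reducer count to zero is a design choice in this sketch: since the RecordWriter already classifies records, a map-only job avoids an unnecessary shuffle. Note that even with a custom OutputFormat, setOutputPath is still required because the framework writes the _SUCCESS marker to that directory.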
