Reading HFiles from a MapReduce Job


I recently needed to build a feature that batch-reads HFile files with MapReduce and writes them, in HBase Result form, into another storage system. I hit a few pitfalls along the way and am recording them here.

Preliminary research

The usual way to read HBase data from MR is the official TableMapper. Here, however, we need to read HFile files directly without going through HBase (a BulkLoad scenario: read the incremental data without a full scan of the HBase table), so that approach does not fit.

The overall flow:

1. Walk the specified HDFS input path and pick out the absolute paths of HFile files as the MR job's input.

2. In the mapper stage, parse each HFile into individual cells and emit them as the map output.

3. Group the cells by rowkey and assemble them into an HBase Result object (single-row, single-column Cells become a single-row, multi-column Result), then write to ES. Aggregating into a Result avoids the overwrite problem that writing cell by cell to ES would cause; see the sketch after this list.
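To make the overwrite problem concrete: if every Cell were indexed separately with the rowkey as the document id, each write would replace the previous document and only the last column would survive. Below is a minimal sketch of flattening an assembled Result into one multi-field ES document; the helper name and document layout are my own, not from the job code:

// Hypothetical helper: one ES document per row, one field per column qualifier.
// Indexing this once per rowkey avoids the cell-by-cell overwrite problem.
Map<String, String> toEsDocument(Result result) {
    Map<String, String> doc = new HashMap<>();
    for (Cell cell : result.rawCells()) {
        doc.put(Bytes.toString(CellUtil.cloneQualifier(cell)),
                Bytes.toString(CellUtil.cloneValue(cell)));
    }
    return doc; // index with id = rowkey
}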

1. Custom InputFormat implementation

1.1 Extend FileInputFormat

public class HFileInputFormat extends FileInputFormat<ImmutableBytesWritable, Cell> {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // Each HFile is consumed whole by a single mapper.
        return false;
    }

    @Override
    public RecordReader<ImmutableBytesWritable, Cell> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException,
            InterruptedException {
        return new HFileRecordReader((FileSplit) split, context.getConfiguration());
    }
}
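Returning false from isSplitable keeps each HFile in one mapper: HFile data blocks are compressed and indexed internally, so there is no cheap byte offset at which to split a file. Parallelism therefore comes from the number of input files rather than from splits.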

1.2 Implement a RecordReader by extending org.apache.hadoop.mapreduce.RecordReader

public class HFileRecordReader extends RecordReader<ImmutableBytesWritable, Cell> {

    private HFile.Reader reader;
    private final HFileScanner scanner;
    private long entryNumber = 0L;
    private boolean seeked = false;

    public HFileRecordReader(FileSplit split, Configuration conf)
            throws IOException {
        final Path path = split.getPath();
        // Note: the createReader signature varies across HBase versions.
        reader = HFile.createReader(FileSystem.get(conf), path, conf);
        HFile.FileInfo fi = (HFile.FileInfo) reader.loadFileInfo();
        // cacheBlocks = false, pread = false: a one-pass sequential scan.
        scanner = reader.getScanner(false, false);
        System.out.println(fi.size()); // debug: size of the file-info metadata map
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // All setup happens in the constructor.
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        boolean hasNext;
        if (!seeked) {
            // seekTo() positions the scanner ON the first cell; calling next()
            // right away would silently skip that cell.
            hasNext = scanner.seekTo();
            seeked = true;
        } else {
            hasNext = scanner.next();
        }
        if (hasNext) {
            entryNumber++;
        }
        return hasNext;
    }

    @Override
    public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
        return new ImmutableBytesWritable(CellUtil.cloneRow(scanner.getCell()));
    }

    @Override
    public Cell getCurrentValue() throws IOException, InterruptedException {
        return scanner.getCell();
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (reader != null) {
            // Cast before dividing: long division would always round to 0.
            return (float) entryNumber / reader.getEntries();
        }
        return 1.0f;
    }

    @Override
    public void close() throws IOException {
        if (reader != null) {
            reader.close();
        }
    }
}
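As a quick sanity check outside of MR, the same reader/scanner pattern can be run standalone. This is an illustrative sketch with a placeholder path; note the seekTo-then-do/while shape, which visits the first cell instead of skipping it:

// Standalone smoke test for the HFile scan loop (path is a placeholder).
Configuration conf = HBaseConfiguration.create();
Path path = new Path("/tmp/sample.hfile"); // placeholder
HFile.Reader r = HFile.createReader(FileSystem.get(conf), path, conf);
HFileScanner s = r.getScanner(false, false);
if (s.seekTo()) {            // positions the scanner on the first cell
    do {
        Cell c = s.getCell();
        System.out.println(Bytes.toString(CellUtil.cloneRow(c)));
    } while (s.next());      // advances; false once past the last cell
}
r.close();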
2. A Cell cannot be used directly as map output, so implement the Writable interface
public class HFileRecord implements Writable {
    private byte operate;
    private long timestamp;
    private String row;
    private String family;
    private String qualifier;
    private String value;

    public HFileRecord () {}

    public HFileRecord (byte operate, long timestamp, String row, String family, String qualifier, String value) {
        this.operate = operate;
        this.timestamp = timestamp;
        this.row = row;
        this.family = family;
        this.qualifier = qualifier;
        this.value = value;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeByte(operate);
        out.writeLong(timestamp);
        out.writeUTF(row);
        out.writeUTF(family);
        out.writeUTF(qualifier);
        out.writeUTF(value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.operate = in.readByte();
        this.timestamp = in.readLong();
        this.row = in.readUTF();
        this.family = in.readUTF();
        this.qualifier = in.readUTF();
        this.value = in.readUTF();
    }

    public byte getOperate() {
        return this.operate;
    }

    public long getTimestamp() {
        return this.timestamp;
    }

    public String getRow() {
        return this.row;
    }

    public String getFamily() {
        return this.family;
    }

    public String getQualifier() {
        return this.qualifier;
    }

    public String getValue() {
        return this.value;
    }
}
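Hadoop shuffles map output through this Writable contract, so a quick round-trip check (illustrative only) confirms that write and readFields mirror each other:

// Round-trip an HFileRecord through its Writable serialization (sketch).
HFileRecord out = new HFileRecord(KeyValue.Type.Put.getCode(), 1L,
        "r1", "f1", "q1", "v1");
ByteArrayOutputStream bos = new ByteArrayOutputStream();
out.write(new DataOutputStream(bos));

HFileRecord in = new HFileRecord();
in.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
// "in" now matches "out" field for field: row "r1", family "f1", value "v1".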
3. The mapper-stage logic
public class HFileReadMapper extends Mapper<ImmutableBytesWritable, Cell, Text, HFileRecord> {

    private static final Log LOG = LogFactory.getLog(HFileReadMapper.class);

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        // Counts how many mapper instances were started.
        context.getCounter("Convert", "mapper").increment(1);
    }

    @Override
    protected void map(ImmutableBytesWritable key, Cell cell, Context context)
            throws IOException, InterruptedException {
        HFileRecord hFileRecord = new HFileRecord(cell.getTypeByte(),
                cell.getTimestamp(),
                new String(CellUtil.cloneRow(cell)),
                new String(CellUtil.cloneFamily(cell)),
                new String(CellUtil.cloneQualifier(cell)),
                new String(CellUtil.cloneValue(cell)));
        // Key by rowkey so the reducer sees all cells of a row together.
        context.write(new Text(CellUtil.cloneRow(cell)), hFileRecord);
    }
}
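One caveat: new String(...) here and writeUTF(...) in HFileRecord assume the rowkey, family, qualifier, and value are valid UTF-8 text. For binary rowkeys or values the conversion is lossy; keeping the fields as byte[] (for example via BytesWritable) would be safer in that case.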
4. The reducer-stage logic
public class HFileReadReducer extends Reducer<Text, HFileRecord, NullWritable, NullWritable> {
    private static final Log LOG = LogFactory.getLog(HFileReadReducer.class);
    Result result;
    List<Cell> cellAll = Lists.newArrayListWithCapacity(10);

    @Override
    protected void reduce(Text text, Iterable<HFileRecord> cellList, Context context)
            throws IOException, InterruptedException {
        for (HFileRecord hr : cellList) {
            cellAll.add(CellUtil.createCell(hr.getRow().getBytes(),
                    hr.getFamily().getBytes(),
                    hr.getQualifier().getBytes(),
                    hr.getTimestamp(),
                    hr.getOperate(),
                    hr.getValue().getBytes()));
        }
        // Result.create() expects the cells in sorted order.
        cellAll.sort(CellComparator.getInstance());
        result = Result.create(cellAll);
        // Debug output: verify that columns f1:field0..field9 were assembled.
        System.out.println("==============Get Value Start================");
        for (int i = 0; i < 10; i++) {
            String field = "field" + i;
            byte[] bytei = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes(field));
            if (null != bytei) {
                System.out.println("----" + field + " is : " + new String(bytei));
            }
        }
        System.out.println("==============Get Value End================");
        cellAll.clear();
    }
}
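The reducer above stops at assembling and printing the Result; the actual ES write is not shown in the post. With the Elasticsearch REST high-level client it might look roughly like this, inside reduce() -- the index name, client wiring, and the toEsDocument helper from the earlier sketch are all assumptions:

// Hypothetical indexing step -- not from the original job.
IndexRequest request = new IndexRequest("my_index")   // assumed index name
        .id(text.toString())                          // document id = rowkey
        .source(toEsDocument(result));                // helper sketched earlier
esClient.index(request, RequestOptions.DEFAULT);      // esClient: a RestHighLevelClient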
5. MR job setup
public class HFileToIndexer extends Configured implements Tool {

    private static final Logger LOG = LogManager.getLogger(HFileToIndexer.class);

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new HFileToIndexer(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        try {
            Configuration conf = HBaseConfiguration.create();

            // create job
            Job job = Job.getInstance(conf, "HFileToIndexer: Scan HFiles to Indexer");
            job.setJarByClass(HFileToIndexer.class);
            job.setInputFormatClass(HFileInputFormat.class);

            job.setMapperClass(HFileReadMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(HFileRecord.class);

            job.setReducerClass(HFileReadReducer.class);

            // Documents are sent to a live ES instance from within the job,
            // so no MR output files are produced.
            job.setOutputFormatClass(NullOutputFormat.class);
            // To run map-only (skip the reduce step), set the reducer count to 0:
            // job.setNumReduceTasks(0);

            // Walk the given HDFS path recursively and add each HFile as an input path.
            String hfilePath = args[0];
            try {
                FileSystem fs = FileSystem.get(conf);
                RemoteIterator<LocatedFileStatus> lt = fs.listFiles(new Path(hfilePath), true);
                while (lt.hasNext()) {
                    LocatedFileStatus file = lt.next();
                    // HFiles are named with 32-character hex UUIDs.
                    if (file.isFile() && file.getPath().getName().length() == 32) {
                        LOG.info("file name is :" + file.getPath().getName());
                        FileInputFormat.addInputPath(job, new Path(file.getPath().toString()));
                    }
                }
            } catch (IOException ioe) {
                LOG.error(ioe.getMessage());
            }

            if (!job.waitForCompletion(true)) {
                LOG.error("Failure");
            } else {
                LOG.info("Success");
                return 0;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return 1;
    }
}
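The job takes a single argument, args[0]: the HDFS directory holding the bulk-load output. Note that the filename filter relies on HFiles being named with 32-character hex UUIDs; if your directory layout differs, adjust the length check accordingly.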
