近期需要开发个功能,通过MR批量读取HFile文件,并以Result的格式写入到其他存储系统中。过程中遇到了一些坑,在此记录下。
整理流程:前期调研
基于MR的Hbase数据读取,常用的是基于Hbase官方的TableMapper实现。但是这里需要读取HFile文件,且不想经过Hbase(BulkLoad场景下,读取增量数据,但不想全扫描Hbase表),那这个方案就不合适啦。
1、遍历输入的HDFS指定路径,筛选出HFile文件的绝对路径,作为MR任务的输入
2、Mapper阶段,将HFile文件解析为单条的cell数据,并将每个cell封装为可序列化的HFileRecord,以rowkey作为key输出
3、根据rowkey聚合,按rowkey组合为Hbase Result对象(单行单列的Cell转换为单行多列的Result),写入ES(聚合为Result是为了防止以cell方式写入ES覆盖的问题)
1. 自定义inputformat实现
1.1 继承FileInputFormat
/**
 * Input format that feeds HFiles to a MapReduce job.
 *
 * <p>Every input file becomes exactly one split (see {@link #isSplitable}) and is read
 * by an {@code HFileRecordReader}, which emits the row key as the record key and the
 * cell as the record value.
 */
public class HFileInputFormat extends FileInputFormat<ImmutableBytesWritable, Cell> {

    /**
     * Always returns {@code false} so that a file is never divided between mappers;
     * the record reader opens the file as a whole and scans it from its first cell.
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    /**
     * Creates the reader for one HFile.
     *
     * @param split   the split to read; cast to {@link FileSplit} to obtain the file path
     * @param context task context supplying the job configuration
     * @return a record reader over the cells of the file
     * @throws IOException if the HFile cannot be opened
     */
    @Override
    public RecordReader<ImmutableBytesWritable, Cell> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) split;
        return new HFileRecordReader(fileSplit, context.getConfiguration());
    }
}
1.2 实现RecordReader,继承org.apache.hadoop.mapreduce.RecordReader
public class HFileRecordReader2. cell无法作为map阶段的输出,需要实现Writeable接口extends RecordReader{ private HFile.Reader reader; private final HFileScanner scanner; private long entryNumber = 0L; public HFileRecordReader(FileSplit split, Configuration conf) throws IOException { final Path path = split.getPath(); reader = HFile.createReader(FileSystem.get(conf), path , conf); HFile.FileInfo fi = (HFile.FileInfo) reader.loadFileInfo(); scanner = reader.getScanner(false, false); System.out.println(fi.size()); scanner.seekTo(); } @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { } @Override public boolean nextKeyValue() throws IOException, InterruptedException { entryNumber++; return scanner.next(); } @Override public Object getCurrentKey() throws IOException, InterruptedException { // TODO Auto-generated method stub return new ImmutableBytesWritable(CellUtil.cloneRow((KeyValue)scanner.getCell())); } @Override public Object getCurrentValue() throws IOException, InterruptedException { return (KeyValue)scanner.getCell(); } @Override public float getProgress() throws IOException, InterruptedException { if (reader != null) { return (entryNumber / reader.getEntries()); } return 1; } @Override public void close() throws IOException { if (reader != null) { reader.close(); } } }
public class HFileRecord implements Writable { private byte operate; private long timestamp; private String row; private String family; private String qualifier; private String value; public HFileRecord () {} public HFileRecord (byte operate, long timestamp, String row, String family, String qualifier, String value) { this.operate = operate; this.timestamp = timestamp; this.row = row; this.family = family; this.qualifier = qualifier; this.value =value; } @Override public void write(DataOutput out) throws IOException { out.writeByte(operate); out.writeLong(timestamp); out.writeUTF(row); out.writeUTF(family); out.writeUTF(qualifier); out.writeUTF(value); } @Override public void readFields(DataInput in) throws IOException { this.operate = in.readByte(); this.timestamp = in.readLong(); this.row = in.readUTF(); this.family = in.readUTF(); this.qualifier = in.readUTF(); this.value = in.readUTF(); } public byte getOperate() { return this.operate; } public long getTimestamp() { return this.timestamp; } public String getRow() { return this.row; } public String getFamily() { return this.family; } public String getQualifier() { return this.qualifier; } public String getValue() { return this.value; } }3、编写mapper阶段逻辑
public class HFileReadMapper extends Mapper4、编写reducer阶段逻辑{ private static final Log LOG = LogFactory.getLog(HFileReadMapper.class); String outKey = null; @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); context.getCounter("Convert", "mapper").increment(1); } @Override protected void map(ImmutableBytesWritable key, Cell cell, Context context) throws IOException, InterruptedException { HFileRecord hFileRecord = new HFileRecord(cell.getTypeByte(), cell.getTimestamp(), new String(CellUtil.cloneRow(cell)), new String(CellUtil.cloneFamily(cell)), new String(CellUtil.cloneQualifier(cell)), new String(CellUtil.clonevalue(cell))); context.write(new Text(CellUtil.cloneRow(cell)), hFileRecord); } }
public class HFileReadReducer extends Reducer5、mr任务相关设置{ private static final Log LOG = LogFactory.getLog(HFileReadMapper.class); Result result; List cellAll = Lists.newArrayListWithCapacity(10); @Override protected void setup(Reducer.Context context) throws IOException, InterruptedException { super.setup(context); } @Override protected void reduce(Text text, Iterable | cellList, Context context) throws IOException, InterruptedException { for (HFileRecord hr : cellList) { cellAll.add(CellUtil.createCell(hr.getRow().getBytes(), hr.getFamily().getBytes(), hr.getQualifier().getBytes(), hr.getTimestamp(), hr.getOperate(), hr.getValue().getBytes())); } cellAll.sort(CellComparator.getInstance()); result = Result.create(cellAll); System.out.println("==============Get Value Start================"); for (int i=0; i<10; i++) { String field = "field" + i; byte[] bytei = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes(field)); if (null != bytei) { System.out.println("----" + field + " is : " + new String(bytei)); } } System.out.println("==============Get Value End================"); cellAll.clear(); } }
public class HFileToIndexer extends Configured implements Tool { private static final Logger LOG = LogManager.getLogger(HFileToIndexer.class); public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new HFileToIndexer(), args); System.exit(res); } @Override public int run(String[] args) throws Exception { try { Configuration conf = HbaseConfiguration.create(); // create job Job job = Job.getInstance(conf, "HFileToIndexer: Scan HFiles to Indexer"); job.setJarByClass(HFileToIndexer.class); job.setInputFormatClass(HFileInputFormat.class); job.setMapperClass(HFileReadMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(HFileRecord.class); job.setReducerClass(HFileReadReducer.class); // Run a mapper-only MR job that sends index documents directly to a live ES instance. job.setOutputFormatClass(NullOutputFormat.class); // 设置reduce为0,即没有reduce步骤 // job.setNumReduceTasks(0); // HDFS遍历指定路径,并添加到 inputPath 中 String hfilePath = args[0]; try { FileSystem fs = FileSystem.get(conf); RemoteIteratorlt = fs.listFiles(new Path(hfilePath), true); while (lt.hasNext()) { LocatedFileStatus file = lt.next(); if(file.isFile() && file.getPath().getName().length() == 32) { LOG.info("file name is :" + file.getPath().getName()); FileInputFormat.addInputPath(job, new Path(file.getPath().toString())); } } } catch (IOException ioe) { LOG.error(ioe.getMessage()); } if (!job.waitForCompletion(true)) { LOG.error("Failure"); } else { LOG.info("Success"); return 0; } } catch (Exception e) { e.printStackTrace(); } return 1; } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)