-
如果我们一次性入库hbase巨量数据,处理速度慢不说,还特别占用Region资源, 一个比较高效便捷的方法就是使用 “Bulk Loading”方法,即Hbase提供的HFileOutputFormat类。
-
它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接生成这种hdfs内存储的数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合mapreduce完成,高效便捷,而且不占用region资源,增添负载。
-
仅适合初次数据导入,即表内数据为空,或者每次入库表内都无数据的情况。
-
Hbase集群与Hadoop集群为同一集群,即Hbase所基于的HDFS为生成HFile的MR的集群
- 生成HFile部分
package day53; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HbaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class BulkLoadingTest { public static class BLMapper extends Mapper说明{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().split(","); String mdn = split[0]; String start_time = split[1]; // 经度 String longitude = split[4]; // 维度 String latitude = split[5]; String rowkey = mdn + "_" + start_time; KeyValue lg = new KeyValue(rowkey.getBytes(), "info".getBytes(), "lg".getBytes(), longitude.getBytes()); KeyValue lt = new KeyValue(rowkey.getBytes(), "info".getBytes(), "lt".getBytes(), latitude.getBytes()); context.write(new ImmutableBytesWritable(rowkey.getBytes()),lg); context.write(new ImmutableBytesWritable(rowkey.getBytes()),lt); } } public static void main(String[] args) throws Exception { Configuration conf = HbaseConfiguration.create(); conf.set("hbase.zookeeper.quorum","master:2181,node1:2181,node2:2181"); // job Job job = Job.getInstance(conf); job.setJarByClass(BulkLoadingTest.class); job.setJobName("BulkLoading"); // 设置reduce个数 job.setNumReduceTasks(2); // 配置map任务 job.setMapperClass(BLMapper.class); // 一共需要两个地方有序,每个reduce有序,reduce task里的数据也要有序 // 保证全局有序,即两个reduce task之间有序 // 需要在shuffer中的partition-sort阶段,进行排序分组 // 这里使用预设好的class job.setPartitionerClass(SimpleTotalOrderPartitioner.class); // 配置reduce任务 // KeyValueSortReducer 保证在每个Reduce内部有序 job.setReducerClass(KeyValueSortReducer.class); // 输入输出路径 FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path("/data/DIANXIN.csv")); FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path("/data/hfile")); // 将reduce输出的数据格式化为HFile // 需要用到表的元数据,来格式化数据,匹配列簇等等 Connection conn = ConnectionFactory.createConnection(conf); Table dianxin_bulk = conn.getTable(TableName.valueOf("dianxin_bulk")); // 获取dianxin_bulk表region定位器,表可能存储分块存储于不同region RegionLocator regionLocator = conn.getRegionLocator(TableName.valueOf("dianxin_bulk")); // 使用HFileOutputFormat2将输出的数据按照HFile的形式格式化 HFileOutputFormat2.configureIncrementalLoad(job,dianxin_bulk,regionLocator); // 等到MapReduce任务执行完成 job.waitForCompletion(true); // 接下来将hfile写至hbase表中 LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf); load.doBulkLoad(new Path("/data/hfile"), conn.getAdmin(),dianxin_bulk ,regionLocator); } }
- 最终输出结果,无论是map还是reduce,输出部分key和value的类型必须是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。
- 最终输出部分,Value类型是KeyValue 或Put,对应的Sorter分别是KeyValueSortReducer或PutSortReducer。
- MR例子中HFileOutputFormat2.configureIncrementalLoad(job, dianxin_bulk, regionLocator);自动对job进行配置。SimpleTotalOrderPartitioner是需要先对key进行整体排序,然后划分到每个reduce中,保证每一个reducer中的的key最小最大值区间范围,是不会有交集的。因为入库到Hbase的时候,作为一个整体的Region,key是绝对有序的。
- MR例子中最后生成HFile存储在HDFS上,输出路径下的子目录是各个列族。如果对HFile进行入库Hbase,相当于move HFile到Hbase的Region中,HFile子目录的列族内容没有了,但不能直接使用mv命令移动,因为直接移动不能更新Hbase的元数据。
- HFile入库到Hbase通过Hbase中 LoadIncrementalHFiles的doBulkLoad方法,对生成的HFile文件入库
总结
1、put 添加的是一整条信息(按照rowkey)
2、keyvalue 添加的是一条cell
3、hfile 输出到 hdfs 的路径必须得是一个不存在的路径,系统会生成该文件夹
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)