自定义HBase-MapReduce(2)——从HBase表中读取数据,经过MR处理后,再写入一个新的HBase表中
文章目录:1)需求分析 2)编写Mapper 3)编写Reducer 4)编写Driver 5)本地连接Hbase 6)检验
1)需求分析
从fruit表中抽出info:name信息,放入fruit2表中
package com.yingzi.mr2;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * Mapper that reads rows from the source HBase table and forwards only the
 * {@code info:name} cell of each row as a {@link Put} keyed by the row key.
 *
 * <p>The output key/value types are {@link ImmutableBytesWritable} and
 * {@link Put}, matching what the driver passes to
 * {@code TableMapReduceUtil.initTableMapperJob}.
 */
public class Fruit2Mapper extends TableMapper<ImmutableBytesWritable, Put> {

    /** Column family of the cell to copy. */
    private static final byte[] FAMILY = Bytes.toBytes("info");

    /** Column qualifier of the cell to copy. */
    private static final byte[] QUALIFIER = Bytes.toBytes("name");

    /**
     * Builds a {@link Put} for the current row containing its
     * {@code info:name} cell(s) and emits it.
     *
     * @param key     row key of the scanned row
     * @param value   all cells returned by the scan for this row
     * @param context MapReduce context used to emit the output pair
     * @throws IOException          if adding a cell or writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Build the Put for this row key.
        Put put = new Put(key.get());

        // Keep only cells whose family AND qualifier match info:name; matching on
        // the qualifier alone would also copy a "name" column from other families.
        for (Cell cell : value.rawCells()) {
            if (CellUtil.matchingColumn(cell, FAMILY, QUALIFIER)) {
                put.add(cell);
            }
        }

        // A Put without any column is rejected by the HBase client, so rows
        // that have no info:name cell are skipped instead of failing the job.
        if (!put.isEmpty()) {
            context.write(key, put);
        }
    }
}

// 3) Writing the Reducer
package com.yingzi.mr2;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

/**
 * Reducer that passes every {@link Put} received from the mapper through to
 * the output table unchanged.
 *
 * <p>Type parameters: input key {@link ImmutableBytesWritable}, input value
 * {@link Put}, output key {@link NullWritable}.
 */
public class Fruit2Reducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    /**
     * Writes each {@link Put} of the current row key to the job output.
     *
     * @param key     row key shared by all values in this group
     * @param values  the Put objects emitted by the mapper for this row key
     * @param context MapReduce context used to emit the mutations
     * @throws IOException          if writing a mutation fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        // Forward every mutation as-is; the Put itself carries the row key,
        // so a NullWritable key is emitted.
        for (Put value : values) {
            context.write(NullWritable.get(), value);
        }
    }
}

// 4) Writing the Driver
package com.yingzi.mr2; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class Fruit2Driver implements Tool { //定义配置信息 Configuration configuration = null; @Override public int run(String[] strings) throws Exception { //1.获取Job对象 Job job = Job.getInstance(configuration); //2.设置主类路径 job.setJarByClass(Fruit2Driver.class); //3.设置Mapper&输出KV类型 TableMapReduceUtil.initTableMapperJob("fruit", new Scan(), Fruit2Mapper.class, ImmutableBytesWritable.class, Put.class, job); //4.设置Reducer&输出的表 TableMapReduceUtil.initTableReducerJob("fruit2", Fruit2Reducer.class, job); //5.提交任务 boolean result = job.waitForCompletion(true); return result?0:1; } @Override public void setConf(Configuration configuration) { this.configuration = configuration; } @Override public Configuration getConf() { return configuration; } public static void main(String[] args) { try { Configuration configuration = new Configuration(); ToolRunner.run(configuration,new Fruit2Driver(),args); } catch (Exception e) { e.printStackTrace(); } } }5)本地连接Hbase
在resources目录下配置hbase-site.xml文件
内容为虚拟机上的/opt/module/hbase-1.3.1/conf/hbase-site.xml
6)检验
运行Driver程序,查看“fruit2”表信息如下:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)