package com.chris.hbase.mr2;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class Fruit2Mapper extends TableMapper<ImmutableBytesWritable, Put> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // Create a Put object keyed by the current row key
        Put put = new Put(key.get());

        // 1. Iterate over all cells of the current row
        for (Cell cell : value.rawCells()) {
            // 2. Keep only the cells whose qualifier is the "name" column
            if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                // 3. Add the cell to the Put object
                put.add(cell);
            }
        }

        // 4. Write out the row key and the Put
        context.write(key, put);
    }
}

2. Build the Fruit2Reducer class, which writes the data read from the fruit table into the fruit2 table
package com.chris.hbase.mr2;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

public class Fruit2Reducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        // Iterate over the Puts for this row key and write each one out
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}

3. Build the Driver class
package com.chris.hbase.mr2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Fruit2Driver implements Tool {

    // Configuration injected by ToolRunner via setConf()
    private Configuration configuration = null;

    @Override
    public int run(String[] args) throws Exception {
        // 1. Get the Job object
        Job job = Job.getInstance(configuration);

        // 2. Set the driver class path
        job.setJarByClass(Fruit2Driver.class);

        // 3. Set the Mapper and its output KV types; the mapper reads from the fruit table
        TableMapReduceUtil.initTableMapperJob("fruit", new Scan(), Fruit2Mapper.class,
                ImmutableBytesWritable.class, Put.class, job);

        // 4. Set the Reducer class; the reducer writes to the fruit2 table
        TableMapReduceUtil.initTableReducerJob("fruit2", Fruit2Reducer.class, job);

        // 5. Submit the job
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }

    @Override
    public void setConf(Configuration conf) {
        configuration = conf;
    }

    @Override
    public Configuration getConf() {
        return configuration;
    }

    public static void main(String[] args) {
        try {
            // To run locally, use an HBase configuration and add hbase-site.xml to the resources folder
            Configuration configuration = HBaseConfiguration.create();
            // Configuration configuration = new Configuration();
            int run = ToolRunner.run(configuration, new Fruit2Driver(), args);
            System.exit(run);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4. To run locally, add the hbase-site.xml configuration to the resources folder (a minimal sketch is shown below).
5. Create the fruit2 table in the HBase shell, then run the main program in the Driver (example shell commands below).
6. Check the result of the operation.
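For step 4, a minimal hbase-site.xml sketch that points the local client at the cluster's ZooKeeper quorum; the host name below is a placeholder and must be replaced with your own ZooKeeper node(s):

<?xml version="1.0"?>
<configuration>
  <!-- ZooKeeper quorum used by the HBase client; replace with your own host(s) -->
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>your-zookeeper-host</value>
  </property>
  <!-- ZooKeeper client port; 2181 is the default -->
  <property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>2181</value>
  </property>
</configuration>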
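For steps 5 and 6, one possible sequence in the HBase shell. The column family name info is an assumption here: because the mapper copies cells unchanged, fruit2 must be created with the same column family that the fruit table uses.

# Create the target table with the same column family as fruit (assumed to be 'info')
create 'fruit2', 'info'

# ... run the Fruit2Driver main program from the IDE, or submit the packaged jar ...

# Verify that the name column was copied into fruit2
scan 'fruit2'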