自定义HBase-MapReduce1——将HDFS中的数据写入HBase表中
文章目录：1) FruitMapper 2) FruitReducer 3) FruitDriver 4) 打包上传到HBase下运行
1)FruitMapperpackage com.yingzi.mr1; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class FruitMapper extends Mapper2)FruitReducer{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(key,value); } }
package com.yingzi.mr1; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import java.io.IOException; public class FruitReducer extends TableReducer3)FruitDriver{ String cf1 = null; @Override protected void setup(Context context) throws IOException, InterruptedException { Configuration configuration = context.getConfiguration(); cf1 = configuration.get("cf1"); } @Override protected void reduce(LongWritable key, Iterable values, Context context) throws IOException, InterruptedException { //1.遍历values:1001 Apple Red for (Text value : values) { //2.获取每一行数据 String[] fields = value.toString().split("t"); //3.构建Put对象 Put put = new Put(Bytes.toBytes(fields[0])); //4.给Put对象赋值 put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(fields[1])); put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("color"),Bytes.toBytes(fields[2])); //5.写出 context.write(NullWritable.get(),put); } } }
package com.yingzi.mr1; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.FileInputStream; public class FruitDriver implements Tool { //定义一个Configuration private Configuration configuration = null; @Override public int run(String[] strings) throws Exception { //1.获取Job对象 Job job = Job.getInstance(configuration); //2.设置驱动类路径 job.setJarByClass(FruitDriver.class); //3.设置Mapper&Mapper输出的KV类型 job.setMapperClass(FruitMapper.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); //4.设置Reducer类 TableMapReduceUtil.initTableReducerJob(strings[1], FruitReducer.class, job); //5.设置输入参数 FileInputFormat.setInputPaths(job,new Path(strings[0])); //6.提交任务 boolean result = job.waitForCompletion(true); return result?0:1; } @Override public void setConf(Configuration configuration) { this.configuration = configuration; } @Override public Configuration getConf() { return configuration; } public static void main(String[] args) { try { Configuration configuration = new Configuration(); int run = ToolRunner.run(configuration, new FruitDriver(), args); System.exit(run); } catch (Exception e) { e.printStackTrace(); } } }4)打包上传到hbase下
(1)先创建fruit1表
(2)将HDFS上的文件写入fruit1表中
yarn jar Hbase-demo-1.0-SNAPSHOT.jar com.yingzi.mr1.FruitDriver /fruit.tsv fruit1
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)