Writing HDFS Data into HBase


Custom HBase MapReduce (1): writing data from HDFS into an HBase table

Contents

1) FruitMapper
2) FruitReducer
3) FruitDriver
4) Package the jar and upload it to the HBase host
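The walkthrough assumes the input file on HDFS (here /fruit.tsv) holds one tab-separated record per line: row key, name, and color. A hypothetical three-row sample, consistent with the row referenced in the reducer comments below:

1001	Apple	Red
1002	Pear	Yellow
1003	Pineapple	Yellow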

1) FruitMapper
package com.yingzi.mr1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


// Identity mapper: forwards each line's byte offset and text unchanged
public class FruitMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // Pass each line through unchanged; all parsing happens in the reducer
        context.write(key, value);
    }
}
2) FruitReducer
package com.yingzi.mr1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;


public class FruitReducer extends TableReducer<LongWritable, Text, NullWritable> {

    // Column family name, read from the job configuration in setup()
    String cf1 = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {

        Configuration configuration = context.getConfiguration();

        cf1 = configuration.get("cf1");
    }

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        // 1. Iterate over the values for this key, e.g. 1001	Apple	Red
        for (Text value : values) {

            // 2. Split each line into its tab-separated fields
            String[] fields = value.toString().split("\t");

            // 3. Build a Put keyed on the row key (first field)
            Put put = new Put(Bytes.toBytes(fields[0]));

            // 4. Populate the name and color columns in the configured family
            put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("name"), Bytes.toBytes(fields[1]));
            put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("color"), Bytes.toBytes(fields[2]));

            // 5. Emit the Put; TableOutputFormat writes it into the target table
            context.write(NullWritable.get(), put);

        }
    }
}
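Because the driver runs through ToolRunner, GenericOptionsParser consumes -D options before the positional arguments, so the column family can be supplied at submit time instead of relying on the default. A hypothetical invocation, using the same jar and paths as step 4 below:

yarn jar Hbase-demo-1.0-SNAPSHOT.jar com.yingzi.mr1.FruitDriver -Dcf1=info /fruit.tsv fruit1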
3) FruitDriver
package com.yingzi.mr1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class FruitDriver implements Tool {

    // Configuration injected by ToolRunner via setConf()
    private Configuration configuration = null;

    @Override
    public int run(String[] strings) throws Exception {

        // 1. Create the Job from the injected configuration
        Job job = Job.getInstance(configuration);

        // 2. Set the driver class so the jar can be located on the cluster
        job.setJarByClass(FruitDriver.class);

        // 3. Set the Mapper and its output key/value types
        job.setMapperClass(FruitMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        // 4. Set the Reducer and the target HBase table (second CLI argument)
        TableMapReduceUtil.initTableReducerJob(strings[1],
                FruitReducer.class,
                job);

        // 5. Set the HDFS input path (first CLI argument)
        FileInputFormat.setInputPaths(job, new Path(strings[0]));

        // 6. Submit the job and wait for completion
        boolean result = job.waitForCompletion(true);

        return result ? 0 : 1;
    }

    @Override
    public void setConf(Configuration configuration) {
        this.configuration = configuration;
    }

    @Override
    public Configuration getConf() {
        return configuration;
    }

    public static void main(String[] args) {

        try {
            Configuration configuration = new Configuration();
            int run = ToolRunner.run(configuration, new FruitDriver(), args);
            System.exit(run);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
4) Package the jar and upload it to the HBase host

(1) First, create the fruit1 table.
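In the HBase shell, the table needs the column family the reducer writes to (here the default "info" assumed above):

create 'fruit1','info'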

(2) Write the file on HDFS into the fruit1 table:

yarn jar Hbase-demo-1.0-SNAPSHOT.jar com.yingzi.mr1.FruitDriver /fruit.tsv fruit1
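If submission fails because HBase classes are missing from the Hadoop classpath, exporting the HBase MapReduce classpath first is a common fix (an environment assumption, not part of the original post):

export HADOOP_CLASSPATH="$(hbase mapredcp):$HADOOP_CLASSPATH"

Once the job completes, the imported rows can be checked from the HBase shell:

scan 'fruit1'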
