package com.educoder.bigData.sharedbicycle; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.Tool; import com.educoder.bigData.util.HbaseUtil; public class LineTotalMapReduce extends Configured implements Tool { public static final byte[] family = "info".getBytes(); public static class MyMapper extends TableMapper{ protected void map(ImmutableBytesWritable rowKey, Result result, Context context) throws IOException, InterruptedException { String start_latitude = Bytes.toString(result.getValue(family, "start_latitude".getBytes())); String start_longitude = Bytes.toString(result.getValue(family, "start_longitude".getBytes())); String stop_latitude = Bytes.toString(result.getValue(family, "stop_latitude".getBytes())); String stop_longitude = Bytes.toString(result.getValue(family, "stop_longitude".getBytes())); String departure = Bytes.toString(result.getValue(family, "departure".getBytes())); String destination = Bytes.toString(result.getValue(family, "destination".getBytes())); IntWritable doubleWritable = new IntWritable(1); context.write(new Text(start_latitude + "-" + start_longitude + "_" + stop_latitude + "-" + stop_longitude + "_" + departure + "-" + destination), doubleWritable); } } public static class MyTableReducer extends TableReducer { @Override public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int 
totalNum = 0; for (IntWritable num : values) { int d = num.get(); totalNum += d; } Put put = new Put(Bytes.toBytes(key.toString() + totalNum )); put.addColumn(family, "lineTotal".getBytes(), Bytes.toBytes(String.valueOf(totalNum))); context.write(null, put); } } public int run(String[] args) throws Exception { // 配置Job Configuration conf = HbaseUtil.conf; // Scanner sc = new Scanner(System.in); // String arg1 = sc.next(); // String arg2 = sc.next(); String arg1 = "t_shared_bicycle"; String arg2 = "t_bicycle_linetotal"; try { HbaseUtil.createTable(arg2, new String[] { "info" }); } catch (Exception e) { // 创建表失败 e.printStackTrace(); } Job job = configureJob(conf, new String[] { arg1, arg2 }); return job.waitForCompletion(true) ? 0 : 1; } private Job configureJob(Configuration conf, String[] args) throws IOException { String tablename = args[0]; String targetTable = args[1]; Job job = new Job(conf, tablename); Scan scan = new Scan(); scan.setCaching(300); scan.setCacheBlocks(false);// 在mapreduce程序中千万不要设置允许缓存 // 初始化Mapreduce程序 TableMapReduceUtil.initTableMapperJob(tablename, scan, MyMapper.class, Text.class, IntWritable.class, job); // 初始化Reduce TableMapReduceUtil.initTableReducerJob(targetTable, // output table MyTableReducer.class, // reducer class job); job.setNumReduceTasks(1); return job; } }
// Source attribution (blog footer, not code): 欢迎分享,转载请注明来源:内存溢出
// ("Welcome to share; please credit the source: Memory Overflow" — comment list was empty.)