Shared Bicycle Data Analysis, Level 5: Counting Shared-Bicycle Route Traffic

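This level's solution is a single HBase MapReduce job: the mapper scans every trip in t_shared_bicycle and emits a composite route key (start coordinates, stop coordinates, departure, and destination) paired with a count of 1; the reducer sums the counts per route and writes each total into the lineTotal column of t_bicycle_linetotal.
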
package com.educoder.bigData.sharedbicycle;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;

import com.educoder.bigData.util.HbaseUtil;


public class LineTotalMapReduce extends Configured implements Tool {

	public static final byte[] family = "info".getBytes();

	/** Mapper: emit one count per trip, keyed by the full route description. */
	public static class MyMapper extends TableMapper<Text, IntWritable> {

		private static final IntWritable ONE = new IntWritable(1);

		@Override
		protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
				throws IOException, InterruptedException {
			// Read the start/stop coordinates and the named endpoints of the trip.
			String start_latitude = Bytes.toString(result.getValue(family, "start_latitude".getBytes()));
			String start_longitude = Bytes.toString(result.getValue(family, "start_longitude".getBytes()));
			String stop_latitude = Bytes.toString(result.getValue(family, "stop_latitude".getBytes()));
			String stop_longitude = Bytes.toString(result.getValue(family, "stop_longitude".getBytes()));
			String departure = Bytes.toString(result.getValue(family, "departure".getBytes()));
			String destination = Bytes.toString(result.getValue(family, "destination".getBytes()));
			// Compose the route key: start point, stop point, departure and destination.
			Text routeKey = new Text(start_latitude + "-" + start_longitude + "_" + stop_latitude + "-"
					+ stop_longitude + "_" + departure + "-" + destination);
			context.write(routeKey, ONE);
		}
	}

	/** Reducer: sum the counts for each route and persist the total to HBase. */
	public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

		@Override
		public void reduce(Text key, Iterable<IntWritable> values, Context context)
				throws IOException, InterruptedException {
			int totalNum = 0;
			for (IntWritable num : values) {
				totalNum += num.get();
			}
			// The output row key concatenates the route string and its total, as this
			// exercise expects; the count is also stored in the info:lineTotal column.
			Put put = new Put(Bytes.toBytes(key.toString() + totalNum));
			put.addColumn(family, "lineTotal".getBytes(), Bytes.toBytes(String.valueOf(totalNum)));
			context.write(null, put);
		}

	}

	public int run(String[] args) throws Exception {
		// Configure the job.
		Configuration conf = HbaseUtil.conf;
		// Scanner sc = new Scanner(System.in);
		// String arg1 = sc.next();
		// String arg2 = sc.next();
		String arg1 = "t_shared_bicycle";
		String arg2 = "t_bicycle_linetotal";
		try {
			HbaseUtil.createTable(arg2, new String[] { "info" });
		} catch (Exception e) {
			// Table creation failed (e.g. the table already exists); log and continue.
			e.printStackTrace();
		}
		Job job = configureJob(conf, new String[] { arg1, arg2 });
		return job.waitForCompletion(true) ? 0 : 1;
	}

	private Job configureJob(Configuration conf, String[] args) throws IOException {
		String tablename = args[0];
		String targetTable = args[1];
		Job job = Job.getInstance(conf, tablename);
		Scan scan = new Scan();
		scan.setCaching(300);
		scan.setCacheBlocks(false); // never enable scan block caching in a MapReduce job
		// Wire the mapper to the source table.
		TableMapReduceUtil.initTableMapperJob(tablename, scan, MyMapper.class, Text.class, IntWritable.class, job);
		// Wire the reducer to the output table.
		TableMapReduceUtil.initTableReducerJob(targetTable, // output table
				MyTableReducer.class, // reducer class
				job);
		job.setNumReduceTasks(1);
		return job;
	}
}
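
The listing implements Tool but never defines a main method, so it cannot be launched on its own. A minimal driver, assuming Hadoop's standard ToolRunner and that the class sits in the same com.educoder.bigData.sharedbicycle package (the class name LineTotalRunner is hypothetical), could look like this:

package com.educoder.bigData.sharedbicycle;

import org.apache.hadoop.util.ToolRunner;

public class LineTotalRunner {
	public static void main(String[] args) throws Exception {
		// ToolRunner parses generic Hadoop options, then invokes run().
		int exitCode = ToolRunner.run(new LineTotalMapReduce(), args);
		System.exit(exitCode);
	}
}

Since run() hard-codes both table names, args can be empty; the commented-out Scanner block in run() shows how the exercise originally read them interactively instead.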
