- 1 需求分析
- 1.1 数据一览
- 1.2 数据清洗
- 1.3 分类下的统计与排序
- 1.4 分类下的求均值与排序
- 1.5 多维度下的综合统计
- 2 技术实现
- 2.1 环境搭建
- 2.2 实现:数据清洗
- 2.3 实现:分类下的统计与排序
- 2.4 实现:分类下的求均值与排序
- 2.5 实现:多维度下的综合统计
- 3 完整代码与数据文件
如下图所示,共一万多条数据,除去首行,共 13036 条酒店数据。
…
不符合要求的数据为:
- 每条记录如果为NULL的字段数量大于等3;
- “星级、评论数、评分”这三个字段有一个为NULL;
- 重复的记录,将重复的去掉;
MapReduce 程序1:删除满足以上三个条件的记录(只要满足一个就需要被清理),并打印每类不符合要求的记录的数量
1.3 分类下的统计与排序
根据数据清洗的输出数据集,再编写两个 MapReduce 程序,具体需求如下:
MapReduce 程序2:统计各城市的酒店数量和房间数量
数据定义如下:
数据样式如下:
1.4 分类下的求均值与排序MapReduce 程序3:以城市房间数量降序排列并输出前10条统计结果
MapReduce 程序4:请根据数据清洗的输出数据集,编写Mapreduce程序统计各省直销拒单率,以城市直销拒单率升序排列并输出前10条统计结果,要求保留6为小数
数据定义如下:
数据样式如下:
2 技术实现 2.1 环境搭建MapReduce 程序5:以内蒙古、辽宁、四川、陕西、安徽为例,多维度分析说明几个省酒店的综合运营情况:分析维度:平均评分、酒店直销拒单率
本项目基于 Java 编写,在这里首先新建一个 Maven 工程,导入相关依赖如下。
2.2 实现:数据清洗org.apache.hadoop hadoop-client3.1.3 junit junit4.12 org.slf4j slf4j-log4j121.7.30
在这里回顾一下上文的具体需求:
【开始编写代码】
1、首先写一个工具类,用来封装判断上面三种条件的静态方法,并进行合理的单元测试,通过后再写接下来的逻辑代码。
public class ClearUtil { public static boolean ifNullFieldGreaterThree(String line) { String[] split = line.split(","); int count = 0; //记录每行NULL字段的数量 for (String field : split) { if (field.equals("NULL")) { count++; } if (count >= 3) { return true; } } return false; } public static boolean ifOneOfThreeIsNull(String line) { String[] split = line.split(","); if (split[6].equals("NULL") || split[10].equals("NULL") || split[11].equals("NULL")) { return true; } return false; } public static boolean ifRepeat(String line, HashSet hashSet, HashSet repeatSet) { if (!hashSet.contains(line)) { hashSet.add(line); return false; } repeatSet.add(line); return true; } public static boolean ifHotelNumIsNull(String line) { String[] split = line.split(","); return split[8].equals("NULL"); } }
2、在 Mapper 类(用 Mapper 类这个词老是让我联想到 SSM 架构中的 Mapper 层,一直感觉很别扭,但实现的接口确实就是 Mapper<…>)的 map 方法中我们处理前两个条件(“每条记录如果为NULL的字段数量大于等3”、““星级、评论数、评分”这三个字段有一个为NULL”),我的思路是进入 map 方法时用一个布尔类型的遍历来记录状态,开始默认为 true,若是在 map 方法的接下来的处理过程中,并未遇到上述两个条件之一,那么便仍是 true,否则,便被赋值为 false,这样,通过这个布尔类型的状态变量,便可以 *** 作此数据是否可以继续通行到 Reducer 中,而过程中的这两种情况的次数记录,可用 hadoop-client 提供的 Counter 类来解决,用 increment 方法进行每次的自增,最终配合 log4j 打印出来每种情况的次数,下面第三个条件的次数统计也同理。
(注:这个 Mapper 类中,我在编写时也额外补充了一段代码 if(ClearUtil.ifHotelNumIsNull(line)){...},主要是后续的一个需求中,要求统计酒店的房间数量,而房间数量这个字段在给定的数据表中,本身就存在大量为 NULL 的情况,但在课上,我看老师是在后续的那步 *** 作中用 try catch 来抛出异常,真是让我哭笑不得,这难道不是数据清洗阶段就要做好的吗,而且你那么多异常,多少也是很消耗资源的写法吧,十分无奈)
public class ClearMapper extends Mapper{ IntWritable iw = new IntWritable(1); HashSet hashSet = null; HashSet repeatSet = null; int noOne = 0; //第1种不满足的情况 int noTwo = 0; //第2种不满足的情况 int noThree = 0; //第3种不满足的情况 @Override protected void setup(Context context) throws IOException, InterruptedException { hashSet = new HashSet(); repeatSet = new HashSet(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { Counter counter1 = context.getCounter("NUMS", "counter1"); Counter counter2 = context.getCounter("NUMS", "counter2"); boolean go = true; //是否进入 reducer 处理 String line = value.toString(); if (ClearUtil.ifNullFieldGreaterThree(line)) { counter1.increment(1); go = false; } if (ClearUtil.ifOneOfThreeIsNull(line)) { counter2.increment(1); go = false; } //【额外补充】 if (ClearUtil.ifHotelNumIsNull(line)) { go = false; } //可能包含重复,也可能不包含重复:使用 reduce 去重 if (go) { context.write(value, iw); } } }
3、第三个条件(重复的记录,将重复的去掉)的判断我让经历了前两个条件的判断后状态变量仍为 true 的数据进入到 Reducer 中进行,因为这第三个条件涉及统计次数,因此我们让这一整条记录作为键,而让 1 来作为值,这样便可以在 Reducer 中将同一键的记录进行归并,进而我们可以进行值的加和,从而判断出是否有重复的记录,值大于 2 时,键对应的记录就是出现了多次,是重复出现的情况。(注:其实这样也不一定能保证第三个条件的 Counter 次数统计就是正确的,因为可能其余数据也有重复的情况,只是在 Mapper 中就因为触犯了前两个条件,导致布尔状态变量为 false,进而根本就没有进入到 Reducer 中,因此,但是如果要改进的话,我能想到的方法大多都是需要牺牲很多空间的和降低效率的,因此十分得不偿失,暂时为想到好的改进方法,如果有知道的大佬也请在评论区一起交流)。
public class ClearReducer extends Reducer{ @Override protected void reduce(Text key, Iterable iter, Context context) throws IOException, InterruptedException { //记录重复数量 Counter counter3 = context.getCounter("NUMS", "counter3"); int sum = 0; for (IntWritable iw : iter) { sum += iw.get(); } //sum > 1 说明有重复的 value,使用计数器记录多出的数量 if (sum > 1) { counter3.increment(sum - 1); } context.write(NullWritable.get(), key); } }
4、主启动类
MapReduce 的启动类十分模板化,此处不再细讲。
public class MainClear { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1 获取配置信息以及获取job对象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 关联本Driver程序的jar job.setJarByClass(MainClear.class); // 3 关联Mapper和Reducer的jar job.setMapperClass(ClearMapper.class); job.setReducerClass(ClearReducer.class); // 4 设置Mapper输出的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 5 设置最终输出kv类型 job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); // 6 设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path("C:\Users\DELL\Desktop\hadoop课设\课设\hadoop-hotel\hotel.csv")); FileOutputFormat.setOutputPath(job, new Path("C:\Users\DELL\Desktop\hadoop课设\课设\hadoop-hotel\src\main\java\com\zlc\mapreduce\clear\out")); // 7 提交job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
5、测试效果截图
(1)三个条件统计次数的各自情况(我这个是正确的结果,反而在课上时,老师给出的答案让我哭笑不得,分别是 2842,7148,2,而我这里的 9990 本身就等于 2842 + 7148,所以问题出在哪也就可见一斑了,那位老师她错把第一种条件的计数统计条件,”且“上了第二种条件,导致第一种情况的统计数量直接少了一大半)
(2)清洗后的数据,有 2983 行
在这里回顾一下上文的具体需求:
【开始编写代码】
1、针对此需求,我们自然可以想到以 省,市 作为 Key,以 酒店数量,房间数量 作为 Value,因为这样就可以在 Mapper 结束之后根据 省,市 来进行分组,从而 在 Reducer 中接收相同分组的数据,进而对之前的 VALUEOUT- 酒店数量,房间数量,也就是现在的 VALUEIN 进行字符串的转换、拆分、加和,从而最终统计成功,大体思路也就是这样,只要理解 Mapper 和 Reducer 类中的 map、reduce 方法的工作原理,那么实现这个功能并不困难,本文是属于项目记录型的文章,这些具体的工作原理不再赘述,读者如有需求可去搜索细节型的文章或资料。
代码直接都给出,这里没什么需要过多解释的,可能直接看代码也比文字更能表达思路。
2、Mapper
public class CensusMapper extends Mapper{ Text newKey = null; Text newValue = null; @Override protected void setup(Mapper .Context context) throws IOException, InterruptedException { newKey = new Text(); newValue = new Text(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //Key:省,市 //Value:酒店数量,房间数量 String[] split = value.toString().split(","); String province = split[3]; String city = split[4]; int hotelNum = 1; //一行记录就是一个酒店 String roomNum = split[8]; newKey.set(province + "," + city); newValue.set(hotelNum + "," + roomNum); context.write(newKey, newValue); } }
3、Reducer
public class CensusReducer extends Reducer{ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int hotelNum = 0; int roomNum = 0; for (Text value : values) { String[] split = value.toString().split(","); try { hotelNum += Integer.parseInt(split[0]); roomNum += Integer.parseInt(split[1]); } catch (NumberFormatException e) {} } context.write(key, new Text(hotelNum + "," + roomNum)); } }
4、主启动类
与上面数据清理步骤的启动类几乎相同,只需改一下类名、Key-Value 的类型,路径指向即可(这里的输入路径的指向是第一步数据清洗的结果文件,即对应的 p art-r-00000 文件)
5、测试效果截图
在这里回顾一下上文的具体需求:
【开始编写代码】
1、针对此需求,我们仍然可以以 省,市 作为 Key,以 房间数量 作为 Value,这样就可以在 Mapper 结束之后根据 省,市 来进行分组,从而 在 Reducer 中接收相同分组的数据,进而对 VALUEIN 房间数量进行字符串的转换、拆分、加和,从而最终统计成功,当然,这只是能满足统计数量的需求,但是,本需求需要的是还要对这个房间数量进行降序。在 Hadoop依赖中的 IntWritable,也就是表示正数的这个变量,有一个public int compareTo(IntWritable o) 方法,我们可以通过重写这个方法,来使得我们的结果有序,而还有一个条件也要满足,也即是我们重写后的 MyIntWritable 类,必须作为 Reducer 输出的键的类型,这样才可以进行排序显示。
2、MyIntWritable
public class MyIntWritable extends IntWritable { public MyIntWritable() { super(); } public MyIntWritable(int roomNum) { super(roomNum); } @Override public int compareTo(IntWritable o) { int thisValue = super.get(); int thatValue = o.get(); return thatValue - thisValue; //倒序 } }
2、Mapper
public class OrderMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { //Key:省,市 //Value:房间数量 String[] split = value.toString().split("t"); String[] splitRight = split[1].split(","); int roomNum = Integer.parseInt(splitRight[1]); context.write(new MyIntWritable(roomNum), new Text(split[0])); } }
3、Reducer
public class OrderReducer extends Reducer{ int count = 1; @Override protected void reduce(MyIntWritable key, Iterable values, Context context) throws IOException, InterruptedException { if (count <= 10) { for (Text value : values) { context.write(key, value); } count++; } } }
4、主启动类
(结构同上,略)
5、测试效果截图
在这里回顾一下上文的具体需求:
【开始编写代码】
1、此 2.4 的实现与 2.3 的实现几乎相同,只是 2.3 中的统计在这里变成了求均值,结果变为了限定小数位数浮点型小数,不再是整数,而解决均值,也就是在 Reducer 中预先统计当前来到 map 分组的数据个数,然后再用统计的结果除以个数即可,解决限定位数的小数问题,我这里的思路十分简单粗暴,也就是再写一个 Mapper、Reducer,对之前的 Value 再进行单独的小数处理,当然,由于要升序排序,而 DoubleWritable 类作为键时,由于其 compareTo 的具体实现就是会使结果序列升序排序,因此,可以直接让 DoubleWritable 类作为 Mapper 过程的键 ,升序的处理与后续小数位数的处理,我都放在了阶段二中完成,也就是 part2,阶段一,也就是 part1 只负责进行统计各省直销拒单率的平均值,我这种分两步处理的方式十分简单粗暴,但自然效率可能会低一些。
2、Mapper1
public class RejectionMapper extends Mapper{ Text newKey = null; Text newValue = null; @Override protected void setup(Context context) throws IOException, InterruptedException { newKey = new Text(); newValue = new Text(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if (key.get() == 0) return; String line = value.toString(); String[] split = line.split(","); newKey.set(split[3]); //省份作为键 newValue.set(split[24]);//直销拒单率作为值 context.write(newKey, newValue); } }
3、Reducer1
public class RejectionReducer extends Reducer{ Text newValue = null; @Override protected void setup(Context context) throws IOException, InterruptedException { newValue = new Text(); } @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { double sum = 0; double num = 0; for (Text value : values) { String rate = value.toString(); try { sum += Double.parseDouble(rate.substring(0, rate.length() - 1)); } catch (NumberFormatException e) {} num++; } double result = sum / (num * 100); //result = Double.parseDouble(new DecimalFormat("0.000000").format(result)); //省份作为键 newValue.set(result + ""); //平均直销拒单率作为值 context.write(key, newValue); } }
4、主启动类
(结果同上,略)
5、此步的效果截图
6、接下来我们再写一对 Mapper、Recuder 来处理上述结果
7、Mapper2
public class RejectionMapper2 extends Mapper{ DoubleWritable newKey = null; Text newValue = null; @Override protected void setup(Context context) throws IOException, InterruptedException { newKey = new DoubleWritable(); newValue = new Text(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // Key: 拒销率 // Value: 省份 String line = value.toString(); String[] split = line.split("t"); double result = Double.parseDouble(split[1]); newKey.set(result); newValue.set(split[0]); context.write(newKey, newValue); } }
8、Recuder2
public class RejectionReducer2 extends Reducer{ @Override protected void reduce(DoubleWritable key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { for (int i = 0; i < 10; i++) { for (Text value : values) { //平均直销率,保留6位小数 String rate = new DecimalFormat("0.000000").format(Double.parseDouble(key.toString())); context.write(value, new Text(rate)); } } } }
9、主启动类
(结果同上,略)
10、此步的效果截图
在这里回顾一下上文的具体需求:
【开始编写代码】
1、针对此需求,我们仍然可以以 省 作为 Key,以 平均评分,酒店直销拒单率 作为 Value,这样就可以在 Mapper 结束之后根据 省 来进行分组,从而在 Reducer 中接收相同省份分组的数据,进而对 VALUEIN 平均评分,酒店直销拒单率 进行字符串的转换、拆分、加和,求均值,从而最终统计成功。
Mapper
public class CensusManyMapper extends Mapper{ Text newKey = null; Text newValue = null; @Override protected void setup(Context context) throws IOException, InterruptedException { newKey = new Text(); newValue = new Text(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if (key.get() == 0) return; String line = value.toString(); String[] split = line.split(","); String province = split[3]; String score = split[10]; String hotelRejectRate = split[22]; if (!(province.equals("内蒙古") || province.equals("辽宁") || province.equals("四川") || province.equals("陕西") || province.equals("安徽"))) { return; } newKey.set(province); //省份作为键 newValue.set(score + "," + hotelRejectRate);//评分、酒店直销拒单率作为值 context.write(newKey, newValue); } }
2、Reducer
public class CensusManyReducer extends Reducer{ Text newValue = null; @Override protected void setup(Context context) throws IOException, InterruptedException { newValue = new Text(); } @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { double scoreSum = 0; double hotelRejectRateSum = 0; double num = 0; for (Text value : values) { String line = value.toString(); String[] split = line.split(","); scoreSum += Double.parseDouble(split[0]); hotelRejectRateSum += Double.parseDouble(split[1].substring(0, split[1].length() - 1)); num++; } double scoreResult = scoreSum / num; double hotelRejectRateResult = hotelRejectRateSum / (num * 100); newValue.set(scoreResult + "," + hotelRejectRateResult); context.write(key, newValue); } }
3、主启动类
(结果同上,略)
4、效果截图
3 完整代码与数据文件下载链接
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)