1.一开始的思路是如何去重置计数器,在map阶段的时候,计数时,如果大于等于n个字段时删除,并重置计数器,开始下一行计数,但是在map阶段的时候,没有去重置计数器,map自动为我们重置了,当时很纳闷,后来找了数据测试一下发现,map阶段每执行一行数据之后,会重新执行一次map,这也就说说,我们在计数一行的数据之后,map会重新的执行,也就把计数器重置了!!!下面是测试的代码
2.数据文件:
代码:
public class Test { public static void main(String[] args)throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //设置文件的输入和输出 FileInputFormat.setInputPaths(job,new Path("src/main/java/Code/切分测试/Input/Test.txt")); FileOutputFormat.setOutputPath(job,new Path("src/main/java/Code/切分测试/st")); //设置 job.setMapperClass(mapper.class); job.setReducerClass(reducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); Boolean bl= job.waitForCompletion(true); System.exit(bl?0:1); } public static class mapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().split(","); //TODO 剔除包含a字母大于3个的字段 int count=0; for (String s : split) { if (s.contains("a")){ count+=1; } if (count>=3){ return; } } context.write(value,new Text("")); } } public static class reducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { for (Text value : values) { context.write(key,value); } } } }
结果:
那么我们去处理n字段的时候用同样的方法即可!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)