All filtering can be done directly in the Map phase by applying the rules to each record; no Reduce phase is needed.
Input data, Maven, and log4j.properties configuration: refer to the setup in the earlier MapReduce traffic-statistics case (MapReduce统计流量案例).
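For quick reference, a typical minimal log4j.properties for this kind of local Hadoop example might look like the sketch below (a console-only setup; not necessarily identical to the file in that article):

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n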
Custom Mapper class implementation (WebLogMapper)

package com.test.mapreduce.weblog;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WebLogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Get the current line as a string
        String line = value.toString();

        // 2. Call the custom log-parsing function
        boolean result = parseLog(line);

        // 3. If the line is invalid, skip it
        if (!result) {
            return;
        }

        // 4. If valid, write it out unchanged
        context.write(value, NullWritable.get());
    }

    private boolean parseLog(String line) {
        // 2.1 Split the line on spaces
        String[] fields = line.split(" ");

        // 2.2 A line with more than 11 fields is considered valid
        return fields.length > 11;
    }
}
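To see what the fields.length > 11 rule actually tests, here is a standalone sketch; the sample lines are made-up Apache-style access-log entries for illustration, not taken from the real input file:

public class ParseLogDemo {
    public static void main(String[] args) {
        // Hypothetical access-log line: splitting on spaces yields 13 fields
        String valid = "194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] \"GET /rstudio-git3.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/4.0 (compatible;)\"";
        // Hypothetical truncated line: only 3 fields, so it would be filtered out
        String invalid = "GET /img/logo.png HTTP/1.1";

        System.out.println(valid.split(" ").length);   // 13 -> more than 11, kept
        System.out.println(invalid.split(" ").length); // 3  -> 11 or fewer, dropped
    }
}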
Custom Driver class implementation (WebLogDriver)

package com.test.mapreduce.weblog;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WebLogDriver {

    public static void main(String[] args) throws Exception {
        // 1. Create the Configuration object and get the Job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Associate this Driver class with the job
        job.setJarByClass(WebLogDriver.class);

        // 3. Associate the Mapper
        job.setMapperClass(WebLogMapper.class);

        // 4. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 5. Set the number of reduce tasks to 0 (map-only job)
        job.setNumReduceTasks(0);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\output"));

        // 7. Submit the job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

Data output
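Because job.setNumReduceTasks(0) makes this a map-only job, the map output is the final output: Hadoop writes part-m-00000-style files (one per map task) under D:\output, and each record is one surviving log line, since the Text key is the line itself and the NullWritable value contributes nothing to the file. A map-only job also skips the shuffle and sort entirely, which is exactly what you want for a pure filtering/ETL pass like this one.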