MapReduce Traffic-Statistics Example (Custom Serializable Class): Basic Implementation

Input data (fields separated by \t)

   id    phone number    IP address    visited domain (present in some rows, absent in others)    upstream traffic    downstream traffic    status code

1	11111111111	120.196.100.99	100	900	200
2	11111111112	120.196.100.92	www.baidu.com	200	200	200
3	11111111113	120.196.100.93	800	200	200
4	11111111114	120.196.100.95	30	970	200
5	11111111116	120.196.100.95	www.baidu.com	105	895	200
6	11111111115	120.196.100.99	100	900	200
7	11111111112	120.196.100.59	www.baidu.com	300	300	200
8	11111111118	120.196.100.96	150	850	200

Required Maven configuration (pom.xml)

Note: to run locally on Windows, make sure a local Hadoop installation is present and that its version matches the one declared in the POM (on Windows this typically also means winutils.exe with HADOOP_HOME pointing at it); the input and output paths in step 6 of FlowDriver must be changed to your own.



<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test2</groupId>
    <artifactId>mapredceDemo1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- Hadoop client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.0.1</version>
        </dependency>
        <!-- JUnit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <!-- SLF4J log4j binding -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compile for Java 8 -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Package a jar that bundles all dependencies -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
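Because the assembly plugin is bound to the package phase, mvn clean package should yield both the plain artifact and target/mapredceDemo1-1.0-SNAPSHOT-jar-with-dependencies.jar (name inferred from the artifactId and version above); the latter bundles all dependencies and is the one to submit to a cluster.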

log4j.properties configuration (under the resources directory)

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

Custom Writable class implementation (FlowBean)

A custom value type needs to implement Writable (write/readFields for serialization), keep a no-arg constructor so the framework can instantiate it reflectively, and override toString, which determines how each record is rendered in the job's text output.

package com.test.mapreduce.writable;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


public class FlowBean implements Writable {

    private Long upFlow;   // upstream traffic
    private Long downFlow; // downstream traffic
    private Long sumFlow;  // total traffic

    public Long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Long upFlow) {
        this.upFlow = upFlow;
    }

    public Long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Long downFlow) {
        this.downFlow = downFlow;
    }

    public Long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow() {
        // Derive the total from the up and down fields
        this.sumFlow = this.upFlow + this.downFlow;
    }

    
    // No-arg constructor, required so Hadoop can instantiate the bean reflectively
    public FlowBean() {
    }

    
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        // Serialize the value's fields in a fixed order
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // Deserialize in exactly the order the fields were written
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    
    @Override
    public String toString() {
        // Controls how the bean is rendered in the job's text output
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
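Since readFields must consume the fields in exactly the order write produced them, a quick local round trip is a useful sanity check. A minimal sketch, assuming it lives in the same package as FlowBean (this test class is illustrative, not part of the original post):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean in = new FlowBean();
        in.setUpFlow(100L);
        in.setDownFlow(900L);
        in.setSumFlow();

        // Serialize into an in-memory buffer
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        in.write(new DataOutputStream(buffer));

        // Deserialize into a fresh bean; fields come back in write order
        FlowBean out = new FlowBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(out); // expected: 100	900	1000
    }
}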

Custom Mapper class implementation (FlowMapper)

package com.test.mapreduce.writable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    // Text object reused across map() calls to hold the output key
    private Text k = new Text();
    // FlowBean reused across map() calls to hold the output value
    private FlowBean v = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Read the line
        String line = value.toString();

        // 2. Split on tabs
        String[] dataList = line.split("\t");

        // 3. Extract the fields to wrap; up/down are indexed from the end
        //    because the domain column is present in some rows and absent in others
        String phone = dataList[1];                   // phone number
        String up    = dataList[dataList.length - 3]; // upstream traffic
        String down  = dataList[dataList.length - 2]; // downstream traffic

        // 4. Populate the K,V
        k.set(phone);
        v.setUpFlow(Long.parseLong(up));
        v.setDownFlow(Long.parseLong(down));
        v.setSumFlow();

        // 5. Emit the K,V
        context.write(k, v);
    }
}
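The end-relative indexing in step 3 is the one subtle point: the domain column is optional, so only the trailing columns have fixed positions, and counting from the front would break on rows that lack a domain. A standalone sketch (illustrative class, not from the original post) exercising both row shapes:

public class SplitDemo {
    public static void main(String[] args) {
        // Row without a domain column (6 fields)...
        String noDomain   = "1\t11111111111\t120.196.100.99\t100\t900\t200";
        // ...and a row with one (7 fields)
        String withDomain = "2\t11111111112\t120.196.100.92\twww.baidu.com\t200\t200\t200";

        for (String line : new String[]{noDomain, withDomain}) {
            String[] f = line.split("\t");
            // length - 3 and length - 2 land on up/down flow in both shapes
            System.out.println(f[1] + " up=" + f[f.length - 3] + " down=" + f[f.length - 2]);
        }
    }
}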

Custom Reducer class implementation (FlowReducer)

package com.test.mapreduce.writable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    // FlowBean reused across reduce() calls to hold the output value
    private FlowBean v = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        // 1. Reset the running totals on every reduce() call
        //    (the same key may arrive with many values)
        long totalUp   = 0;
        long totalDown = 0;

        // 2. Sum the upstream and downstream traffic
        for (FlowBean flowBean : values) {
            totalUp   += flowBean.getUpFlow();
            totalDown += flowBean.getDownFlow();
        }

        // 3. Populate the K,V
        v.setUpFlow(totalUp);
        v.setDownFlow(totalDown);
        v.setSumFlow();

        // 4. Emit the K,V
        context.write(key, v);
    }
}
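The primitive-summing loop above is safe, but note one Hadoop detail the post doesn't mention: the framework reuses a single FlowBean instance behind the values iterator, so code that needs to keep the beans themselves must copy them defensively. A sketch of what that would look like inside reduce() (assumes java.util.List and java.util.ArrayList are imported):

        // Cache values safely despite Hadoop's object reuse
        List<FlowBean> cached = new ArrayList<>();
        for (FlowBean b : values) {
            FlowBean copy = new FlowBean(); // fresh object per element
            copy.setUpFlow(b.getUpFlow());
            copy.setDownFlow(b.getDownFlow());
            copy.setSumFlow();
            cached.add(copy);
        }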

Custom Driver class implementation (FlowDriver)

package com.test.mapreduce.writable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Create the Configuration object and obtain the Job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Set the jar associated with this driver program
        job.setJarByClass(FlowDriver.class);

        // 3. Set the associated Mapper and Reducer classes
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4. Set the Mapper output K,V types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5. Set the final output K,V types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. Set the input and output paths (change these to your own)
        FileInputFormat.setInputPaths(job, new Path("D:\\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\output"));

        // 7. Submit the job and exit with its status
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }
}
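To submit the same job to a cluster instead of running it from the IDE, the usual pattern (after pointing step 6 at HDFS paths, or reading the paths from args) is the standard hadoop jar command, using the assembly jar built above:

hadoop jar target/mapredceDemo1-1.0-SNAPSHOT-jar-with-dependencies.jar com.test.mapreduce.writable.FlowDriver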

Output data (totals per phone; e.g. 11111111112 appears in two input rows, so its upstream total is 200 + 300 = 500 and its downstream total is 200 + 300 = 500)

  phone number   total upstream   total downstream   total traffic

11111111111	100	900	1000
11111111112	500	500	1000
11111111113	800	200	1000
11111111114	30	970	1000
11111111115	100	900	1000
11111111116	105	895	1000
11111111118	150	850	1000
