目录
1.前情回顾
2.Map端表合并分析
2.1 将产品表缓存起来
2.2 在进行map之前加载缓存路径
2.3 打开文件,创建流对象
2.4 逐行读取产品表,并存放在字典中
2.5 关闭流
3. 完整代码
3.1 编写mapper程序
3.2 编写Driver程序
1.前情回顾
Hadoop案例:Mapreduce解决多个关联表整合问题(Redue Join)_小M呀~之大数据系列-CSDN博客在实际工作中可能会遇到这样的需求,将多个关联的表格整合到一张表中。https://blog.csdn.net/baidu_41833099/article/details/121745351上一篇文章是在Reduce做表合并,这种方式容易导致数据倾斜问题,因为当数据量很大的时候,多个MapTask数据全部汇总到Rdducer端处理会增大Reduer的负荷量,降低降低计算效率。因此,本文将在Map端进行多表合并。
2.Map端表合并思路分析将数据量比较小的表,缓存起来,在map之前获得缓存表。在map段进行合并,合并后就是我们想要的结果,此时并不需要进行reducer。因此,这里在Driver程序中将Reducer设置为0即可。
(注意:这种方式只适用于关联表中有小表的情形)
2.1 将产品表缓存起来//加载缓存路径 job.addCacheFile(new URI("file:///C:/ZProject/bigdata/input/tablecache/pd.txt"));2.2 在进行map之前加载缓存路径
//获取缓存路径 URI[] cacheFiles = context.getCacheFiles();2.3 打开文件,创建流对象
//获取文件对象,并开流 FileSystem fs = FileSystem.get(context.getConfiguration()); FSDataInputStream fis = fs.open(new Path(cacheFiles[0]));2.4 逐行读取产品表,并存放在字典中
//通过包装流转换为reader,方便按行读取 BufferedReader reader = new BufferedReader(new InputStreamReader(fis,"UTF-8")); //逐行读取按行处理 String line; while (StringUtils.isNotEmpty(line=reader.readLine())){ //切割一行 //0 1 小米 String[] split = line.split("t"); pdMap.put(split[0],split[1]);2.5 关闭流
//关闭流 IOUtils.closeStream(reader);3. 完整代码 3.1 编写mapper程序
package com.yangmin.mapreduce.mapjoin; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; import java.util.Map; public class MapJoinMapper extends Mapper3.2 编写Driver程序{ private Map pdMap = new HashMap<>(); private Text text = new Text(); @Override //任务开始前将pd数据缓存进来 protected void setup(Context context) throws IOException { //获取缓存路径 URI[] cacheFiles = context.getCacheFiles(); //获取文件对象,并开流 FileSystem fs = FileSystem.get(context.getConfiguration()); FSDataInputStream fis = fs.open(new Path(cacheFiles[0])); //通过包装流转换为reader,方便按行读取 BufferedReader reader = new BufferedReader(new InputStreamReader(fis,"UTF-8")); //逐行读取按行处理 String line; while (StringUtils.isNotEmpty(line=reader.readLine())){ //切割一行 //0 1 小米 String[] split = line.split("t"); pdMap.put(split[0],split[1]); } IOUtils.closeStream(reader); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().split("t"); //获取商品名称 String pName = pdMap.get(split[1]); //重新拼接 String s = split[0] +"t"+ pName +"t"+ split[2]; text.set(s); //写出 context.write(text, NullWritable.get()); } }
package com.yangmin.mapreduce.mapjoin; import jdk.nashorn.internal.scripts.JO; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; public class MapJoinDriver { public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException { //获取配置信息和Join Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //关联map job.setMapperClass(MapJoinMapper.class); //设置map的kv输出类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); //设置最终kv输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); //加载缓存路径 job.addCacheFile(new URI("file:///C:/ZProject/bigdata/input/tablecache/pd.txt")); // Map 端 Join 的逻辑不需要 Reduce 阶段,设置 reduceTask 数量为 0 job.setNumReduceTasks(0); //设置输入路径和输出路径 FileInputFormat.setInputPaths(job, new Path("C:\ZProject\bigdata\input\inputtable2")); FileOutputFormat.setOutputPath(job,new Path("C:\ZProject\bigdata\output\output_mapjoin")); //提交 boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)