内存不足时的大文件排序算法(spark shuffle的排序算法)

内存不足时的大文件排序算法(spark shuffle的排序算法),第1张

内存不足时的大文件排序算法(spark shuffle的排序算法) 1、问题场景

        例如:当前磁盘2T,内存16G,文件大小500G。现在需要对500G的大文件进行排序,并存道另一个文件中。

        抽象:当前有包含10000行数的文件,再只能使用长度为1000的数组的前提下,对文件中的数据进行排序,并存道另一个文件中。

2、问题分析

        首先,数据的总量大于了所能使用的内存数据结构的容量,所以肯定不能完全通过内存排序。因此需要采用一种分治的思想来实现全局排序。

        (1)数组总量10000,数组容量1000,所以可以分十次读取数据,每次读取1000条数据,再内存中对1000条数据进行排序。

        (2)单次排完序的数据写入磁盘临时文件。总供会有10个局部有序的磁盘文件。

        (3)同时打开10个磁盘文件,创建reader。每个磁盘文件都读入第一行数据,即局部最小的数据。

        (4)找到10个局部最小数据中的最小值。

        (5)对每个磁盘文件,判断当前最小值是否等于全局最小值。如果等于则写出,并获取下一行数据,继续比较。

        (6)当所有文件都比较并输出完之后,重新计算当前10个局部最小数据中的最小值。继续比较输出,直到所有磁盘文件全部读完。

        (7)此时输出的文件就是一个全局有序的文件。

3、生成测试数据
public class CreateTestData {
    public static void main(String[] args) throws IOException {

        String filePath = "C:\Users\yangshen\code\java\stu-project\src\main\resources\input.txt";
        BufferedWriter writer = new BufferedWriter(new FileWriter(filePath));

        Random random = new Random(1);

        int num = 0;
        while (num < 10000) {
            writer.write(String.valueOf(random.nextInt(10000)));
            writer.newline();
            num++;
        }

        writer.close();
    }
}

4、排序算法
public class SortWithMuchData {
    public static void main(String[] args) throws Exception {
        String fileIn = "C:\Users\yangshen\code\java\stu-project\src\main\resources\input.txt";
        String fileOut = "C:\Users\yangshen\code\java\stu-project\src\main\resources\output.txt";
        new Sort(fileIn, fileOut, 1000).run();
    }
}

class Sort {

    private final String fileIn;
    private final String fileOut;
    private int[] data;
    private final int size;
    private final List tempFiles;

    public Sort(String fileIn, String fileOut, int size) {
        this.fileIn = fileIn;
        this.fileOut = fileOut;
        this.data = new int[size];
        this.size = size;
        this.tempFiles = new ArrayList<>();
    }

    public void run() throws Exception {
        System.out.println("begin...");
        BufferedReader reader = new BufferedReader(new FileReader(fileIn));

        String line;
        int num = 0;
        while ((line = reader.readLine()) != null) {
            if (num < size) {
                data[num] = Integer.parseInt(line);
                num++;
            } else {
                sortAndSpill();
                data = new int[size];
                num = 0;
            }
        }

        if (data.length > 0) {
            sortAndSpill();
        }

        merge();
        clear();
        System.out.println("end...");
    }

    
    private void merge() throws Exception {
        new Merge(tempFiles, fileOut).run();
    }

    
    private void clear() {
        for (String tempFile : tempFiles) {
            File file = new File(tempFile);
            if (file.exists()) {
                file.delete();
            }
        }
    }

    
    private void sortAndSpill() throws IOException {
        Arrays.sort(data);
        spill();
    }

    
    private void spill() throws IOException {
        String tempFileName = createTempFileName();
        BufferedWriter writer = new BufferedWriter(new FileWriter(tempFileName));
        for (int d : data) {
            writer.write(String.valueOf(d));
            writer.newline();
        }
        writer.close();
        System.out.println("生成临时文件:" + tempFileName);
        tempFiles.add(tempFileName);
    }

    
    private String createTempFileName() {
        return "C:\Users\yangshen\code\java\stu-project\src\main\resources\" + UUID.randomUUID().toString() + ".txt";
    }

    static class Merge {

        private int minValue;
        private final int fileNum;
        private final int[] tmpData;
        private final BufferedReader[] readers;
        private final String[] tempFiles;
        private final BufferedWriter writer;

        public Merge(List tempFiles, String fileOut) throws IOException {
            this.minValue = -1;
            this.fileNum = tempFiles.size();
            this.tempFiles = tempFiles.toArray(new String[this.fileNum]);
            this.tmpData = new int[this.fileNum];
            this.readers = new BufferedReader[this.fileNum];
            this.writer = new BufferedWriter(new FileWriter(fileOut));
            init();
        }

        private void init() throws IOException {
            for (int i = 0; i < tempFiles.length; i++) {
                readers[i] = new BufferedReader(new FileReader(tempFiles[i]));
                tmpData[i] = Integer.parseInt(readers[i].readLine());
            }
        }

        public void run() throws IOException {
            while (true) {
                int ok = 0;
                resetMinValue();
                for (int i = 0; i < tmpData.length; i++) {
                    while (tmpData[i] == minValue) {
                        System.out.println(tmpData[i]);
                        write(String.valueOf(tmpData[i]));
                        fillTempData(i);
                    }
                    if (tmpData[i] == -1) {
                        ok++;
                    }
                }
                if (ok == fileNum) {
                    break;
                }
            }

            writer.close();
        }

        
        private void fillTempData(int i) throws IOException {
            String line;
            if ((line = readers[i].readLine()) != null) {
                tmpData[i] = Integer.parseInt(line);
            } else {
                tmpData[i] = -1;
            }
        }

        
        private void write(String item) throws IOException {
            writer.write(item);
            writer.newline();
        }

        
        private void resetMinValue() {
            minValue = Arrays.stream(tmpData).filter(x -> x != -1).min().getAsInt();
        }
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5704924.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存