返回顶部

收藏

Java多线程实现文件快速切分

更多

前段时间需要进行大批量数据导入,DBA提供的是CSV文件,但是每个CSV文件都有好几个GB大小,直接进行load,数据库很慢,还会产生内存不足的问题。为了解决这个问题,写了个快速切分文件的程序。

[Java]代码

``` java
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

public class FileSplitUtil {

/** Logger used by this class and its nested worker class. */
private final static Logger log = LogManager.getLogger(FileSplitUtil.class);
/** Minimum source-file length, in bytes, at which main() performs a split. */
private static final long originFileSize = 1024 * 1024 * 100;// 100M
/** Part size, in bytes, that main() passes to splitBySize (64M). */
private static final int blockFileSize = 1024 * 1024 * 64;// Original author's note: must be a power of two to avoid garbled Chinese text. NOTE(review): not verifiable from this file - confirm
/**
 * Field separator of the CSV files (spelled "CVS" throughout this class);
 * this is the character counted by getCharCount.
 */
private static final char cvsSeparator = '^';
/**
 * Demo entry point: splits a CSV file into parts of {@code blockFileSize} bytes when the
 * file is at least {@code originFileSize} bytes long, then prints the part names, the
 * source length and the elapsed time.
 *
 * @param args optional; when present, {@code args[0]} replaces the built-in sample path
 */
public static void main(String args[]) {
    long start = System.currentTimeMillis();
    String fileName = (args != null && args.length > 0) ? args[0] : "D:\\csvtest\\aa.csv";
    try {
        File sourceFile = new File(fileName);
        if (sourceFile.length() >= originFileSize) {
            // Normalise Windows separators; a plain char replace needs no regex escaping.
            String cvsFileName = fileName.replace('\\', '/');
            FileSplitUtil fileSplitUtil = new FileSplitUtil();
            List<String> parts = fileSplitUtil.splitBySize(cvsFileName, blockFileSize);
            for (String part : parts) {
                System.out.println("partName is:" + part);
            }
        }
        System.out.println("总文件长度" + sourceFile.length() + ",拆分文件耗时:" + (System.currentTimeMillis() - start) + "ms.");
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        log.error("Interrupted while splitting " + fileName, e);
    } catch (Exception e) {
        // Pass the throwable itself: logging e.getStackTrace() only prints the array's
        // identity string, not the trace.
        log.error("Failed to split " + fileName, e);
    }
}

/**
 * Splits a file into consecutive parts of at most {@code byteSize} bytes, writing the
 * parts on a pool of worker threads, and then calls {@code mergeRow} on the result.
 *
 * <p>Part files are created next to the source and named
 * {@code <source path>.<zero-padded index>.cvs}. Every worker buffers one whole part in
 * memory, so the pool size is bounded by the processor count and by the JVM's maximum
 * heap (at most half of it is budgeted for part buffers).
 *
 * @param fileName full name of the file to split
 * @param byteSize size of each part in bytes; must be positive
 * @return names of the part files, in split order (empty for an empty source file)
 * @throws IOException if the source file cannot be opened
 * @throws InterruptedException if the calling thread is interrupted while waiting for
 *         the workers
 * @throws IllegalArgumentException if {@code byteSize} is not positive
 */
public List<String> splitBySize(String fileName, int byteSize)
        throws IOException, InterruptedException {
    if (byteSize <= 0) {
        throw new IllegalArgumentException("byteSize must be positive: " + byteSize);
    }
    List<String> parts = new ArrayList<String>();
    File file = new File(fileName);

    // Opening the file fails fast when it is missing; the handle is only needed for
    // its length and is released immediately.
    long totalLen;
    RandomAccessFile raf = new RandomAccessFile(fileName, "r");
    try {
        totalLen = raf.length();
    } finally {
        raf.close();
    }

    int count = (int) Math.ceil(totalLen / (double) byteSize);
    if (count == 0) {
        return parts; // empty source: nothing to split or merge
    }
    int countLen = String.valueOf(count).length();
    // SplitRunnable's constructor requires a latch; completion is awaited through the
    // executor instead, so a worker that dies with an unchecked exception cannot leave
    // this method blocked forever.
    CountDownLatch latch = new CountDownLatch(count);

    long threadsByMemory = Math.max(1L, Runtime.getRuntime().maxMemory() / (2L * byteSize));
    int threads = (int) Math.min(
            Math.min(count, Runtime.getRuntime().availableProcessors()), threadsByMemory);
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    try {
        for (int i = 0; i < count; i++) {
            String partFileName = file.getPath() + "."
                    + leftPad((i + 1) + "", countLen, '0') + ".cvs";
            long startPos = (long) i * byteSize;
            // The final part only holds whatever is left of the file.
            int readSize = (int) Math.min((long) byteSize, totalLen - startPos);
            executor.execute(new SplitRunnable(readSize, startPos, partFileName, file, latch));
            parts.add(partFileName);
        }
        executor.shutdown();
        // Wait until every part has been written.
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
    } finally {
        // No-op after a normal shutdown; cancels outstanding work if we were interrupted.
        executor.shutdownNow();
    }
    // Cutting at fixed byte offsets can split a row across two parts; patch them up.
    mergeRow(parts);
    return parts;
}

/**
 * 分割处理Runnable
 *
 * @author supeidong
 */
private class SplitRunnable implements Runnable {
    int byteSize;
    String partFileName;
    File originFile;
    long startPos;
    CountDownLatch latch;
    public SplitRunnable(int byteSize, long startPos, String partFileName,
                         File originFile, CountDownLatch latch) {
        this.startPos = startPos;
        this.byteSize = byteSize;
        this.partFileName = partFileName;
        this.originFile = originFile;
        this.latch = latch;
    }

    public void run() {
        RandomAccessFile rFile;
        OutputStream os;
        try {
            rFile = new RandomAccessFile(originFile, "r");
            byte[] b = new byte[byteSize];
            rFile.seek(startPos);// 移动指针到每“段”开头
            int s = rFile.read(b);
            os = new FileOutputStream(partFileName);
            os.write(b, 0, s);
            os.flush();
            os.close();
            latch.countDown();
        } catch (IOException e) {
            log.error(e.getMessage());
            latch.countDown();
        }
    }
}

/**
 * Patches rows that were cut in two at part boundaries.
 *
 * <p>Pass 1 reads, for every existing part, its first row, its second row and its last
 * row. For each part after the first, the first row is classified as "full" when the
 * separator count of (first row + last row of the preceding part) exceeds the separator
 * count of a reference row; the reference is this part's second row, or the most recent
 * second row seen when this part has only one line.
 *
 * <p>Pass 2 appends the first row of each part to the end of the preceding part,
 * preceded by a line break when that row was classified as full.
 *
 * <p>NOTE(review): the appended row is not removed from the part it was read from, so
 * it is present in both files afterwards - confirm that consumers expect this.
 *
 * <p>This is best-effort: any failure is logged and stops the remaining work; nothing
 * is thrown to the caller.
 *
 * @param parts part file names in split order
 */
private void mergeRow(List<String> parts) {
    List<PartFile> partFiles = new ArrayList<PartFile>();
    try {
        // Separator count of the latest usable reference row; -1 until one is seen.
        int expectedSeparators = -1;

        // Pass 1: collect the boundary rows of every part.
        for (int i = 0; i < parts.size(); i++) {
            String partFileName = parts.get(i);
            File splitFileTemp = new File(partFileName);
            if (!splitFileTemp.exists()) {
                continue;
            }

            String firstRow;
            String secondRow;
            InputStream in = new FileInputStream(splitFileTemp);
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(in, "gbk"));
                firstRow = reader.readLine();
                secondRow = reader.readLine();
            } finally {
                in.close(); // released even when decoding or reading fails
            }
            if (firstRow == null) {
                // Empty part: avoid concatenating or writing the literal text "null".
                firstRow = "";
            }
            if (secondRow != null) {
                expectedSeparators = getCharCount(secondRow);
            }

            PartFile partFile = new PartFile();
            partFile.setPartFileName(partFileName);
            partFile.setFirstRow(firstRow);
            partFile.setEndRow(readLastLine(partFileName));
            if (i >= 1) {
                String preEndRow = readLastLine(parts.get(i - 1));
                if (expectedSeparators >= 0) {
                    // More separators than one row holds means the two pieces are
                    // separate rows, i.e. the cut fell on a row boundary.
                    partFile.setFirstIsFull(
                            getCharCount(firstRow + preEndRow) > expectedSeparators);
                } else {
                    // A part with a single line used to abort the whole merge with a
                    // NullPointerException; treat its first row as a fragment instead.
                    log.warn("No reference row available for " + partFileName
                            + "; treating its first row as a fragment");
                }
            }
            partFiles.add(partFile);
        }

        // Pass 2: append each part's first row to the part before it.
        for (int i = 0; i < partFiles.size() - 1; i++) {
            PartFile partFile = partFiles.get(i);
            PartFile partFileNext = partFiles.get(i + 1);
            StringBuilder sb = new StringBuilder();
            if (partFileNext.getFirstIsFull()) {
                sb.append("\r\n");
            }
            sb.append(partFileNext.getFirstRow());
            writeLastLine(partFile.getPartFileName(), sb.toString());
        }
    } catch (Exception e) {
        // Deliberately broad: merging stays best-effort, but keep the stack trace.
        log.error("Failed to merge rows across part files", e);
    }
}

/**
 * Counts the occurrences of the field separator {@code cvsSeparator} in a string.
 *
 * @param s text to scan; must not be null
 * @return number of separator characters found in {@code s}
 */
private int getCharCount(String s) {
    int hits = 0;
    // Jump from one separator to the next instead of inspecting every character.
    int at = s.indexOf(cvsSeparator);
    while (at >= 0) {
        hits++;
        at = s.indexOf(cvsSeparator, at + 1);
    }
    return hits;
}

/**
 * Counts the line-feed ({@code '\n'}) bytes in a file, reading it through a
 * {@link BufferedInputStream}.
 *
 * <p>Because only line feeds are counted, a final line without a trailing line feed
 * does not contribute to the result.
 *
 * @param filename name of the file to scan
 * @return number of {@code '\n'} bytes in the file
 * @throws IOException if the file cannot be opened or read
 */
public int getFileRow(String filename) throws IOException {
    InputStream is = new BufferedInputStream(new FileInputStream(filename));
    try {
        byte[] c = new byte[1024];
        int count = 0;
        int readChars;
        while ((readChars = is.read(c)) != -1) {
            for (int i = 0; i < readChars; ++i) {
                if (c[i] == '\n') {
                    ++count;
                }
            }
        }
        return count;
    } finally {
        // Release the file handle even when read() throws part-way through.
        is.close();
    }
}

/**
 * Reads the last line of a GBK-encoded file by scanning backwards for a line feed.
 *
 * <p>The final byte of the file is never examined, so a terminating {@code '\n'} is
 * not mistaken for the start of the last line. When no line feed is found before the
 * final byte (empty or single-line file) an empty string is returned.
 *
 * @param filename name of the file to read
 * @return the text after the last line feed that precedes the final byte, or
 *         {@code ""} when there is none
 * @throws IOException if the file cannot be opened or read
 */
private String readLastLine(String filename) throws IOException {
    RandomAccessFile raf = new RandomAccessFile(filename, "r");
    try {
        // Walk backwards from the second-to-last byte down to the first byte.
        for (long pos = raf.length() - 2; pos >= 0; pos--) {
            raf.seek(pos);
            if (raf.readByte() == '\n') {
                // readLine() widens each byte to a char (ISO-8859-1 style); turn the
                // chars back into the original bytes and decode them as GBK.
                String raw = raf.readLine();
                return new String(raw.getBytes("8859_1"), "gbk");
            }
        }
        return "";
    } finally {
        // Release the file handle even when seek/read throws.
        raf.close();
    }
}
/**
 * Appends text, encoded as GBK, to the end of a file.
 *
 * <p>I/O failures are logged and not propagated to the caller.
 *
 * @param fileName   name of the file to append to
 * @param lastString text to append
 */
private void writeLastLine(String fileName, String lastString) {
    RandomAccessFile randomFile = null;
    try {
        // Open for read/write and position the pointer at the current end of file.
        randomFile = new RandomAccessFile(fileName, "rw");
        randomFile.seek(randomFile.length());
        // The part files are GBK; writing with any other charset would garble the text.
        randomFile.write(lastString.getBytes("gbk"));
    } catch (IOException e) {
        log.error("Failed to append to " + fileName, e);
    } finally {
        if (randomFile != null) {
            try {
                randomFile.close();
            } catch (IOException e) {
                log.error("Failed to close " + fileName, e);
            }
        }
    }
}
/**
 * Left-pads a string with a fill character up to a target length.
 *
 * @param str    text to pad; must not be null
 * @param length desired total length
 * @param ch     character used for padding
 * @return {@code str} itself when it already has at least {@code length} characters,
 *         otherwise a new string of exactly {@code length} characters ending in
 *         {@code str}
 */
public static String leftPad(String str, int length, char ch) {
    if (length <= str.length()) {
        return str;
    }
    StringBuilder padded = new StringBuilder(length);
    for (int missing = length - str.length(); missing > 0; missing--) {
        padded.append(ch);
    }
    return padded.append(str).toString();
}

/**
 * Boundary information recorded for one part file while rows are being merged:
 * the part's name, its first and last rows, and whether the first row was judged to
 * be a complete row.
 */
class PartFile {
    /** Name of the part file. */
    private String partFileName;
    /** First row of the part. */
    private String firstRow;
    /** Last row of the part. */
    private String endRow;
    /** Whether the first row is a complete row; {@code false} until set. */
    private boolean firstIsFull;

    // --- setters ---

    public void setPartFileName(String name) {
        partFileName = name;
    }

    public void setFirstRow(String row) {
        firstRow = row;
    }

    public void setEndRow(String row) {
        endRow = row;
    }

    public void setFirstIsFull(boolean full) {
        firstIsFull = full;
    }

    // --- getters ---

    public String getPartFileName() {
        return partFileName;
    }

    public String getFirstRow() {
        return firstRow;
    }

    public String getEndRow() {
        return endRow;
    }

    public boolean getFirstIsFull() {
        return firstIsFull;
    }
}

}

```

标签:java

收藏

0人收藏

支持

0

反对

0

发表评论