1. 程式人生 > >java併發讀&寫檔案

java併發讀&寫檔案

package org.mushroom.multithread;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 要點:
 * 1, 將一個檔案分為 ,大小為 {@link Reader#segmentLength} 的 {@link Reader#segments} 個檔案
 * ,每個執行緒通過讀取pure block來讀取資料,被訪問的塊會通過{@link Reader#segment}標記
 * ,自增1.
 * 2, 多個執行緒的全域性變數是final {@link Reader#source}, final {@link Reader#target}和
 * {@link Reader#segment},需要保證這些變數的安全性
 */
public class Reader implements Runnable {
    private static final int BYTE = 1024;
    private static final long NEGATIVE_ONE = -1L;
    private static final long ZERO = 0L;
    private static final long ONE = 1L;
    // 全域性變數 供多執行緒訪問塊
    private final AtomicLong segment = new AtomicLong(NEGATIVE_ONE);
    // 單個檔案大小
    private final int segmentLength = 30 * BYTE * BYTE;
    // 原始檔案
    private final File source;
    // 複製後的檔案
    private final File target;
    // 檔案被分割後的塊數
    private final long segments;
    // 最後一塊檔案實際大小
    private final long remains;

    public Reader(String sourcePath, String targetPath) throws IOException {
        this.source = new File(sourcePath);
        this.target = new File(targetPath);
        if (!this.target.exists()) {
            this.target.createNewFile();
        }
        this.remains = (this.source.length() % segmentLength);
        //如果餘數不為0, 則需要多一個塊來儲存多餘的bytes,否則會丟失
        if (this.remains != ZERO) {
            this.segments = this.source.length() / segmentLength + ONE;
        } else {
            this.segments = sourcePath.length() / segmentLength;
        }

    }

    /**
     * run:
     * 1, while true: 當前塊未被訪問, 從{@link Reader#segment = 0}開始第一次訪問
     * 2, {@link Reader#readBlock(RandomAccessFile, long)}從檔案中讀取資料,並返回 byte[]
     * 3, {@link Reader#writeBlock(RandomAccessFile, byte[], long)},設定position後將緩衝寫入檔案
     */
    public void run() {
        RandomAccessFile reader = null;
        RandomAccessFile writer = null;
        try {
            reader = new RandomAccessFile(source, "r");
            writer = new RandomAccessFile(target, "rw");
            long position = -1L;
            //迴圈計數當前segment, 多個執行緒均可修改
            while ((position = segment.incrementAndGet()) < segments) {
                final byte[] bytes = readBlock(reader, position);
                writeBlock(writer, bytes, position);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close(writer);
            close(reader);
        }
    }

    private void writeBlock(RandomAccessFile writer, byte[] bytes, long position) throws IOException {
        writer.seek(position * segmentLength);
        writer.write(bytes);
    }

    /**
     * 1, reader設定position
     * 2, 建立緩衝陣列
     * 3, 將資料寫入byte[]
     * 4, 返回緩衝陣列
     *
     * @return position 供 {@link RandomAccessFile#write(byte[])}使用
     */
    private byte[] readBlock(RandomAccessFile reader, long position) throws IOException {
        reader.seek(position * segmentLength);
        final byte[] bytes = new byte[getWriteLength(position)];
        reader.read(bytes);
        return bytes;
    }

    /**
     * 獲得當前byte[]實際可寫入長度可能是{@link Reader#segmentLength} 或者 {@link Reader#remains}
     */
    private int getWriteLength(long position) throws IOException {
        if (position == segments + NEGATIVE_ONE && remains > ZERO) {
            return (int) remains;
        }
        return segmentLength;
    }

    /**
     * 關閉流的通用介面方法
     *
     * @param closeable
     */

    private void close(Closeable closeable) {
        try {
            if (Objects.nonNull(closeable)) {
                closeable.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}