java併發讀&寫檔案
阿新 • • 發佈:2019-01-28
package org.mushroom.multithread; import java.io.Closeable; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; /** * 要點: * 1, 將一個檔案分為 ,大小為 {@link Reader#segmentLength} 的 {@link Reader#segments} 個檔案 * ,每個執行緒通過讀取pure block來讀取資料,被訪問的塊會通過{@link Reader#segment}標記 * ,自增1. * 2, 多個執行緒的全域性變數是final {@link Reader#source}, final {@link Reader#target}和 * {@link Reader#segment},需要保證這些變數的安全性 */ public class Reader implements Runnable { private static final int BYTE = 1024; private static final long NEGATIVE_ONE = -1L; private static final long ZERO = 0L; private static final long ONE = 1L; // 全域性變數 供多執行緒訪問塊 private final AtomicLong segment = new AtomicLong(NEGATIVE_ONE); // 單個檔案大小 private final int segmentLength = 30 * BYTE * BYTE; // 原始檔案 private final File source; // 複製後的檔案 private final File target; // 檔案被分割後的塊數 private final long segments; // 最後一塊檔案實際大小 private final long remains; public Reader(String sourcePath, String targetPath) throws IOException { this.source = new File(sourcePath); this.target = new File(targetPath); if (!this.target.exists()) { this.target.createNewFile(); } this.remains = (this.source.length() % segmentLength); //如果餘數不為0, 則需要多一個塊來儲存多餘的bytes,否則會丟失 if (this.remains != ZERO) { this.segments = this.source.length() / segmentLength + ONE; } else { this.segments = sourcePath.length() / segmentLength; } } /** * run: * 1, while true: 當前塊未被訪問, 從{@link Reader#segment = 0}開始第一次訪問 * 2, {@link Reader#readBlock(RandomAccessFile, long)}從檔案中讀取資料,並返回 byte[] * 3, {@link Reader#writeBlock(RandomAccessFile, byte[], long)},設定position後將緩衝寫入檔案 */ public void run() { RandomAccessFile reader = null; RandomAccessFile writer = null; try { reader = new RandomAccessFile(source, "r"); writer = new RandomAccessFile(target, "rw"); long position = -1L; //迴圈計數當前segment, 多個執行緒均可修改 while ((position = segment.incrementAndGet()) < segments) { final byte[] bytes = readBlock(reader, position); writeBlock(writer, bytes, position); } } catch (IOException e) { e.printStackTrace(); } finally { close(writer); close(reader); } } private void writeBlock(RandomAccessFile writer, byte[] bytes, long position) throws IOException { writer.seek(position * segmentLength); writer.write(bytes); } /** * 1, reader設定position * 2, 建立緩衝陣列 * 3, 將資料寫入byte[] * 4, 返回緩衝陣列 * * @return position 供 {@link RandomAccessFile#write(byte[])}使用 */ private byte[] readBlock(RandomAccessFile reader, long position) throws IOException { reader.seek(position * segmentLength); final byte[] bytes = new byte[getWriteLength(position)]; reader.read(bytes); return bytes; } /** * 獲得當前byte[]實際可寫入長度可能是{@link Reader#segmentLength} 或者 {@link Reader#remains} */ private int getWriteLength(long position) throws IOException { if (position == segments + NEGATIVE_ONE && remains > ZERO) { return (int) remains; } return segmentLength; } /** * 關閉流的通用介面方法 * * @param closeable */ private void close(Closeable closeable) { try { if (Objects.nonNull(closeable)) { closeable.close(); } } catch (IOException e) { e.printStackTrace(); } } }