spark原始碼分析之ReadAheadInputStream
阿新 • • 發佈:2018-12-25
概述
ReadAheadInputStream的作用是:當前buffer中的資料讀完(耗盡)時,切換到另一個buffer繼續讀取,同時啟動任務從底層輸入流非同步預讀資料,填入剛耗盡的buffer。它通過2個buffer來完成——active buffer和read ahead buffer。呼叫read()方法時返回的是active buffer中的資料,而read ahead buffer用於從底層輸入流非同步讀取資料。通過交換這2個buffer,active buffer耗盡時就可以直接改從已預讀好的read ahead buffer讀取資料,通常無需阻塞在磁碟IO上。
從底層輸入流非同步讀取資料並寫入read ahead buffer的執行緒,相當於寫執行緒(writer)。
呼叫read()的當前執行緒從active buffer讀取資料(active buffer耗盡時會與已填好的read ahead buffer交換),相當於讀執行緒(reader)。
在交換buffer、觸發async read以及讀寫async read的狀態(例如readInProgress)時都需要加鎖。
async read要等到填滿read ahead buffer才返回,可能會增加延遲;因此還增加了一個AtomicBoolean標誌(isWaiting),當有reader正在等待資料時,async read可以更早地返回。
成員變數
從概述可以得出,我們至少需要以下成員變數:activeBuffer、readAheadBuffer、underlyingInputStream等,同時還需要用於併發的相關成員變數。
// Guards swapping the buffers, triggering async reads and the async-read state below.
// Final because asyncReadComplete is created from this lock: replacing the lock would leave the
// condition bound to a different lock than the one callers acquire.
private final ReentrantLock stateChangeLock = new ReentrantLock();

// Buffer that read() currently copies bytes out of.
@GuardedBy("stateChangeLock")
private ByteBuffer activeBuffer;

// Buffer that the async task fills from underlyingInputStream; it is swapped with activeBuffer
// once activeBuffer has been drained.
@GuardedBy("stateChangeLock")
private ByteBuffer readAheadBuffer;

// The stream being read ahead of the caller.
private final InputStream underlyingInputStream;

// Whether there is a reader waiting for data. The async read task checks it without holding
// stateChangeLock so that it can return early instead of filling the whole read-ahead buffer.
private final AtomicBoolean isWaiting = new AtomicBoolean(false);

// Executor that runs the read-ahead tasks (presumably a single daemon thread, per the factory
// name — confirm against ThreadUtils).
private final ExecutorService executorService =
    ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");

// True while an async read is in progress; this is the condition waited on through
// asyncReadComplete.
@GuardedBy("stateChangeLock")
private boolean readInProgress;

// Signalled when an async read finishes; used to block until readInProgress becomes false.
private final Condition asyncReadComplete = stateChangeLock.newCondition();
read方法
@Override public int read(byte[] b, int offset, int len) throws IOException { if (offset < 0 || len < 0 || len > b.length - offset) { throw new IndexOutOfBoundsException(); //前置檢查引數是否正確 } if (len == 0) { return 0; } if (!activeBuffer.hasRemaining()) { // No remaining in active buffer - lock and switch to write ahead buffer. //如果active buffer耗盡,獲取鎖並轉換到ahead buffer stateChangeLock.lock(); try { //while判斷async read是否正在進行,如果是則阻塞等待直到async read完成,否則立即返回。 //因為在反轉read ahead buffer和active buffer後, //還會對read ahead buffer執行一次async read操作,所以應該阻塞等待該操作完成。 //如果呼叫該方法時async read操作已經完成,該方法會立即返回。 //如果是第一次呼叫read方法,沒有觸發async read操作,該方法也會立即返回。 waitForAsyncReadComplete(); if (!readAheadBuffer.hasRemaining()) { // The first read. //如果readAheadBuffer中沒有資料,說明是第一次呼叫read方法 //只有第一次呼叫read方法,才會2個buffer都同時沒資料,才會走這個if方法體的邏輯。 //第二次及以後,因為async read,所以readAheadBuffer中總是有資料以備用 readAsync(); //線上程池中啟動新執行緒執行async read waitForAsyncReadComplete(); //阻塞等待直到async read完成 if (isEndOfStream()) { return -1; } } // Swap the newly read read ahead buffer in place of empty active buffer. swapBuffers(); //反轉read ahead buffer和active buffer // After swapping buffers, trigger another async read for read ahead buffer. readAsync(); //反轉2個buffer後,為read ahead buffer執行另一次async read操作 } finally { stateChangeLock.unlock(); } } len = Math.min(len, activeBuffer.remaining()); //實際可讀取的位元組數 //從activeBuffer讀取len個位元組到目標位元組陣列b的offset開始處 activeBuffer.get(b, offset, len); return len; }
readAsync方法
/**
 * Reads data from {@code underlyingInputStream} into {@code readAheadBuffer} asynchronously.
 *
 * <p>The bookkeeping happens under {@code stateChangeLock}. If the stream has already reached
 * its end, or an async read is already in progress, this method returns without doing anything.
 * Otherwise it:
 * <ol>
 *   <li>calls {@code checkReadException()};
 *   <li>empties {@code readAheadBuffer};
 *   <li>sets {@code readInProgress};
 *   <li>submits a task to {@code executorService}.
 * </ol>
 * The task fills the buffer's backing array without holding the lock. It then publishes the
 * result (new limit, end-of-stream flag or recorded exception), clears {@code readInProgress}
 * and signals any waiting reader.
 *
 * @throws IOException whatever {@code checkReadException()} throws (presumably the failure
 *     recorded by an earlier async read)
 * @throws RuntimeException if {@code executorService} does not accept the task (for example a
 *     {@code RejectedExecutionException} after shutdown); {@code readInProgress} is rolled back
 *     first so that later waits do not block forever
 */
private void readAsync() throws IOException {
  final byte[] arr;
  stateChangeLock.lock();
  try {
    // Fetch the backing array inside the try block so the lock is released even if array()
    // throws.
    arr = readAheadBuffer.array();
    if (endOfStream || readInProgress) {
      // Nothing to do: the stream is exhausted, or a read-ahead task is already running.
      return;
    }
    checkReadException();
    // position(0) followed by flip() leaves position == limit == 0, so the buffer reads as
    // empty until the task sets the new limit.
    readAheadBuffer.position(0);
    readAheadBuffer.flip();
    // Mark the async read as started before handing it to the executor.
    readInProgress = true;
  } finally {
    stateChangeLock.unlock();
  }
  try {
    executorService.execute(new Runnable() {
      @Override
      public void run() {
        stateChangeLock.lock();
        try {
          if (isClosed) {
            readInProgress = false;
            // Wake any reader blocked in waitForAsyncReadComplete(); without this signal it
            // would wait forever for a read that is never going to happen.
            signalAsyncReadComplete();
            return;
          }
          // Flip this so that the close method will not close the underlying input stream
          // while we are reading.
          isReading = true;
        } finally {
          stateChangeLock.unlock();
        }
        // It is safe to release the lock and read into the read ahead buffer, because one of
        // the following two conditions holds:
        // 1. The active buffer has data available, so the reader will not read from the read
        //    ahead buffer.
        // 2. This is the first read, or the active buffer is exhausted; in that case the reader
        //    waits for this async read to complete.
        // So there is no race condition in either situation.
        int read = 0;
        int off = 0, len = arr.length;
        Throwable exception = null;
        try {
          // Try to fill the read ahead buffer. Stop when the backing array is full or the
          // stream returns no data; stop early if a reader is waiting.
          do {
            read = underlyingInputStream.read(arr, off, len);
            if (read <= 0) {
              break; // -1 means end of stream; 0 means no progress.
            }
            off += read;
            len -= read; // Space still free in the backing array.
          } while (len > 0 && !isWaiting.get());
        } catch (Throwable ex) {
          exception = ex;
          if (ex instanceof Error) {
            // `readException` may not be reported to the user. Rethrow Error to make sure at
            // least the user can see the Error in the UncaughtExceptionHandler.
            throw (Error) ex;
          }
        } finally {
          stateChangeLock.lock();
          try {
            readAheadBuffer.limit(off);
            if (read < 0 || (exception instanceof EOFException)) {
              endOfStream = true;
            } else if (exception != null) {
              readAborted = true;
              readException = exception;
            }
            readInProgress = false; // This async read is finished.
            signalAsyncReadComplete(); // Wake the reader waiting for it, if any.
          } finally {
            stateChangeLock.unlock();
          }
          closeUnderlyingInputStreamIfNecessary();
        }
      }
    });
  } catch (RuntimeException e) {
    // The task was never scheduled (e.g. RejectedExecutionException once the executor is shut
    // down). Undo readInProgress and wake any waiter; otherwise waitForAsyncReadComplete()
    // would block forever on a read that will never complete.
    stateChangeLock.lock();
    try {
      readInProgress = false;
      signalAsyncReadComplete();
    } finally {
      stateChangeLock.unlock();
    }
    throw e;
  }
}
swapBuffers方法
/**
 * Exchanges {@code activeBuffer} and {@code readAheadBuffer}. The buffer that was being filled
 * ahead becomes the one {@code read} copies from, and the drained one becomes the next
 * read-ahead target.
 *
 * <p>Both fields are {@code @GuardedBy("stateChangeLock")}; callers are expected to hold that
 * lock, as {@code read} does.
 */
private void swapBuffers() {
  final ByteBuffer previouslyActive = activeBuffer;
  activeBuffer = readAheadBuffer;
  readAheadBuffer = previouslyActive;
}
waitForAsyncReadComplete方法
用while迴圈判斷async read是否仍在進行:如果是,則阻塞等待直到async read完成;否則立即返回。等待結束後還會呼叫checkReadException()檢查async read是否記錄了例外。
/**
 * Blocks until no async read is in progress, then calls {@code checkReadException()}.
 *
 * <p>Returns immediately if no async read is running, including when none was ever started.
 * While waiting, {@code isWaiting} is set so the read-ahead task can stop filling the buffer
 * early and hand back what it has already read.
 *
 * @throws InterruptedIOException if the current thread is interrupted while waiting; the
 *     thread's interrupt status is restored before this is thrown
 * @throws IOException whatever {@code checkReadException()} throws (presumably the failure
 *     recorded by the async read)
 */
private void waitForAsyncReadComplete() throws IOException {
  stateChangeLock.lock();
  try {
    isWaiting.set(true); // Tell the async read task that a reader is waiting for data.
    // There is only one reader and one writer, so the writer should signal only once, but the
    // wake-up condition must still be re-checked in a loop to guard against spurious wakeups.
    while (readInProgress) {
      asyncReadComplete.await();
    }
  } catch (InterruptedException e) {
    // Translating to InterruptedIOException would otherwise swallow the interrupt; restore it
    // so code further up the stack can still observe the interruption.
    Thread.currentThread().interrupt();
    InterruptedIOException iio = new InterruptedIOException(e.getMessage());
    iio.initCause(e);
    throw iio;
  } finally {
    isWaiting.set(false);
    stateChangeLock.unlock();
  }
  checkReadException();
}
signalAsyncReadComplete方法
通知async read已執行完成:喚醒所有正在阻塞等待async read完成的執行緒(對asyncReadComplete呼叫signalAll())。
/**
 * Wakes every thread blocked on {@code asyncReadComplete}, i.e. a reader waiting in
 * {@code waitForAsyncReadComplete()} for the current async read to finish.
 */
private void signalAsyncReadComplete() {
  final ReentrantLock lock = stateChangeLock;
  // A Condition may only be signalled while its lock is held. The lock is reentrant, so callers
  // that already hold it (such as the async read task) can safely call this method.
  lock.lock();
  try {
    asyncReadComplete.signalAll();
  } finally {
    lock.unlock();
  }
}