1. 程式人生 > >spark原始碼分析之ReadAheadInputStream

spark原始碼分析之ReadAheadInputStream

概述

ReadAheadInputStream實現了從當前buffer讀取的data耗盡時,切換到另外一個buffer讀取資料,並啟動任務從底層輸入流非同步預讀data,放入耗盡的buffer中。它通過2個buffer來完成——active buffer和read ahead buffer。在呼叫read()方法時會返回active buffer中的資料。而read ahead buffer用於從底層輸入流非同步讀取資料。通過切換這2個buffer,我們可以在active buffer耗盡時開始從read ahead buffer讀取資料,無需阻塞在磁碟IO上。

從底層輸入流非同步讀取資料到read ahead buffer ,相當於寫執行緒。

當前執行緒(呼叫read()的執行緒)從active buffer讀取資料,相當於讀執行緒(reader);只有在active buffer耗盡時,才會與read ahead buffer交換後繼續讀取。

在swap buffer、觸發async reading、獲取async state時需要加鎖。

async read如果一定要填滿read ahead buffer才返回,可能會增加reader的等待延遲。因此新增了一個“AtomicBoolean”標誌(isWaiting):一旦有reader正在等待資料,async read在讀到部分資料後就可以提前返回,而不必等到buffer填滿。

成員變數

從概述可以得出,我們至少需要以下成員變數:activeBuffer、readAheadBuffer、underlyingInputStream等,同時還需要用於併發的相關成員變數。

  // Lock taken when swapping buffers, scheduling an async read, or inspecting async state.
  // Final because asyncReadComplete below is bound to this exact lock instance.
  private final ReentrantLock stateChangeLock = new ReentrantLock();

  // Buffer that read() copies bytes out of.
  @GuardedBy("stateChangeLock")
  private ByteBuffer activeBuffer;

  // Buffer that the background task fills from the underlying stream.
  @GuardedBy("stateChangeLock")
  private ByteBuffer readAheadBuffer;

  private final InputStream underlyingInputStream;

  // Whether there is a reader waiting for data; lets the background fill loop stop early.
  private final AtomicBoolean isWaiting = new AtomicBoolean(false);

  private final ExecutorService executorService =
      ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");

  // True while an async read is in progress. This is the predicate that waiters on
  // asyncReadComplete re-check after every wake-up.
  @GuardedBy("stateChangeLock")
  private boolean readInProgress;

  // Condition a reader blocks on until the async read has completed.
  private final Condition asyncReadComplete = stateChangeLock.newCondition();

read方法

@Override
  public int read(byte[] b, int offset, int len) throws IOException {
    // Reject a (offset, len) range that does not fit inside the destination array.
    final boolean rangeInvalid = offset < 0 || len < 0 || len > b.length - offset;
    if (rangeInvalid) {
      throw new IndexOutOfBoundsException();
    }
    if (len == 0) {
      return 0;
    }

    if (!activeBuffer.hasRemaining()) {
      // The active buffer is drained: take the lock and switch over to the read ahead buffer.
      stateChangeLock.lock();
      try {
        // Blocks only while an async read is still in flight. Every swap below schedules a new
        // async read, so one may be running now; if none is, this returns immediately.
        waitForAsyncReadComplete();

        if (!readAheadBuffer.hasRemaining()) {
          // Nothing has been read ahead (e.g. the very first read): start a fill and wait for it.
          readAsync();
          waitForAsyncReadComplete();
          if (isEndOfStream()) {
            return -1;
          }
        }

        // The freshly filled read ahead buffer takes the place of the empty active buffer ...
        swapBuffers();
        // ... and the buffer that was just drained is refilled in the background.
        readAsync();
      } finally {
        stateChangeLock.unlock();
      }
    }

    // Hand out at most what the active buffer currently holds, copied to b starting at offset.
    final int copied = Math.min(len, activeBuffer.remaining());
    activeBuffer.get(b, offset, copied);
    return copied;
  }

readAsync方法

/**
   * Read data from underlyingInputStream to readAheadBuffer asynchronously.
   *
   * <p>Does nothing when the end of the stream was already seen or an async read is already in
   * progress. Otherwise the read ahead buffer is reset to empty, readInProgress is set, and a
   * fill task is handed to executorService.
   *
   * @throws IOException if checkReadException() reports a failure
   */
  private void readAsync() throws IOException {
    final byte[] arr;
    stateChangeLock.lock();
    try {
      // Fetch the backing array inside the try block so that the lock is still released
      // if array() throws (e.g. for a buffer without an accessible backing array).
      arr = readAheadBuffer.array();
      if (endOfStream || readInProgress) {
        // Nothing left to read, or a fill task is already scheduled/running.
        return;
      }
      checkReadException();
      // position(0) followed by flip() leaves position == limit == 0, i.e. an empty buffer.
      readAheadBuffer.position(0);
      readAheadBuffer.flip();
      // Mark the async read as in progress before the task is handed to the executor.
      readInProgress = true;
    } finally {
      stateChangeLock.unlock();
    }
    executorService.execute(new Runnable() {

      @Override
      public void run() {
        stateChangeLock.lock();
        try {
          if (isClosed) {
            // NOTE(review): readInProgress is cleared here without signalling
            // asyncReadComplete; confirm no reader can be blocked in
            // waitForAsyncReadComplete() when this branch is taken.
            readInProgress = false;
            return;
          }
          // Flip this so that the close method will not close the underlying input stream when we
          // are reading.
          isReading = true;
        } finally {
          stateChangeLock.unlock();
        }

        // Please note that it is safe to release the lock and read into the read ahead buffer
        // because either of following two conditions will hold - 1. The active buffer has
        // data available to read so the reader will not read from the read ahead buffer.
        // 2. This is the first time read is called or the active buffer is exhausted,
        // in that case the reader waits for this async read to complete.
        // So there is no race condition in both the situations.
        int read = 0;
        int off = 0, len = arr.length;
        Throwable exception = null;
        try {
          // Try to fill the read ahead buffer. The loop ends when the backing array is full,
          // when the underlying read returns <= 0, or when a reader is waiting for data.
          do {
            read = underlyingInputStream.read(arr, off, len);
            if (read <= 0) {
              break;
            }
            off += read;
            len -= read; // space still free in the backing array
          } while (len > 0 && !isWaiting.get());
        } catch (Throwable ex) {
          exception = ex;
          if (ex instanceof Error) {
            // `readException` may not be reported to the user. Rethrow Error to make sure at least
            // The user can see Error in UncaughtExceptionHandler.
            throw (Error) ex;
          }
        } finally {
          stateChangeLock.lock();
          try {
            // Expose exactly the bytes that were read.
            readAheadBuffer.limit(off);
            if (read < 0 || (exception instanceof EOFException)) {
              endOfStream = true;
            } else if (exception != null) {
              readAborted = true;
              readException = exception;
            }
            // The async read is over; wake up any thread waiting for it.
            readInProgress = false;
            signalAsyncReadComplete();
          } finally {
            // Nested try/finally so the lock is released even if the bookkeeping above throws.
            stateChangeLock.unlock();
          }
          closeUnderlyingInputStreamIfNecessary();
        }
      }
    });
  }

swapBuffers方法

/**
   * Exchanges the two buffers: the read ahead buffer becomes the active buffer and the
   * previously active buffer becomes the read ahead buffer.
   */
  private void swapBuffers() {
    final ByteBuffer previouslyActive = activeBuffer;
    activeBuffer = readAheadBuffer;
    readAheadBuffer = previouslyActive;
  }

waitForAsyncReadComplete方法

while判斷async read是否正在進行,如果是則阻塞等待直到async read完成,否則立即返回。

/**
   * Blocks the caller while an async read is in progress and returns immediately otherwise.
   * isWaiting is set for the duration of the wait and cleared again on every exit path.
   *
   * @throws InterruptedIOException if the thread is interrupted while waiting
   * @throws IOException if checkReadException() reports a failure
   */
  private void waitForAsyncReadComplete() throws IOException {
    stateChangeLock.lock();
    // Advertise that a reader is now waiting for data.
    isWaiting.set(true);
    try {
      // One reader and one writer means a single signal is expected, but the predicate is
      // still re-checked in a loop to guard against spurious wakeups.
      while (readInProgress) {
        asyncReadComplete.await();
      }
    } catch (InterruptedException interrupted) {
      final InterruptedIOException wrapped =
          new InterruptedIOException(interrupted.getMessage());
      wrapped.initCause(interrupted);
      throw wrapped;
    } finally {
      isWaiting.set(false);
      stateChangeLock.unlock();
    }
    checkReadException();
  }

 signalAsyncReadComplete方法

通知async read執行完成。

 /** Wakes every thread waiting on asyncReadComplete, holding stateChangeLock while signalling. */
  private void signalAsyncReadComplete() {
    final ReentrantLock lock = stateChangeLock;
    lock.lock();
    try {
      asyncReadComplete.signalAll();
    } finally {
      lock.unlock();
    }
  }