
Hadoop source code analysis: the full HDFS write path --- client-side processing

An introduction to DFSOutputStream

DFSOutputStream overview

This section looks at the client side of the HDFS write path. The client-side work revolves around the DFSOutputStream object; as the name suggests, it is a wrapper around the output stream of the DFS file system. Let us first walk through the important classes involved and their key fields.

The main responsibilities of DFSOutputStream are spelled out quite clearly in the class javadoc, so read through it first.



/****************************************************************
 * DFSOutputStream creates files from a stream of bytes.
 *
 * The client application writes data that is cached internally by
 * this stream. Data is broken up into packets (DFSPacket), each packet
 * is typically 64K in size. A packet comprises of chunks. Each chunk
 * is typically 512 bytes and has an associated checksum with it.
 *
 * When a client application fills up the currentPacket, it is
 * enqueued into dataQueue.  The DataStreamer thread picks up
 * packets from the dataQueue, sends it to the first datanode in
 * the pipeline and moves it from the dataQueue to the ackQueue.
 * The ResponseProcessor receives acks from the datanodes. When a
 * successful ack for a packet is received from all datanodes, the
 * ResponseProcessor removes the corresponding packet from the
 * ackQueue.
 *
 * In case of error, all outstanding packets are moved from the
 * ackQueue back to the dataQueue. A new pipeline is set up by
 * eliminating the bad datanode from the original pipeline. The
 * DataStreamer now starts sending packets from the dataQueue.
 ****************************************************************/
@InterfaceAudience.Private
public class DFSOutputStream extends FSOutputSummer
    implements Syncable, CanSetDropBehind {
  ...
}
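
To make the javadoc's numbers concrete: with the usual defaults of 512-byte chunks (dfs.bytes-per-checksum), 4-byte CRC checksums, and 64K packets (dfs.client-write-packet-size), a packet carries 127 chunks. The sketch below reproduces that arithmetic; it mirrors what DFSOutputStream's computePacketChunkSize does, but the class and constants here are ours, written for illustration only.

public class PacketSizingSketch {
  public static void main(String[] args) {
    final int bytesPerChecksum = 512;      // dfs.bytes-per-checksum, default 512
    final int checksumSize = 4;            // a CRC32/CRC32C checksum is 4 bytes
    final int writePacketSize = 64 * 1024; // dfs.client-write-packet-size, default 64K

    // Each chunk travels together with its checksum, so the packing unit
    // is chunk + checksum = 516 bytes.
    int chunkSize = bytesPerChecksum + checksumSize;
    int chunksPerPacket = Math.max(writePacketSize / chunkSize, 1); // 127
    int packetSize = chunkSize * chunksPerPacket;                   // 65532

    System.out.println("chunksPerPacket = " + chunksPerPacket);
    System.out.println("packetSize      = " + packetSize + " bytes (plus header)");
  }
}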

Important fields of DFSOutputStream

The two most important fields are the queues dataQueue and ackQueue. Both follow the classic producer/consumer pattern: for dataQueue the producer is the client and the consumer is the DataStreamer; for ackQueue the producer is the DataStreamer and the consumer is the ResponseProcessor.


  /**
   * dataQueue and ackQueue are two crucial fields; both are linked lists
   * of DFSPacket objects.
   * dataQueue holds the packets waiting to be sent: data written by the
   * client is staged here first.
   * ackQueue is the acknowledgement queue: a packet moved here stays
   * until its ack arrives back from the datanodes.
   */
  // both dataQueue and ackQueue are protected by dataQueue lock
  private final LinkedList<DFSPacket> dataQueue = new LinkedList<DFSPacket>();
  private final LinkedList<DFSPacket> ackQueue = new LinkedList<DFSPacket>();
  private DFSPacket currentPacket = null; // the packet currently being filled
  private DataStreamer streamer;
  private long currentSeqno = 0;
  private long lastQueuedSeqno = -1;
  private long lastAckedSeqno = -1;
  private long bytesCurBlock = 0; // bytes written in current block
  private int packetSize = 0; // write packet size, not including the header
  private int chunksPerPacket = 0;
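
As a minimal, self-contained sketch of the lock discipline described above (both queues guarded by the dataQueue monitor); the names are hypothetical and this is not the actual Hadoop code:

import java.util.LinkedList;

// Stripped-down model of the dataQueue/ackQueue handoff; Packet stands
// in for DFSPacket.
class QueueSketch {
  static class Packet { long seqno; }

  private final LinkedList<Packet> dataQueue = new LinkedList<>();
  private final LinkedList<Packet> ackQueue = new LinkedList<>();

  // Producer side (client write path): enqueue and wake the streamer.
  void enqueue(Packet p) {
    synchronized (dataQueue) {
      dataQueue.addLast(p);
      dataQueue.notifyAll();
    }
  }

  // Streamer side: wait for work, then move the packet to ackQueue
  // before it goes out on the wire.
  Packet takeForSending() throws InterruptedException {
    synchronized (dataQueue) {
      while (dataQueue.isEmpty()) {
        dataQueue.wait();
      }
      Packet p = dataQueue.removeFirst();
      ackQueue.addLast(p);
      return p;
    }
  }

  // Responder side: a successful ack retires the oldest in-flight packet.
  void ackOldest() {
    synchronized (dataQueue) {
      ackQueue.removeFirst();
      dataQueue.notifyAll();
    }
  }
}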

The data-sending thread: DataStreamer

DataStreamer is the core class that does the actual sending; its javadoc explains the role well:


  /**
   * The DataStreamer class is responsible for sending data packets to the
   * datanodes in the pipeline. It retrieves a new blockid and block locations
   * from the namenode, and starts streaming packets to the pipeline of
   * Datanodes. Every packet has a sequence number associated with
   * it. When all the packets for a block are sent out and acks for each
   * of them are received, the DataStreamer closes the current block.
   */
  class DataStreamer extends Daemon {

    private volatile boolean streamerClosed = false;
    private volatile ExtendedBlock block; // its length is number of bytes acked
    private Token<BlockTokenIdentifier> accessToken;
    private DataOutputStream blockStream;     // output stream for sending data
    private DataInputStream blockReplyStream; // input stream for receiving acks
    private ResponseProcessor response = null;
    private volatile DatanodeInfo[] nodes = null; // list of targets for current block
    private volatile StorageType[] storageTypes = null;
    private volatile String[] storageIDs = null;

    ......................  

  }

The response handler: ResponseProcessor

ResponseProcessor is an inner class of DataStreamer (it extends Daemon) and processes the ack messages received from the datanodes.


    //
    // Processes responses from the datanodes. A packet is removed
    // from the ackQueue when its response arrives.
    //
    private class ResponseProcessor extends Daemon { }

The processing flow

The client puts data onto dataQueue

Creating a file returns an FSDataOutputStream object; calling its write method eventually ends up in org.apache.hadoop.fs.FSOutputSummer.write(byte[], int, int).
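
From the application's point of view, all of this machinery hides behind the stream returned by FileSystem#create. A minimal client write might look like the following; the namenode URI and file path are made up for the example:

import java.net.URI;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // hdfs://namenode:8020 is a placeholder; point it at your cluster.
    try (FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
         FSDataOutputStream out = fs.create(new Path("/tmp/demo.txt"))) {
      // write() eventually bottoms out in the
      // FSOutputSummer.write(byte[], int, int) described above.
      out.write("hello hdfs".getBytes(StandardCharsets.UTF_8));
      out.hflush(); // force buffered packets down the datanode pipeline
    }
  }
}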

write calls write1() in a loop until len bytes have been written; each time a full chunk has accumulated, it invokes the abstract writeChunk method, whose concrete implementation is the method of the same name in org.apache.hadoop.hdfs.DFSOutputStream.

The actual writing happens in writeChunkImpl:


  private synchronized void writeChunkImpl(byte[] b, int offset, int len,
          byte[] checksum, int ckoff, int cklen) throws IOException {
    dfsClient.checkOpen();
    checkClosed();

    if (len > bytesPerChecksum) {
      throw new IOException("writeChunk() buffer size is " + len +
                            " is larger than supported  bytesPerChecksum " +
                            bytesPerChecksum);
    }
    if (cklen != 0 && cklen != getChecksumSize()) {
      throw new IOException("writeChunk() checksum size is supposed to be " +
                            getChecksumSize() + " but found to be " + cklen);
    }

    if (currentPacket == null) {
      currentPacket = createPacket(packetSize, chunksPerPacket, 
          bytesCurBlock, currentSeqno++, false);
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
            currentPacket.getSeqno() +
            ", src=" + src +
            ", packetSize=" + packetSize +
            ", chunksPerPacket=" + chunksPerPacket +
            ", bytesCurBlock=" + bytesCurBlock);
      }
    }

    currentPacket.writeChecksum(checksum, ckoff, cklen);
    currentPacket.writeData(b, offset, len);
    currentPacket.incNumChunks();
    bytesCurBlock += len;

    // If packet is full, enqueue it for transmission
    // by calling waitAndQueueCurrentPacket
    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
        bytesCurBlock == blockSize) {
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
            currentPacket.getSeqno() +
            ", src=" + src +
            ", bytesCurBlock=" + bytesCurBlock +
            ", blockSize=" + blockSize +
            ", appendChunk=" + appendChunk);
      }
      waitAndQueueCurrentPacket();

      // If the reopened file did not end at chunk boundary and the above
      // write filled up its partial chunk. Tell the summer to generate full 
      // crc chunks from now on.
      if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
        appendChunk = false;
        resetChecksumBufSize();
      }

      if (!appendChunk) {
        int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
        computePacketChunkSize(psize, bytesPerChecksum);
      }
      //
      // if encountering a block boundary, send an empty packet to 
      // indicate the end of block and reset bytesCurBlock.
      //
      if (bytesCurBlock == blockSize) {
        currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
        currentPacket.setSyncBlock(shouldSyncBlock);
        waitAndQueueCurrentPacket();
        bytesCurBlock = 0;
        lastFlushOffset = 0;
      }
    }
  }

When the packet is full, waitAndQueueCurrentPacket is called to put it onto the dataQueue. The method first checks how much is already in flight: while dataQueue.size() + ackQueue.size() exceeds writeMaxPackets (default 80, the dfs.client.write.max-packets-in-flight setting), it waits until there is enough room.


  private void waitAndQueueCurrentPacket() throws IOException {
    synchronized (dataQueue) {
      try {
      // If queue is full, then wait till we have enough space
        boolean firstWait = true;
        try {
         // wait while the queues are over the limit
          while (!isClosed() && dataQueue.size() + ackQueue.size() >
              dfsClient.getConf().writeMaxPackets) {
                    ..................
            try {
              dataQueue.wait();
            } catch (InterruptedException e) {
                ..............
            }
          }
        } finally {
         ...............
        }
        checkClosed();
        // enqueue the packet
        queueCurrentPacket();
      } catch (ClosedChannelException e) {
      }
    }
  }

Finally, queueCurrentPacket is what actually places the packet onto the queue:


  private void queueCurrentPacket() {
    synchronized (dataQueue) {
      if (currentPacket == null) return;
      currentPacket.addTraceParent(Trace.currentSpan());
      dataQueue.addLast(currentPacket); // append the packet at the tail of the queue
      lastQueuedSeqno = currentPacket.getSeqno();
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("Queued packet " + currentPacket.getSeqno());
      }
      currentPacket = null; // reset so the next packet can be allocated
      dataQueue.notifyAll(); // wake up every thread waiting on dataQueue
    }
  }

So in the end queueCurrentPacket writes the DFSPacket into dataQueue via dataQueue.addLast(currentPacket), and dataQueue.notifyAll() wakes up all threads waiting on dataQueue to process the data.



DataStreamer processes the data on dataQueue

The core logic for sending the data lives in DataStreamer's run method.

Error handling

At the start of each loop iteration, the streamer first checks whether an error has occurred.

The actual handling lives in the private processDatanodeError method: when an error is detected, all packets in the ack queue are moved back to the front of dataQueue, and a new stream is set up to resend the data.
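
The requeue step can be sketched as follows; this is a simplification written for this article (the real processDatanodeError also closes the old streams, picks the error index, and rebuilds the pipeline):

import java.util.LinkedList;

class PipelineRecoverySketch {
  private final LinkedList<Object> dataQueue = new LinkedList<>();
  private final LinkedList<Object> ackQueue = new LinkedList<>();

  // On a datanode error, every un-acked packet goes back to the FRONT of
  // dataQueue, preserving order, so the streamer resends them once a new
  // pipeline (minus the bad datanode) is up.
  void requeueUnacked() {
    synchronized (dataQueue) {
      dataQueue.addAll(0, ackQueue);
      ackQueue.clear();
      dataQueue.notifyAll();
    }
  }
}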

Creating the output stream and sending data

The output stream to the datanodes is created by the nextBlockOutputStream() method.

Requesting a block from the namenode

The locateFollowingBlock method requests the block; the key call is:

dfsClient.namenode.addBlock(src, dfsClient.clientName,
    block, excludedNodes, fileId, favoredNodes);

dfsClient holds the namenode proxy, and addBlock is what requests the new block. The same addBlock call also commits the previous block, which is what the block argument refers to.
The excludedNodes parameter lists datanodes that must be excluded when allocating the block,
and favoredNodes lists datanodes that should be preferred.

Connecting to the first datanode

A successful block allocation returns a LocatedBlock object containing the relevant datanode information.

createBlockOutputStream then connects to the first datanode; concretely, a new DataOutputStream is created for the connection. A Sender object is then constructed to send the write-block request (opcode 80) down that stream;
on the datanode side, the data is received and handled by DataXceiver.

 new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
      dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
      nodes.length, block.getNumBytes(), bytesSent, newGS,
      checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
      (targetPinnings == null ? false : targetPinnings[0]), targetPinnings);

Requesting the block and connecting to the datanode happen inside a do/while loop; if it fails, the client retries the connection, three times by default.
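
The shape of that loop, heavily simplified (a hedged sketch: the stand-in types and names below are ours; the real nextBlockOutputStream also abandons the block at the namenode and tracks which node in the pipeline failed):

import java.util.ArrayList;
import java.util.List;

class BlockAllocationSketch {
  // Stand-in for the ClientProtocol namenode proxy; addBlock returns the
  // datanode pipeline chosen for the new block.
  interface Namenode {
    List<String> addBlock(List<String> excluded);
  }

  static List<String> allocateBlock(Namenode nn, int maxRetries) {
    List<String> excluded = new ArrayList<>();
    for (int attempt = 0; attempt < maxRetries; attempt++) { // default: 3 tries
      List<String> pipeline = nn.addBlock(excluded);
      String firstDatanode = pipeline.get(0);
      if (tryConnect(firstDatanode)) {
        return pipeline;           // pipeline established, start streaming
      }
      excluded.add(firstDatanode); // don't offer us this node again
    }
    throw new RuntimeException("unable to create new block");
  }

  // Placeholder for createBlockOutputStream(...).
  static boolean tryConnect(String datanode) {
    return true;
  }
}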

Setting up the pipeline

Once nextBlockOutputStream has successfully returned the datanode information, setPipeline records the pipeline to those datanodes. The method is simple: it just assigns the newly allocated datanodes to the corresponding fields.


    private void setPipeline(LocatedBlock lb) {
      setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
    }

    private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
        String[] storageIDs) {
      this.nodes = nodes;
      this.storageTypes = storageTypes;
      this.storageIDs = storageIDs;
    }

Initializing the data stream

initDataStreaming essentially builds a ResponseProcessor from the datanode list, starts it via its start method, and sets the stage to DATA_STREAMING.


    /**
     * Initialize for data streaming
     */
    private void initDataStreaming() {
      this.setName("DataStreamer for file " + src +
          " block " + block);
      response = new ResponseProcessor(nodes);
      response.start();
      stage = BlockConstructionStage.DATA_STREAMING;
    }

Sending the packets

Once everything is ready, the streamer takes a packet from the head of dataQueue, appends it to the tail of ackQueue, wakes up all threads waiting on dataQueue, and sends the packet with one.writeTo(blockStream).


          // send the packet
          Span span = null;
          synchronized (dataQueue) {
            // move packet from dataQueue to ackQueue
            if (!one.isHeartbeatPacket()) {
              span = scope.detach();
              one.setTraceSpan(span);
              dataQueue.removeFirst();
              ackQueue.addLast(one);
              dataQueue.notifyAll();
            }
          }

          if (DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug("DataStreamer block " + block +
                " sending packet " + one);
          }

          // write out data to remote datanode
          TraceScope writeScope = Trace.startSpan("writeTo", span);
          try {
            one.writeTo(blockStream);
            blockStream.flush();   
          } catch (IOException e) {
            // HDFS-3398 treat primary DN is down since client is unable to 
            // write to primary DN. If a failed or restarting node has already
            // been recorded by the responder, the following call will have no 
            // effect. Pipeline recovery can handle only one node error at a
            // time. If the primary node fails again during the recovery, it
            // will be taken out then.
            tryMarkPrimaryDatanodeFailed();
            throw e;
          } finally {
            writeScope.close();
          }

Closing the stream

Once every packet of the block on dataQueue has been sent, and all the acks have been confirmed, the client's write of that block is finished, and endBlock is called to close the corresponding streams.


          // Is this block full?
          if (one.isLastPacketInBlock()) {
            // wait for the close packet has been acked
            synchronized (dataQueue) {
              while (!streamerClosed && !hasError && 
                  ackQueue.size() != 0 && dfsClient.clientRunning) {
                dataQueue.wait(1000);// wait for acks to arrive from datanodes
              }
            }
            if (streamerClosed || hasError || !dfsClient.clientRunning) {
              continue;
            }

            endBlock();
          }


endBlock closes the responder, closes the data streams, clears the pipeline, and moves the stage back to PIPELINE_SETUP_CREATE.

    private void endBlock() {
      if(DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("Closing old block " + block);
      }
      this.setName("DataStreamer for file " + src);
      closeResponder();
      closeStream();
      setPipeline(null, null, null);
      stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
    }

ResponseProcessor handles the ack messages

This part of the logic is comparatively simple.


      @Override
      public void run() {

        setName("ResponseProcessor for block " + block);
        PipelineAck ack = new PipelineAck();

        TraceScope scope = NullScope.INSTANCE;
        while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
          // process responses from datanodes.
          try {
            // read an ack from the pipeline
            long begin = Time.monotonicNow();
            ack.readFields(blockReplyStream);
             ..............


            // once the ack has been processed successfully, remove the packet from ackQueue
            synchronized (dataQueue) {
              scope = Trace.continueSpan(one.getTraceSpan());
              one.setTraceSpan(null);
              lastAckedSeqno = seqno;
              pipelineRecoveryCount = 0;
              ackQueue.removeFirst();
              dataQueue.notifyAll();

              one.releaseBuffer(byteArrayManager);
            }
          } catch (Exception e) {
          // an exception is not handled right away here; it is stashed in an
          // AtomicReference (via setLastException) for later handling
            if (!responderClosed) {
              if (e instanceof IOException) {
                setLastException((IOException)e);
              }
                ............
            }
          } finally {
            scope.close();
          }
        }
      }