1. 程式人生 > hadoop初讀--寫資料時的資料流管道雙向機制

hadoop初讀--寫資料時的資料流管道雙向機制

為了保證分散式資料的一致性和完整性,hadoop寫資料流時使用了寫資料和應答的雙向機制.
(此處原為一張示意圖,說明寫資料與應答在管道中的雙向流動;圖片已遺失。)


這裡著重說明的是,反向應答其實分為兩部分:
1. 寫請求應答:
     正常情況下,這個應答會從管道的最後一個資料節點開始,往客戶端方向傳送。管道上的每一個節點都會等待這個應答,收到應答後才會開始接收資料;同樣地,客戶端也會等待這個應答,然後才開始傳送資料。這個應答是同步的,即直到收到應答後才會進行下一步。應答包的結構只有兩個欄位:返回碼和附加資訊(在程式碼中分別對應 mirrorInStatus 和 firstBadLink)。當返回碼是OP_STATUS_ERROR時,附加資訊提供了管道中第一個出錯的資料節點的地址資訊。

2. 寫資料應答:
     客戶端通過資料流管道傳送資料,管道上的資料節點在接收資料並寫入磁碟後,需要給上游節點傳送確認包,以清除緩衝區的內容。確認包從最後一個資料節點發出,逆流而上,直達資料來源(客戶端)。每個節點在往上游轉發確認包時,會把自己的狀態放在最前面,再附上從下游收到的各節點狀態(見下方 PacketResponder 的程式碼)。應答包對應的類是:
DataTransferProtocol.PipelineAck。

下面貼出略作精簡的程式碼,並以註釋的方式說明幾個我認為需要關注的地方:

1.寫請求應答處理的程式碼:

/**
 * 1. Write-request (connect) ack handling on the current datanode.
 *
 * If this node has downstream targets, it opens a socket to the first one
 * (the "mirror", targets[0]) and forwards the OP_WRITE_BLOCK request header.
 * When a client name is set, it then blocks synchronously on the mirror's
 * connect ack (a status code followed by the first bad link).
 */
if (targets.length > 0) {
        InetSocketAddress mirrorTarget = null;
        // The mirror is the next datanode in the pipeline: targets[0].
        mirrorNode = targets[0].getName();
        mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
        mirrorSock = datanode.newSocket();
        try {
          // Read/write timeouts are extended once per downstream target
          // (presumably because the mirror's reply depends on the rest of the
          // pipeline answering first).
          int timeoutValue = datanode.socketTimeout + (HdfsConstants.READ_TIMEOUT_EXTENSION * numTargets);
          int writeTimeout = datanode.socketWriteTimeout + (HdfsConstants.WRITE_TIMEOUT_EXTENSION * numTargets);
          NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
          mirrorSock.setSoTimeout(timeoutValue);
          mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
          mirrorOut = new DataOutputStream( new BufferedOutputStream( NetUtils.getOutputStream(mirrorSock, writeTimeout), SMALL_BUFFER_SIZE));
          mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));

          // Forward the write-block request header to the mirror. The write
          // order below is the wire format, so it must not be rearranged.
          mirrorOut.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
          mirrorOut.write( DataTransferProtocol.OP_WRITE_BLOCK );
          mirrorOut.writeLong( block.getBlockId() );
          mirrorOut.writeLong( block.getGenerationStamp() );
          mirrorOut.writeInt( pipelineSize );
          mirrorOut.writeBoolean( isRecovery );
          Text.writeString( mirrorOut, client );
          mirrorOut.writeBoolean(hasSrcDataNode);
          if (hasSrcDataNode) {
            srcDataNode.write(mirrorOut);
          }
          // Pass on only the targets after the mirror (targets[1..]); the
          // mirror itself is targets[0].
          mirrorOut.writeInt( targets.length - 1 );
          for ( int i = 1; i < targets.length; i++ ) {
            targets[i].write( mirrorOut );
          }
          accessToken.write(mirrorOut);
          blockReceiver.writeChecksumHeader(mirrorOut);
          mirrorOut.flush();

          if (client.length() != 0) {
            // After mirrorOut.flush(), wait synchronously for the mirror's
            // connect ack: a status code (readShort) and the first bad link.
            mirrorInStatus = mirrorIn.readShort();
            firstBadLink = Text.readString(mirrorIn);
            if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
              LOG.info("Datanode " + targets.length +
                       " got response for connect ack " +
                       " from downstream datanode with firstbadlink as " +
                       firstBadLink);
            }
          }
        } catch (IOException e) {
          // Setting up the mirror connection (mirrorSock / mirrorOut / mirrorIn)
          // failed. When a client name is set, reply upstream with
          // OP_STATUS_ERROR and name the mirror as the first bad datanode.
          // Unlike the forwarded ack (which relays mirrorInStatus/firstBadLink
          // read from the mirror), this status and mirrorNode are produced by
          // the current datanode itself.
          if (client.length() != 0) {
            replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
            Text.writeString(replyOut, mirrorNode);
            replyOut.flush();
          }
          IOUtils.closeStream(mirrorOut);
          mirrorOut = null;
          IOUtils.closeStream(mirrorIn);
          mirrorIn = null;
          IOUtils.closeSocket(mirrorSock);
          mirrorSock = null;
          if (client.length() > 0) {
            // Client name set: rethrow so the failure propagates.
            throw e;
          } else {
            // No client name (presumably a datanode-initiated transfer --
            // confirm): log and continue receiving without a mirror.
            LOG.info(datanode.dnRegistration + ":Exception transfering block " +
                     block + " to mirror " + mirrorNode +
                     ". continuing without the mirror.\n" +
                     StringUtils.stringifyException(e));
          }
        }
}
/**
 * 2. Forward the downstream datanode's connect ack to the upstream node.
 *
 * This happens only when a client name is set. When this node had a mirror,
 * mirrorInStatus and firstBadLink were read from it in the connect-ack step
 * (mirrorIn.readShort() / Text.readString(mirrorIn)). That means this code
 * relays the next datanode's status rather than producing its own. Without a
 * mirror, both hold whatever values they were initialized with outside this
 * excerpt (not shown).
 */
if (client.length() != 0) {
        if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
          LOG.info("Datanode " + targets.length +
                   " forwarding connect ack to upstream firstbadlink is " +
                   firstBadLink);
        }
        // Relay status code, then first bad link -- same two-field layout
        // that was read from the mirror.
        replyOut.writeShort(mirrorInStatus);
        Text.writeString(replyOut, firstBadLink);
        replyOut.flush();
      }

2.寫資料應答處理的程式碼:

/**
 * Excerpt from PacketResponder.run(): build this datanode's PipelineAck and
 * send it upstream.
 *
 * Layout of replies: replies[0] is this datanode's own status. replies[1..]
 * are the statuses copied, in order, from the ack received from downstream.
 */
.............
 short[] replies = null;
            if (mirrorError) { // no ack is read
                // No downstream ack was read: report this node as OK and
                // the downstream node as failed.
                replies = new short[2];
                replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
                replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
            } else {// build a success reply, followed by the downstream replies
     /**
      * ack.getNumOfReplies(): number of statuses reported by the downstream
      * datanodes. When this node has no downstream targets (numTargets == 0)
      * there is nothing to copy.
     */
            short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
                replies = new short[1+ackLen];
                replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
                for (int i=0; i<ackLen; i++) {
                // append each downstream reply after this node's own status
                    replies[i+1] = ack.getReply(i);
                }
            }
          // NOTE(review): expected is presumably the sequence number of the
          // packet being acked -- confirm in the full PacketResponder.run().
          PipelineAck replyAck = new       PipelineAck(expected, replies);
            replyAck.write(replyOut);// send the ack upstream
            replyOut.flush();
            if (LOG.isDebugEnabled()) {
              LOG.debug("PacketResponder " + numTargets +" for block " + block +" responded an ack: " + replyAck);
            }
 ...........