
A Detailed Look at readLine in Hadoop's LineReader

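The readLine function of Hadoop's LineReader is very well written, and I have annotated it in detail below based on my own understanding. Its best feature is that it never breaks a line in two when input splits are read. Whatever line-ending convention a file uses (Unix '\n', Windows '\r\n', or classic Mac '\r'), it correctly determines where each line ends, so every call reads exactly one complete line of text.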
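In Hadoop, the split bookkeeping itself is done by the caller, LineRecordReader. The part readLine contributes is that it always finishes the current line, even past the split end, because maxBytesToConsume is only a hint. The sketch below is a simplified model of the caller-side rule, roughly as Hadoop 2.x applies it. It assumes the whole file sits in a byte array. SplitLinesDemo, nextLineStart and readSplit are hypothetical names, not Hadoop APIs.

The rule has two parts. A split that does not start at offset 0 throws away its first (possibly partial) line. Every split keeps reading while the next line starts at or before its end. Together these mean each line is read by exactly one split.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of split handling; not Hadoop's LineRecordReader code.
public class SplitLinesDemo {
    // Index just past the terminator ('\n', '\r' or "\r\n") of the line that
    // contains pos, or data.length if EOF comes first.
    static int nextLineStart(byte[] data, int pos) {
        while (pos < data.length) {
            byte b = data[pos++];
            if (b == '\n') return pos;
            if (b == '\r') {
                if (pos < data.length && data[pos] == '\n') pos++; // CR+LF
                return pos;
            }
        }
        return pos;
    }

    // Lines owned by the split [start, end).
    static List<String> readSplit(byte[] data, int start, int end) {
        List<String> lines = new ArrayList<>();
        // Every split except the first discards its first line; the previous split reads it.
        int pos = (start == 0) ? 0 : nextLineStart(data, start);
        // Keep reading while a line starts at or before end, even if it runs past end.
        while (pos <= end && pos < data.length) {
            int next = nextLineStart(data, pos);
            int stop = next;
            while (stop > pos && (data[stop - 1] == '\n' || data[stop - 1] == '\r')) {
                stop--; // strip the terminator
            }
            lines.add(new String(data, pos, stop - pos, StandardCharsets.US_ASCII));
            pos = next;
        }
        return lines;
    }

    public static void main(String[] args) {
        byte[] data = "alpha\nbeta\r\ngamma\rdelta\n".getBytes(StandardCharsets.US_ASCII);
        int splitSize = 5;
        for (int s = 0; s < data.length; s += splitSize) {
            int e = Math.min(s + splitSize, data.length);
            System.out.println("[" + s + ", " + e + ") -> " + readSplit(data, s, e));
        }
    }
}
```

With 5-byte splits, each of the four lines is printed by exactly one split. For example, "beta" starts at offset 6 inside [5, 10), and that split reads all of it even though its "\r\n" terminator extends past offset 10.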

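1. If the current character is '\r', we cannot yet tell whether the end of the line has been reached, since a '\n' may follow. The line's content is already known, however; only the length of the terminator is still undetermined. The code therefore sets a flag, prevCharCR = true, to record that a '\r' was seen, and then examines the next character. If that character is '\n', the flag tells us this is a Windows-style file and the terminator is 2 bytes long. Otherwise it is a (classic) Mac-style file and the terminator is 1 byte long.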
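Step 1 can be sketched on an in-memory byte array, where no buffer refill is needed. The class and method names (NewlineLengthDemo, scanLine) are hypothetical. The loop body uses the same prevCharCR flag as readLine. As a simplifying assumption, a '\r' that is the very last byte of the array is treated as a 1-byte terminator.

```java
// Hypothetical illustration of the prevCharCR logic on a complete byte array.
public class NewlineLengthDemo {
    static final byte CR = '\r';
    static final byte LF = '\n';

    // Returns {contentLength, newlineLength} for the line starting at 'start'.
    static int[] scanLine(byte[] data, int start) {
        boolean prevCharCR = false;
        int newlineLength = 0;
        int pos = start;
        for (; pos < data.length; ++pos) {
            if (data[pos] == LF) {
                newlineLength = prevCharCR ? 2 : 1; // "\r\n" (Windows) or "\n" (Unix)
                ++pos;                              // consume the LF
                break;
            }
            if (prevCharCR) {                       // CR followed by a non-LF byte (classic Mac)
                newlineLength = 1;
                break;                              // this byte starts the next line
            }
            prevCharCR = (data[pos] == CR);
        }
        if (prevCharCR && newlineLength == 0) {
            newlineLength = 1;                      // CR was the very last byte of the array
        }
        int readLength = pos - start;               // content plus terminator
        return new int[] {readLength - newlineLength, newlineLength};
    }

    public static void main(String[] args) {
        String[] samples = {"abc\r\nx", "abc\nx", "abc\rx", "abc"};
        for (String s : samples) {
            int[] r = scanLine(s.getBytes(java.nio.charset.StandardCharsets.US_ASCII), 0);
            System.out.println(java.util.Arrays.toString(r)); // [3, 2], [3, 1], [3, 1], [3, 0]
        }
    }
}
```

In the CR + non-LF case the loop stops without consuming the current byte, because that byte already belongs to the next line.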

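There is one special case. If the '\r' is the last byte in the buffer, we cannot tell whether the next buffer starts with '\n'. More data must be loaded into the buffer (possibly already from the next split) before the terminator length can be determined.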
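The special case is easiest to see with a very small buffer. MiniLineReader is a hypothetical, self-contained port of the readLine loop annotated in this article. In it, java.io.ByteArrayOutputStream stands in for Hadoop's Text, so it runs without Hadoop on the classpath. With a 4-byte buffer, the input "abc\r\ndef" places the '\r' at the end of the first buffer and the '\n' at the start of the second.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical port of the readLine loop; ByteArrayOutputStream replaces Text.
public class MiniLineReader {
    private static final byte CR = '\r';
    private static final byte LF = '\n';

    private final InputStream in;
    private final byte[] buffer;
    private int bufferLength = 0; // bytes of real data in buffer
    private int bufferPosn = 0;   // current position in buffer

    public MiniLineReader(InputStream in, int bufferSize) {
        this.in = in;
        this.buffer = new byte[bufferSize];
    }

    public int readLine(ByteArrayOutputStream str, int maxLineLength,
                        int maxBytesToConsume) throws IOException {
        str.reset();
        int txtLength = 0;
        int newlineLength = 0;
        boolean prevCharCR = false;
        long bytesConsumed = 0;
        do {
            int startPosn = bufferPosn;
            if (bufferPosn >= bufferLength) {
                startPosn = bufferPosn = 0;
                if (prevCharCR) {
                    ++bytesConsumed; // count the CR held back from the previous buffer
                }
                bufferLength = in.read(buffer);
                if (bufferLength <= 0) {
                    break; // EOF
                }
            }
            for (; bufferPosn < bufferLength; ++bufferPosn) {
                if (buffer[bufferPosn] == LF) {
                    newlineLength = prevCharCR ? 2 : 1;
                    ++bufferPosn;
                    break;
                }
                if (prevCharCR) { // CR followed by a non-LF byte
                    newlineLength = 1;
                    break;
                }
                prevCharCR = (buffer[bufferPosn] == CR);
            }
            int readLength = bufferPosn - startPosn;
            if (prevCharCR && newlineLength == 0) {
                --readLength; // CR is the last byte of this buffer: hold it back
            }
            bytesConsumed += readLength;
            int appendLength = readLength - newlineLength; // may be negative: nothing appended
            if (appendLength > maxLineLength - txtLength) {
                appendLength = maxLineLength - txtLength;
            }
            if (appendLength > 0) {
                str.write(buffer, startPosn, appendLength);
                txtLength += appendLength;
            }
        } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
        if (bytesConsumed > Integer.MAX_VALUE) {
            throw new IOException("Too many bytes before newline: " + bytesConsumed);
        }
        return (int) bytesConsumed;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "abc\r\ndef".getBytes(StandardCharsets.US_ASCII);
        MiniLineReader reader = new MiniLineReader(new ByteArrayInputStream(data), 4);
        ByteArrayOutputStream line = new ByteArrayOutputStream();
        int n;
        while ((n = reader.readLine(line, Integer.MAX_VALUE, Integer.MAX_VALUE)) > 0) {
            // prints "5 abc", then "3 def"
            System.out.println(n + " " + new String(line.toByteArray(), StandardCharsets.US_ASCII));
        }
    }
}
```

The first call returns 5 (three content bytes plus the two-byte terminator), even though the '\r' and the '\n' arrived in different reads. In the second round appendLength is -1. This happens because newlineLength (2) includes the CR held back from the previous round, while that round's readLength (1) covers only the '\n'.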

// Partial field definitions from the LineReader class
/*
  private InputStream in;
  private byte[] buffer;
  // the number of bytes of real data in the buffer
  private int bufferLength = 0;
  // the current position in the buffer
  private int bufferPosn = 0;

  private static final byte CR = '\r';
  private static final byte LF = '\n';
*/

  /**
   * Read one line from the InputStream into the given Text. A line
   * can be terminated by one of the following: '\n' (LF), '\r' (CR),
   * or '\r\n' (CR+LF). EOF also terminates an otherwise unterminated
   * line.
   *
   * @param str the object to store the given line (without newline)
   * @param maxLineLength the maximum number of bytes to store into str;
   *  the rest of the line is silently discarded.
   * @param maxBytesToConsume the maximum number of bytes to consume
   *  in this call. This is only a hint, because if the line crosses
   *  this threshold, we allow it to happen. It can overshoot
   *  potentially by as much as one buffer length.
   *
   * @return the number of bytes read including the (longest) newline
   * found.
   *
   * @throws IOException if the underlying stream throws
   */
  public int readLine(Text str, int maxLineLength,
                      int maxBytesToConsume) throws IOException {
    /* We're reading data from in, but the head of the stream may be
     * already buffered in buffer, so we have several cases:
     * 1. No newline characters are in the buffer, so we need to copy
     *    everything and read another buffer from the stream.
     * 2. An unambiguously terminated line is in buffer, so we just
     *    copy to str.
     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
     *    in CR. In this case we copy everything up to CR to str, but
     *    we also need to see what follows CR: if it's LF, then we
     *    need to consume LF as well, so next call to readLine will read
     *    from after that.
     * We use a flag prevCharCR to signal if previous character was CR
     * and, if it happens to be at the end of the buffer, delay
     * consuming it until we have a chance to look at the char that
     * follows.
     */
    str.clear();
    int txtLength = 0; //tracks str.getLength(), as an optimization
    int newlineLength = 0; //length of terminating newline
    boolean prevCharCR = false; //true if prev char was CR
    long bytesConsumed = 0;
    do {
      // The member field bufferPosn records the current read position in buffer.
      int startPosn = bufferPosn;
      // Either the previous buffer is used up (bufferPosn == bufferLength),
      // or this is the very first read (bufferPosn == bufferLength == 0).
      if (bufferPosn >= bufferLength) {
        startPosn = bufferPosn = 0; // refill the buffer from the beginning
        // The special case described above: a '\r' at the very end of the
        // previous buffer was not yet counted in bytesConsumed (the number of
        // bytes actually consumed to read this line), so count it now.
        if (prevCharCR)
          ++bytesConsumed; //account for CR from previous read
        bufferLength = in.read(buffer); // fill buffer from the underlying stream
        if (bufferLength <= 0)
          break; // EOF
      }
      // Scan the buffered bytes for a line terminator (the cases listed above).
      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
        if (buffer[bufferPosn] == LF) {
          // '\n' definitely ends the line: 2 bytes if preceded by '\r', else 1.
          newlineLength = (prevCharCR) ? 2 : 1;
          // At next invocation proceed from following byte; bufferPosn now
          // points at a byte that has not been examined yet.
          ++bufferPosn;
          break; // terminator found, leave the loop
        }
        if (prevCharCR) { //CR + notLF, we are at notLF
          // A lone '\r' also ends the line. The byte at bufferPosn has not
          // been examined yet; it is the first byte of the next line.
          newlineLength = 1;
          break;
        }
        prevCharCR = (buffer[bufferPosn] == CR);
      }
      // Once the terminator length is known, newlineLength (initially 0) holds it.
      int readLength = bufferPosn - startPosn;
      // Entered only when the whole buffer was scanned without finding '\n'
      // or "\r\n". If its last byte was '\r' (prevCharCR is true), the next
      // buffer must be read to see whether '\n' follows, so hold the CR back.
      if (prevCharCR && newlineLength == 0)
        --readLength; //CR at the end of the buffer
      // In every case, add the bytes read to bytesConsumed; the content
      // itself is appended to str below.
      bytesConsumed += readLength;
      // Bytes to append: bytes read minus the terminator length.
      int appendLength = readLength - newlineLength;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      if (appendLength > 0) {
        str.append(buffer, startPosn, appendLength);
        txtLength += appendLength;
      }
    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

    // Throw if the line is too long for the byte count to fit in an int.
    if (bytesConsumed > (long)Integer.MAX_VALUE)
      throw new IOException("Too many bytes before newline: " + bytesConsumed);
    return (int)bytesConsumed; // bytes consumed, terminator included
  }
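The return-value contract in the Javadoc matters to callers. The value counts every consumed byte, including the terminator and any bytes dropped because of maxLineLength, so a caller can advance its file offset by exactly that amount.

Below is a minimal, unbuffered model of that contract. ReadLineContract is a hypothetical name, and the model deliberately ignores maxBytesToConsume and buffer refills.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical model of readLine's return-value contract, not Hadoop code.
public class ReadLineContract {
    // Stores at most maxLineLength content bytes in out and returns the number
    // of bytes consumed: content, discarded tail and terminator together.
    static int readLine(byte[] data, int pos, StringBuilder out, int maxLineLength) {
        out.setLength(0);
        int i = pos;
        while (i < data.length && data[i] != '\n' && data[i] != '\r') i++;
        int contentEnd = i;
        if (i < data.length) {
            if (data[i] == '\r' && i + 1 < data.length && data[i + 1] == '\n') i += 2; // CR+LF
            else i += 1; // lone LF or lone CR
        }
        int keep = Math.min(contentEnd - pos, maxLineLength); // truncate, as maxLineLength does
        out.append(new String(data, pos, keep, StandardCharsets.US_ASCII));
        return i - pos;
    }

    public static void main(String[] args) {
        byte[] data = "short\r\nthis line is long\nok".getBytes(StandardCharsets.US_ASCII);
        StringBuilder line = new StringBuilder();
        int pos = 0;
        while (pos < data.length) {
            int consumed = readLine(data, pos, line, 8);
            System.out.println(consumed + " [" + line + "]"); // 7 [short], 18 [this lin], 2 [ok]
            pos += consumed;
        }
        System.out.println(pos == data.length); // true: the offset stays in sync with the data
    }
}
```

The second line is cut to 8 bytes, yet the call still reports all 18 consumed bytes. Because of this, the next call starts at the beginning of "ok" rather than in the middle of the discarded tail.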