A Detailed Look at the readLine Function of Hadoop's LineReader
阿新 • Published: 2019-01-02
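The readLine function of Hadoop's LineReader is well written, and this post adds detailed comments to it based on my own understanding. Its best feature is that a line is never cut in the middle when a split is read: whichever line-ending convention the file uses, the function correctly determines where the line ends and so reads exactly one line of text.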
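1. If the current character is '\r', we cannot yet tell whether the line terminator is complete (a '\n' may follow), but the content of the line is already known; only the length of the terminator is undetermined. The code therefore sets a flag, prevCharCR = true, to record that a '\r' was seen, and then looks at the next character. If it is '\n', the terminator is the Windows-style "\r\n" and its length is 2; if it is not, the terminator is the classic Mac-style lone '\r' and its length is 1.
There is one special case: if the '\r' is the last byte in the buffer, we cannot know whether the next buffer starts with '\n', so more data has to be loaded into the buffer (possibly from the next split) before the decision can be made.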
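The rule above can be sketched on a single in-memory byte array. The class below is only an illustration, not Hadoop code: `NewlineScan` and `scan` are hypothetical names, and the whole input is assumed to sit in one buffer. It returns the content length and the terminator length of the first line; a terminator length of 0 means nothing could be confirmed, which includes the case of a '\r' in the last position.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical helper (not part of Hadoop): classifies the first line terminator in a byte array. */
class NewlineScan {
    static final byte CR = '\r';
    static final byte LF = '\n';

    /**
     * Returns {contentLength, newlineLength} for the first line in data.
     * newlineLength is 0 when no terminator could be confirmed, for example
     * when the data ends in CR and the following byte is not available yet.
     */
    static int[] scan(byte[] data) {
        boolean prevCharCR = false;
        int newlineLength = 0;
        int pos = 0;
        for (; pos < data.length; ++pos) {
            if (data[pos] == LF) {
                newlineLength = prevCharCR ? 2 : 1; // "\r\n" or a lone '\n'
                ++pos;                              // step past the LF
                break;
            }
            if (prevCharCR) {                       // CR followed by something other than LF
                newlineLength = 1;
                break;
            }
            prevCharCR = (data[pos] == CR);
        }
        int readLength = pos;
        if (prevCharCR && newlineLength == 0) {
            --readLength;                           // trailing CR: leave it undecided
        }
        return new int[] { readLength - newlineLength, newlineLength };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(scan("ab\r\ncd".getBytes(StandardCharsets.US_ASCII)))); // [2, 2]
        System.out.println(Arrays.toString(scan("ab\rcd".getBytes(StandardCharsets.US_ASCII))));   // [2, 1]
        System.out.println(Arrays.toString(scan("ab\r".getBytes(StandardCharsets.US_ASCII))));     // [2, 0]
    }
}
```

In the last call the content ("ab") is already known but the terminator length is still 0, which is exactly the situation in which the real function has to load another buffer.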
// Some of the field definitions in the LineReader class
/*
private InputStream in;
private byte[] buffer;
// the number of bytes of real data in the buffer
private int bufferLength = 0;
// the current position in the buffer
private int bufferPosn = 0;
private static final byte CR = '\r';
private static final byte LF = '\n';
*/
/**
* Read one line from the InputStream into the given Text. A line
* can be terminated by one of the following: '\n' (LF) , '\r' (CR),
* or '\r\n' (CR+LF). EOF also terminates an otherwise unterminated
* line.
*
* @param str the object to store the given line (without newline)
* @param maxLineLength the maximum number of bytes to store into str;
* the rest of the line is silently discarded.
* @param maxBytesToConsume the maximum number of bytes to consume
* in this call. This is only a hint, because if the line crosses
* this threshold, we allow it to happen. It can overshoot
* potentially by as much as one buffer length.
*
* @return the number of bytes read including the (longest) newline
* found.
*
* @throws IOException if the underlying stream throws
*/
public int readLine(Text str, int maxLineLength,
int maxBytesToConsume) throws IOException {
/* We're reading data from in, but the head of the stream may be
* already buffered in buffer, so we have several cases:
* 1. No newline characters are in the buffer, so we need to copy
* everything and read another buffer from the stream.
* 2. An unambiguously terminated line is in buffer, so we just
* copy to str.
* 3. Ambiguously terminated line is in buffer, i.e. buffer ends
* in CR. In this case we copy everything up to CR to str, but
* we also need to see what follows CR: if it's LF, then we
* need consume LF as well, so next call to readLine will read
* from after that.
* We use a flag prevCharCR to signal if previous character was CR
* and, if it happens to be at the end of the buffer, delay
* consuming it until we have a chance to look at the char that
* follows.
*/
str.clear();
int txtLength = 0; //tracks str.getLength(), as an optimization
int newlineLength = 0; //length of terminating newline
boolean prevCharCR = false; //true if prev char was CR
long bytesConsumed = 0;
do {
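int startPosn = bufferPosn; // the class field bufferPosn records the current read position in buffer
// buffer exhausted (bufferPosn == bufferLength after an earlier read), or nothing read yet (both are 0 on the first call)
if (bufferPosn >= bufferLength) {
startPosn = bufferPosn = 0; // refill the buffer and start again from its beginning
// the special case described above: a '\r' that was the last byte of the previous buffer
// was left out of bytesConsumed at that point, so it is counted here
if (prevCharCR)
++bytesConsumed; //account for CR from previous read
bufferLength = in.read(buffer); // read the next chunk from the input stream into buffer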
if (bufferLength <= 0)
break; // EOF
}
for (; bufferPosn < bufferLength; ++bufferPosn) { // search the buffered data for a line terminator, covering the cases listed above
// '\n' always ends the line; the terminator length is 2 if the previous byte was '\r', otherwise 1
if (buffer[bufferPosn] == LF) {
newlineLength = (prevCharCR) ? 2 : 1;
// step past the LF; the byte now at bufferPosn has not been examined yet
++bufferPosn; // at next invocation proceed from following byte
break; // terminator found, leave the loop
}
// previous byte was '\r' and the current byte is not '\n': the lone '\r' ended the line;
// the byte at bufferPosn has not been consumed and starts the next line
if (prevCharCR) { //CR + notLF, we are at notLF
newlineLength = 1;
break; // newlineLength (initially 0) is nonzero once the terminator length is known
}
prevCharCR = (buffer[bufferPosn] == CR);
}
int readLength = bufferPosn - startPosn;
// taken only when the whole buffer was scanned without settling the terminator length (no '\n' or "\r\n" found)
// and the last byte was '\r', so prevCharCR is true; the next buffer must be read to see whether '\n' follows
if (prevCharCR && newlineLength == 0)
--readLength; //CR at the end of the buffer
bytesConsumed += readLength; // in every case, add the bytes taken in this pass; bytesConsumed holds the running total, and the content is appended to str below
int appendLength = readLength - newlineLength; // number of bytes to append: bytes read minus the terminator length
if (appendLength > maxLineLength - txtLength) {
appendLength = maxLineLength - txtLength;
}
if (appendLength > 0) {
str.append(buffer, startPosn, appendLength);
txtLength += appendLength;
}
} while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
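if (bytesConsumed > (long)Integer.MAX_VALUE) // a line whose byte count does not fit in an int raises an exception
throw new IOException("Too many bytes before newline: " + bytesConsumed);
return (int)bytesConsumed; // number of bytes consumed, including the line terminator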
}
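To watch the buffer-boundary case happen, the scanning loop can be run against a deliberately tiny buffer. The following is a simplified sketch, not the Hadoop class: `MiniLineReader` and `readAll` are hypothetical names, a `ByteArrayOutputStream` stands in for Hadoop's `Text`, and the `maxLineLength` and `maxBytesToConsume` limits are left out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for LineReader; it mirrors the loop above without the length limits. */
class MiniLineReader {
    private static final byte CR = '\r';
    private static final byte LF = '\n';
    private final InputStream in;
    private final byte[] buffer;
    private int bufferLength = 0; // bytes of real data in buffer
    private int bufferPosn = 0;   // current position in buffer

    MiniLineReader(InputStream in, int bufferSize) {
        this.in = in;
        this.buffer = new byte[bufferSize];
    }

    /** Stores one line (without terminator) in line; returns bytes consumed, 0 at EOF. */
    int readLine(ByteArrayOutputStream line) throws IOException {
        line.reset();
        int newlineLength = 0;
        boolean prevCharCR = false;
        int bytesConsumed = 0;
        do {
            int startPosn = bufferPosn;
            if (bufferPosn >= bufferLength) {      // buffer exhausted: refill
                startPosn = bufferPosn = 0;
                if (prevCharCR) ++bytesConsumed;   // CR held back from the previous buffer
                bufferLength = in.read(buffer);
                if (bufferLength <= 0) break;      // EOF
            }
            for (; bufferPosn < bufferLength; ++bufferPosn) {
                if (buffer[bufferPosn] == LF) {
                    newlineLength = prevCharCR ? 2 : 1;
                    ++bufferPosn;
                    break;
                }
                if (prevCharCR) { newlineLength = 1; break; }
                prevCharCR = (buffer[bufferPosn] == CR);
            }
            int readLength = bufferPosn - startPosn;
            if (prevCharCR && newlineLength == 0) --readLength; // CR at end of buffer
            bytesConsumed += readLength;
            int appendLength = readLength - newlineLength;
            if (appendLength > 0) line.write(buffer, startPosn, appendLength);
        } while (newlineLength == 0);
        return bytesConsumed;
    }

    /** Reads every line and returns entries of the form "content:bytesConsumed". */
    static List<String> readAll(byte[] data, int bufferSize) {
        MiniLineReader reader = new MiniLineReader(new ByteArrayInputStream(data), bufferSize);
        ByteArrayOutputStream line = new ByteArrayOutputStream();
        List<String> out = new ArrayList<>();
        try {
            int n;
            while ((n = reader.readLine(line)) > 0) {
                out.add(new String(line.toByteArray(), StandardCharsets.US_ASCII) + ":" + n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] data = "ab\r\ncd\re\nf".getBytes(StandardCharsets.US_ASCII);
        // With a 3-byte buffer the first '\r' is the last byte of the first buffer.
        System.out.println(readAll(data, 3));  // [ab:4, cd:3, e:2, f:1]
        System.out.println(readAll(data, 64)); // [ab:4, cd:3, e:2, f:1]
    }
}
```

With a 3-byte buffer the first buffer is "ab\r", so the '\r' is held back and only counted after the refill shows that '\n' follows. The byte counts come out the same as with a buffer large enough to hold the whole input.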