1. 程式人生 > >研究MapReduce原始碼之實現自定義LineRecordReader完成多行讀取檔案內容

研究MapReduce原始碼之實現自定義LineRecordReader完成多行讀取檔案內容

TextInputFormat是Hadoop預設的資料輸入格式,但是它只能一行一行的讀記錄,如果要讀取多行怎麼辦?
很簡單 自己寫一個輸入格式,然後寫一個對應的Recordreader就可以了,但是要實現確不是這麼簡單的

首先看看TextInputFormat是怎麼實現一行一行讀取的

大家看一看原始碼

public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> 
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
    String delimiter = context.getConfiguration().get(
        "textinputformat.record.delimiter"
); byte[] recordDelimiterBytes = null; if (null != delimiter) recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); return new LineRecordReader(recordDelimiterBytes); } //這個對檔案做壓縮用的 @Override protected boolean isSplitable(JobContext context, Path file) { final CompressionCodec codec = new
CompressionCodecFactory(context.getConfiguration()).getCodec(file); if (null == codec) { return true; } return codec instanceof SplittableCompressionCodec; } }

我們只要看第一個createRecordReader方法即可,從原始碼分析可知,它new了一個LineRecordReader,那麼我們再來看看LineRecordReader的原始碼,看看這小子的內部世界

public class
LineRecordReader extends RecordReader<LongWritable, Text> { private static final Log LOG = LogFactory.getLog(LineRecordReader.class); public static final String MAX_LINE_LENGTH = "mapreduce.input.linerecordreader.line.maxlength"; private long start; private long pos; private long end; private SplitLineReader in; private FSDataInputStream fileIn; private Seekable filePosition; private int maxLineLength; private LongWritable key; private Text value; private boolean isCompressedInput; private Decompressor decompressor; private byte[] recordDelimiterBytes; public LineRecordReader() { } public LineRecordReader(byte[] recordDelimiter) { this.recordDelimiterBytes = recordDelimiter; } public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); // open the file and seek to the start of the split final FileSystem fs = file.getFileSystem(job); fileIn = fs.open(file); CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file); if (null!=codec) { isCompressedInput = true; decompressor = CodecPool.getDecompressor(codec); if (codec instanceof SplittableCompressionCodec) { final SplitCompressionInputStream cIn = ((SplittableCompressionCodec)codec).createInputStream( fileIn, decompressor, start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK); in = new CompressedSplitLineReader(cIn, job, this.recordDelimiterBytes); start = cIn.getAdjustedStart(); end = cIn.getAdjustedEnd(); filePosition = cIn; } else { in = new SplitLineReader(codec.createInputStream(fileIn, decompressor), job, this.recordDelimiterBytes); filePosition = fileIn; } } else { fileIn.seek(start); in = new UncompressedSplitLineReader( fileIn, job, this.recordDelimiterBytes, split.getLength()); filePosition = fileIn; } // If this is not the first split, we always throw away first record // because we always (except the last split) read one extra line in // next() method. if (start != 0) { start += in.readLine(new Text(), 0, maxBytesToConsume(start)); } this.pos = start; } private int maxBytesToConsume(long pos) { return isCompressedInput ? Integer.MAX_VALUE : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength); } private long getFilePosition() throws IOException { long retVal; if (isCompressedInput && null != filePosition) { retVal = filePosition.getPos(); } else { retVal = pos; } return retVal; } private int skipUtfByteOrderMark() throws IOException { // Strip BOM(Byte Order Mark) // Text only support UTF-8, we only need to check UTF-8 BOM // (0xEF,0xBB,0xBF) at the start of the text stream. int newMaxLineLength = (int) Math.min(3L + (long) maxLineLength, Integer.MAX_VALUE); int newSize = in.readLine(value, newMaxLineLength, maxBytesToConsume(pos)); // Even we read 3 extra bytes for the first line, // we won't alter existing behavior (no backwards incompat issue). // Because the newSize is less than maxLineLength and // the number of bytes copied to Text is always no more than newSize. // If the return size from readLine is not less than maxLineLength, // we will discard the current line and read the next line. pos += newSize; int textLength = value.getLength(); byte[] textBytes = value.getBytes(); if ((textLength >= 3) && (textBytes[0] == (byte)0xEF) && (textBytes[1] == (byte)0xBB) && (textBytes[2] == (byte)0xBF)) { // find UTF-8 BOM, strip it. LOG.info("Found UTF-8 BOM and skipped it"); textLength -= 3; newSize -= 3; if (textLength > 0) { // It may work to use the same buffer and not do the copyBytes textBytes = value.copyBytes(); value.set(textBytes, 3, textLength); } else { value.clear(); } } return newSize; } public boolean nextKeyValue() throws IOException { if (key == null) { key = new LongWritable(); } key.set(pos); if (value == null) { value = new Text(); } int newSize = 0; // We always read one extra line, which lies outside the upper // split limit i.e. (end - 1) while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) { if (pos == 0) { newSize = skipUtfByteOrderMark(); } else { newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos)); pos += newSize; } if ((newSize == 0) || (newSize < maxLineLength)) { break; } // line too long. try again LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize)); } if (newSize == 0) { key = null; value = null; return false; } else { return true; } } @Override public LongWritable getCurrentKey() { return key; } @Override public Text getCurrentValue() { return value; } /** * Get the progress within the split */ public float getProgress() throws IOException { if (start == end) { return 0.0f; } else { return Math.min(1.0f, (getFilePosition() - start) / (float)(end - start)); } } public synchronized void close() throws IOException { try { if (in != null) { in.close(); } } finally { if (decompressor != null) { CodecPool.returnDecompressor(decompressor); } } } }

(裡面96-97行
in = new CompressedSplitLineReader(cIn, job, this.recordDelimiterBytes);

108-109行
in = new CompressedSplitLineReader(cIn, job,this.recordDelimiterBytes);

這是用來壓縮的,大家不用管,知道有這回事就行了
其實這兩個類都繼承自SplitLineReader,是與壓縮有關的,後面我們自定義的時候不用改,貼上複製過來就行。

從上面發現了一個問題,看原始碼的第57行

private SplitLineReader in;

它引入了一個SplitLineReader 類,用這個小子來讀取每一行,不信?你看原始碼的182-197行,如下(我的基於2.6.4版本的原始碼,不同的版本程式碼應該差別不大)

 while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
      if (pos == 0) {
        newSize = skipUtfByteOrderMark();
      } else {
        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
        pos += newSize;
      }

      if ((newSize == 0) || (newSize < maxLineLength)) {
        break;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }

發現沒有 ===》 newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));

它用了SplitLineReader 裡面的一個方法readLine來讀取,所以我們就得繼續跟蹤去看看SplitLineReader 這個小子的廬山真面目,下面看SplitLineReader 的原始碼

public class SplitLineReader extends org.apache.hadoop.util.LineReader {
  public SplitLineReader(InputStream in, byte[] recordDelimiterBytes) {
    super(in, recordDelimiterBytes);
  }

  public SplitLineReader(InputStream in, Configuration conf,
      byte[] recordDelimiterBytes) throws IOException {
    super(in, conf, recordDelimiterBytes);
  }

  public boolean needAdditionalRecordAfterSplit() {
    return false;
  }
}

發現這傢伙繼承自LineReader(快接近我們的目標了,堅持看下去),發現這小子裡面根本就沒有readLine方法,大家是不是覺得我在忽悠大家,哈哈,我沒有忽悠大家,它原始碼裡面確實沒有,但是但是,它可是繼承了LineReader這個類,說不定他的父類LineReader有了,好,不信我們去看看LineReader
繼續跟蹤到LineReader的原始碼

public class LineReader implements Closeable {
  private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
  private int bufferSize = DEFAULT_BUFFER_SIZE;
  private InputStream in;
  private byte[] buffer;
  // the number of bytes of real data in the buffer
  private int bufferLength = 0;
  // the current position in the buffer
  private int bufferPosn = 0;

  private static final byte CR = '\r';
  private static final byte LF = '\n';

  // The line delimiter
  private final byte[] recordDelimiterBytes;

  /**
   * Create a line reader that reads from the given stream using the
   * default buffer-size (64k).
   * @param in The input stream
   * @throws IOException
   */
  public LineReader(InputStream in) {
    this(in, DEFAULT_BUFFER_SIZE);
  }

  /**
   * Create a line reader that reads from the given stream using the 
   * given buffer-size.
   * @param in The input stream
   * @param bufferSize Size of the read buffer
   * @throws IOException
   */
  public LineReader(InputStream in, int bufferSize) {
    this.in = in;
    this.bufferSize = bufferSize;
    this.buffer = new byte[this.bufferSize];
    this.recordDelimiterBytes = null;
  }

  /**
   * Create a line reader that reads from the given stream using the
   * <code>io.file.buffer.size</code> specified in the given
   * <code>Configuration</code>.
   * @param in input stream
   * @param conf configuration
   * @throws IOException
   */
  public LineReader(InputStream in, Configuration conf) throws IOException {
    this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
  }

  /**
   * Create a line reader that reads from the given stream using the
   * default buffer-size, and using a custom delimiter of array of
   * bytes.
   * @param in The input stream
   * @param recordDelimiterBytes The delimiter
   */
  public LineReader(InputStream in, byte[] recordDelimiterBytes) {
    this.in = in;
    this.bufferSize = DEFAULT_BUFFER_SIZE;
    this.buffer = new byte[this.bufferSize];
    this.recordDelimiterBytes = recordDelimiterBytes;
  }

  /**
   * Create a line reader that reads from the given stream using the
   * given buffer-size, and using a custom delimiter of array of
   * bytes.
   * @param in The input stream
   * @param bufferSize Size of the read buffer
   * @param recordDelimiterBytes The delimiter
   * @throws IOException
   */
  public LineReader(InputStream in, int bufferSize,
      byte[] recordDelimiterBytes) {
    this.in = in;
    this.bufferSize = bufferSize;
    this.buffer = new byte[this.bufferSize];
    this.recordDelimiterBytes = recordDelimiterBytes;
  }

  /**
   * Create a line reader that reads from the given stream using the
   * <code>io.file.buffer.size</code> specified in the given
   * <code>Configuration</code>, and using a custom delimiter of array of
   * bytes.
   * @param in input stream
   * @param conf configuration
   * @param recordDelimiterBytes The delimiter
   * @throws IOException
   */
  public LineReader(InputStream in, Configuration conf,
      byte[] recordDelimiterBytes) throws IOException {
    this.in = in;
    this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
    this.buffer = new byte[this.bufferSize];
    this.recordDelimiterBytes = recordDelimiterBytes;
  }


  /**
   * Close the underlying stream.
   * @throws IOException
   */
  public void close() throws IOException {
    in.close();
  }

  /**
   * Read one line from the InputStream into the given Text.
   *
   * @param str the object to store the given line (without newline)
   * @param maxLineLength the maximum number of bytes to store into str;
   *  the rest of the line is silently discarded.
   * @param maxBytesToConsume the maximum number of bytes to consume
   *  in this call.  This is only a hint, because if the line cross
   *  this threshold, we allow it to happen.  It can overshoot
   *  potentially by as much as one buffer length.
   *
   * @return the number of bytes read including the (longest) newline
   * found.
   *
   * @throws IOException if the underlying stream throws
   */
  public int readLine(Text str, int maxLineLength,
                      int maxBytesToConsume) throws IOException {
    if (this.recordDelimiterBytes != null) {
      return readCustomLine(str, maxLineLength, maxBytesToConsume);
    } else {
      return readDefaultLine(str, maxLineLength, maxBytesToConsume);
    }
  }

  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
      throws IOException {
    return in.read(buffer);
  }

  /**
   * Read a line terminated by one of CR, LF, or CRLF.
   */
  private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
  throws IOException {
    /* We're reading data from in, but the head of the stream may be
     * already buffered in buffer, so we have several cases:
     * 1. No newline characters are in the buffer, so we need to copy
     *    everything and read another buffer from the stream.
     * 2. An unambiguously terminated line is in buffer, so we just
     *    copy to str.
     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
     *    in CR.  In this case we copy everything up to CR to str, but
     *    we also need to see what follows CR: if it's LF, then we
     *    need consume LF as well, so next call to readLine will read
     *    from after that.
     * We use a flag prevCharCR to signal if previous character was CR
     * and, if it happens to be at the end of the buffer, delay
     * consuming it until we have a chance to look at the char that
     * follows.
     */
    str.clear();
    int txtLength = 0; //tracks str.getLength(), as an optimization
    int newlineLength = 0; //length of terminating newline
    boolean prevCharCR = false; //true of prev char was CR
    long bytesConsumed = 0;
    do {
      int startPosn = bufferPosn; //starting from where we left off the last time
      if (bufferPosn >= bufferLength) {
        startPosn = bufferPosn = 0;
        if (prevCharCR) {
          ++bytesConsumed; //account for CR from previous read
        }
        bufferLength = fillBuffer(in, buffer, prevCharCR);
        if (bufferLength <= 0) {
          break; // EOF
        }
      }
      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
        if (buffer[bufferPosn] == LF) {
          newlineLength = (prevCharCR) ? 2 : 1;
          ++bufferPosn; // at next invocation proceed from following byte
          break;
        }
        if (prevCharCR) { //CR + notLF, we are at notLF
          newlineLength = 1;
          break;
        }
        prevCharCR = (buffer[bufferPosn] == CR);
      }
      int readLength = bufferPosn - startPosn;
      if (prevCharCR && newlineLength == 0) {
        --readLength; //CR at the end of the buffer
      }
      bytesConsumed += readLength;
      int appendLength = readLength - newlineLength;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      if (appendLength > 0) {
        str.append(buffer, startPosn, appendLength);
        txtLength += appendLength;
      }
    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

    if (bytesConsumed > Integer.MAX_VALUE) {
      throw new IOException("Too many bytes before newline: " + bytesConsumed);
    }
    return (int)bytesConsumed;
  }

  /**
   * Read a line terminated by a custom delimiter.
   */
  private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
      throws IOException {
   /* We're reading data from inputStream, but the head of the stream may be
    *  already captured in the previous buffer, so we have several cases:
    * 
    * 1. The buffer tail does not contain any character sequence which
    *    matches with the head of delimiter. We count it as a 
    *    ambiguous byte count = 0
    *    
    * 2. The buffer tail contains a X number of characters,
    *    that forms a sequence, which matches with the
    *    head of delimiter. We count ambiguous byte count = X
    *    
    *    // ***  eg: A segment of input file is as follows
    *    
    *    " record 1792: I found this bug very interesting and
    *     I have completely read about it. record 1793: This bug
    *     can be solved easily record 1794: This ." 
    *    
    *    delimiter = "record";
    *        
    *    supposing:- String at the end of buffer =
    *    "I found this bug very interesting and I have completely re"
    *    There for next buffer = "ad about it. record 179       ...."           
    *     
    *     The matching characters in the input
    *     buffer tail and delimiter head = "re" 
    *     Therefore, ambiguous byte count = 2 ****   //
    *     
    *     2.1 If the following bytes are the remaining characters of
    *         the delimiter, then we have to capture only up to the starting 
    *         position of delimiter. That means, we need not include the 
    *         ambiguous characters in str.
    *     
    *     2.2 If the following bytes are not the remaining characters of
    *         the delimiter ( as mentioned in the example ), 
    *         then we have to include the ambiguous characters in str. 
    */
    str.clear();
    int txtLength = 0; // tracks str.getLength(), as an optimization
    long bytesConsumed = 0;
    int delPosn = 0;
    int ambiguousByteCount=0; // To capture the ambiguous characters count
    do {
      int startPosn = bufferPosn; // Start from previous end position
      if (bufferPosn >= bufferLength) {
        startPosn = bufferPosn = 0;
        bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
        if (bufferLength <= 0) {
          if (ambiguousByteCount > 0) {
            str.append(recordDelimiterBytes, 0, ambiguousByteCount);
            bytesConsumed += ambiguousByteCount;
          }
          break; // EOF
        }
      }
      for (; bufferPosn < bufferLength; ++bufferPosn) {
        if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
          delPosn++;
          if (delPosn >= recordDelimiterBytes.length) {
            bufferPosn++;
            break;
          }
        } else if (delPosn != 0) {
          bufferPosn--;
          delPosn = 0;
        }
      }
      int readLength = bufferPosn - startPosn;
      bytesConsumed += readLength;
      int appendLength = readLength - delPosn;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      bytesConsumed += ambiguousByteCount;
      if (appendLength >= 0 && ambiguousByteCount > 0) {
        //appending the ambiguous characters (refer case 2.2)
        str.append(recordDelimiterBytes, 0, ambiguousByteCount);
        ambiguousByteCount = 0;
        // since it is now certain that the split did not split a delimiter we
        // should not read the next record: clear the flag otherwise duplicate
        // records could be generated
        unsetNeedAdditionalRecordAfterSplit();
      }
      if (appendLength > 0) {
        str.append(buffer, startPosn, appendLength);
        txtLength += appendLength;
      }
      if (bufferPosn >= bufferLength) {
        if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {
          ambiguousByteCount = delPosn;
          bytesConsumed -= ambiguousByteCount; //to be consumed in next
        }
      }
    } while (delPosn < recordDelimiterBytes.length 
        && bytesConsumed < maxBytesToConsume);
    if (bytesConsumed > Integer.MAX_VALUE) {
      throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
    }
    return (int) bytesConsumed; 
  }

  /**
   * Read from the InputStream into the given Text.
   * @param str the object to store the given line
   * @param maxLineLength the maximum number of bytes to store into str.
   * @return the number of bytes read including the newline
   * @throws IOException if the underlying stream throws
   */
  public int readLine(Text str, int maxLineLength) throws IOException {
    return readLine(str, maxLineLength, Integer.MAX_VALUE);
  }

  /**
   * Read from the InputStream into the given Text.
   * @param str the object to store the given line
   * @return the number of bytes read including the newline
   * @throws IOException if the underlying stream throws
   */
  public int readLine(Text str) throws IOException {
    return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
  }

  protected int getBufferPosn() {
    return bufferPosn;
  }

  protected int getBufferSize() {
    return bufferSize;
  }

  protected void unsetNeedAdditionalRecordAfterSplit() {
    // needed for custom multi byte line delimiters only
    // see MAPREDUCE-6549 for details
  }
}

它裡面有很多方法,真的有我們要的readLine方法,說明我的推斷沒有錯,沒有忽悠大家,它過載了好幾個readLine方法
其他我們不去管,我們只管對我們有用的,如下

 public int readLine(Text str, int maxLineLength,
                      int maxBytesToConsume) throws IOException {
    if (this.recordDelimiterBytes != null) {
      return readCustomLine(str, maxLineLength, maxBytesToConsume);
    } else {
      return readDefaultLine(str, maxLineLength, maxBytesToConsume);
    }
  }

它裡面呼叫了readCustomLine方法和readDefaultLine方法,下面看看這兩個方法

 private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
      throws IOException {
    str.clear();
    int txtLength = 0; // tracks str.getLength(), as an optimization
    long bytesConsumed = 0;
    int delPosn = 0;
    int ambiguousByteCount=0; // To capture the ambiguous characters count
    do {
      int startPosn = bufferPosn; // Start from previous end position
      if (bufferPosn >= bufferLength) {
        startPosn = bufferPosn = 0;
        bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
        if (bufferLength <= 0) {
          if (ambiguousByteCount > 0) {
            str.append(recordDelimiterBytes, 0, ambiguousByteCount);
            bytesConsumed += ambiguousByteCount;
          }
          break; // EOF
        }
      }
      for (; bufferPosn < bufferLength; ++bufferPosn) {
        if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
          delPosn++;
          if (delPosn >= recordDelimiterBytes.length) {
            bufferPosn++;
            break;
          }
        } else if (delPosn != 0) {
          bufferPosn--;
          delPosn = 0;
        }
      }
      int readLength = bufferPosn - startPosn;
      bytesConsumed += readLength;
      int appendLength = readLength - delPosn;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      bytesConsumed += ambiguousByteCount;
      if (appendLength >= 0 && ambiguousByteCount > 0) {
        //appending the ambiguous characters (refer case 2.2)
        str.append(recordDelimiterBytes, 0, ambiguousByteCount);
        ambiguousByteCount = 0;
        // since it is now certain that the split did not split a delimiter we
        // should not read the next record: clear the flag otherwise duplicate
        // records could be generated
        unsetNeedAdditionalRecordAfterSplit();
      }
      if (appendLength > 0) {
        str.append(buffer, startPosn, appendLength);
        txtLength += appendLength;
      }
      if (bufferPosn >= bufferLength) {
        if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {
          ambiguousByteCount = delPosn;
          bytesConsumed -= ambiguousByteCount; //to be consumed in next
        }
      }
    } while (delPosn < recordDelimiterBytes.length 
        && bytesConsumed < maxBytesToConsume);
    if (bytesConsumed > Integer.MAX_VALUE) {
      throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
    }
    return (int) bytesConsumed; 
  }
 private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
  throws IOException {
    str.clear();
    int txtLength = 0; //tracks str.getLength(), as an optimization
    int newlineLength = 0; //length of terminating newline
    boolean prevCharCR = false; //true of prev char was CR
    long bytesConsumed = 0;
    do {
      int startPosn = bufferPosn; //starting from where we left off the last time
      if (bufferPosn >= bufferLength) {
        startPosn = bufferPosn = 0;
        if (prevCharCR) {
          ++bytesConsumed; //account for CR from previous read
        }
        bufferLength = fillBuffer(in, buffer, prevCharCR);
        if (bufferLength <= 0) {
          break; // EOF
        }
      }
      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
        if (buffer[bufferPosn] == LF) {
          newlineLength = (prevCharCR) ? 2 : 1;
          ++bufferPosn; // at next invocation proceed from following byte
          break;
        }
        if (prevCharCR) { //CR + notLF, we are at notLF
          newlineLength = 1;
          break;
        }
        prevCharCR = (buffer[bufferPosn] == CR);
      }
      int readLength = bufferPosn - startPosn;
      if (prevCharCR && newlineLength == 0) {
        --readLength; //CR at the end of the buffer
      }
      bytesConsumed += readLength;
      int appendLength = readLength - newlineLength;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      if (appendLength > 0) {
        str.append(buffer, startPosn, appendLength);
        txtLength += appendLength;
      }
    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

    if (bytesConsumed > Integer.MAX_VALUE) {
      throw new IOException("Too many bytes before newline: " + bytesConsumed);
    }
    return (int)bytesConsumed;
  }

注意注意readCustomLine和readDefaultLine方法的第一句都用了一句程式碼(重點,後面我們自定義時要改的地方)
他們都用了str.clear();
這句程式碼什麼意思,意思是系統每讀完一行,它會清空這一行的值!!!
如果我們自定義讀取多行的時候,肯定不能清空它,因為我們需要它來計數第二行的位置
比如
123,
456
789,
111
如果一次讀兩行的話 假如我把第一行清空了,那麼我第二行的偏移量就得不到正確的值了,讀出來的值本應該是
123,456
789,111
但是如果清空了的話 就讀出來少了一行
變成了
456
111

所以我們只有獨到最後一行才清空值,它前面的行都不能清空

這就要求我們到時候自己過載一個方法了,到時候再說,大家接著看

有沒有暈了,沒有? 那就好,我都說得這麼清楚了,還暈的話,大家就先休息一下

我幫大家理一理:看都用到了哪些類

這裡寫圖片描述

最開始TextInputFormat裡面用到了LineRecordReader,LineRecordReader裡面用到了SplitLineReader,而SplitLineReader裡面用到了LineReader
自定義的時候思路按這個來
TextInputFormat–》LineRecordReader–》SplitLineReader–》LineReader

下面寫第一個自定義的TextInputFormat

package com.my.input;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;


public class myInputFormat extends FileInputFormat<Text,Text> {
    //用來壓縮的
    @Override
      protected boolean isSplitable(JobContext context, Path file) {
        final CompressionCodec codec =
          new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
        if (null == codec) {
          return true;
        }
        return codec instanceof SplittableCompressionCodec;
      }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        context.setStatus(genericSplit.toString());
        return new MyRecordReader(context.getConfiguration());
    }

}

接著寫一個自定義的LineRecordReader
其中修改了182行開始的以下程式碼

因為我這裡要實現輸出多行,所以寫了一個for迴圈,又由於我前面說得前面的行不能清空,所以要加一個boolean標誌量

 while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
      if (pos == 0) {
        newSize = skipUtfByteOrderMark();
      } else {
        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
        pos += newSize;
      }

      if ((newSize == 0) || (newSize < maxLineLength)) {
        break;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }

改為

boolean clear = true;
        for (int i = 1; i <= 2; i++) {
            if (i == 2) {
                clear = false;
            }
            while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
                if (pos == 0) {
                    newSize = skipUtfByteOrderMark();
                } else {
                    newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos), clear);
                    pos += newSize;
                }

                if ((newSize == 0) || (newSize < maxLineLength)) {
                    break;
                }

                // line too long. try again
                LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
            }
        }

再寫自定義SplitLineReader

package com.my.lingRecordReader;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;

@InterfaceAudience.Private
@InterfaceStability.Unstable
public class MySplitLineReader extends MyLineReader {
  public MySplitLineReader(InputStream in, byte[] recordDelimiterBytes) {
    super(in, recordDelimiterBytes);
  }

  public MySplitLineReader(InputStream in, Configuration conf,
      byte[] recordDelimiterBytes) throws IOException {
    super(in, conf, recordDelimiterBytes);
  }

  public boolean needAdditionalRecordAfterSplit() {
    return false;
  }
}

最後寫自定義LineReader
它過載了一個readLine(Text str, int maxLineLength, int maxBytesToConsume, boolean clear)方法來實現不清空前面讀取的行的值

package com.my.lingRecordReader;

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

/**
 * A class that provides a line reader from an input stream. Depending on the
 * constructor used, lines will either be terminated by:
 * <ul>
 * <li>one of the following: '\n' (LF) , '\r' (CR), or '\r\n' (CR+LF).</li>
 * <li><em>or</em>, a custom byte sequence delimiter</li>
 * </ul>
 * In both cases, EOF also terminates an otherwise unterminated line.
 */
@InterfaceAudience.LimitedPrivate({ "MapReduce" })
@InterfaceStability.Unstable
public class MyLineReader implements Closeable {
    private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
    private int bufferSize = DEFAULT_BUFFER_SIZE;
    private InputStream in;
    private byte[] buffer;
    // the number of bytes of real data in the buffer
    private int bufferLength = 0;
    // the current position in the buffer
    private