1. 程式人生 > >Hadoop自定義linereader,實現按行分塊

Hadoop自定義linereader,實現按行分塊

最近想用Hadoop實現一個A的轉置乘以A的矩陣運算,假設A是100w*100的矩陣,想把100w行特徵分成100個map,每個map處理1w行,每個map一次性處理1w行,而不是一行一行處理。
hadoop0.21.0這個版本已經實現了NLineInputFormat這個方法

job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.addInputPath(job, in);
NLineInputFormat.setNumLinesPerSplit(job,10000); 

這樣可以實現每個map處理10000行的需求,100萬行就是100個map,而不是預設的按物理塊大小分配map,但是這個介面內部呼叫map方法的時候,仍是一行一行處理的,map方法會被執行10000次,現在改寫linereader,讓map方法只執行一次,每次處理10000行。

首先定義myLineInputFormat類,將job的讀入方式設成myLineInputFormat。

job.setInputFormatClass(myLineInputFormat.class);
myLineInputFormat.addInputPath(job, in);
myLineInputFormat.setNumLinesPerSplit(job,10000);

myLineInputFormat的原始碼直接拷貝NLineInputFormat的原始碼,myLineInputFormat這個類裡面的RecordReader返回的LongWritable, Text返回的是每個map方法裡的檔案偏移量和文字內容,每次讀10000行,偏移量就是10000行的偏移量,text自然是10000行的文字內容。
重寫myLineInputFormat類裡的RecordReader方法:

public RecordReader<LongWritable, Text> createRecordReader(
      InputSplit genericSplit, TaskAttemptContext context) 
      throws IOException {
    context.setStatus(genericSplit.toString());
    return new mylinereader();
  }

定義myLineReader類,先原始碼拷貝LineRecordReader類,getFilePosition方法就是返回的每次文字的偏移量,每次的偏移量根據LineReader的readLine方法來確定,所以myLineReader的LineReader引用自己的LineReader,把import的類包改一下就成。
定義LineReader類,放在自定義的package裡面,原始碼拷貝原先hadoop裡的LineReader類,修改readLine方法:

      if(appendLength > 0)
         appendLength = (appendLength+1) * 10000 ;
      if (appendLength > 0) {
        str.append(buffer, startPosn, appendLength);
        txtLength += appendLength;
      }

第一個if是修改檔案的長度,自己新增,第二個if是原來方法裡的,如果10000行內容太多,這裡append會報陣列越界,所以修改LineReader的構造方法,如下,DEFAULT_BUFFER_SIZE是類屬性,值也可以自己修改

 public LineReader(InputStream in, Configuration conf) throws IOException {
    //this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
      this(in, DEFAULT_BUFFER_SIZE);
  }

同時修改readLine方法的返回值,原始是返回一行的偏移量,現在改成返回10000行的偏移量

  return (int)bytesConsumed * 10000 ;