1. 程式人生 > >(三)hadoop中FileInputFormat類的getSplits獲取InputSplit的過程

(三)hadoop中FileInputFormat類的getSplits獲取InputSplit的過程

FileInputFormat繼承了抽象類InputFormat,來看一下InputFormat的原始碼:

public abstract class InputFormat<K, V> {
  public abstract 
    List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;
  public abstract 
    RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException,                                    InterruptedException;
}

InputFormat 主要用於描述輸入資料的格式, 它提供以下兩個功能。
(1)資料切分 : 按照某個策略將輸入資料切分成若干個 InputSplit, 以便確定 Map Task 個數。對應的就是getSplits方法。
(2)為 Mapper 提供輸入資料: 給定某個 InputSplit, 能將其解析成一個個 key/value 對。對應的就是createRecordReader方法。

getSplits 方法主要完成資料切分的功能, 它會嘗試著將輸入資料切分成 InputSplit,並放入集合List中返回。
InputSplit有以下特點:
(1)邏輯分片 : 它只是在邏輯上對輸入資料進行分片, 並不會在磁碟上將其切分成分片進行儲存。 InputSplit 只記錄了分片的元資料資訊, 比如起始位置、 長度以及所在的節點列表等。
(2)可序列化: 在 Hadoop 中, 物件序列化主要有兩個作用: 程序間通訊和永久儲存。 此處, InputSplit 支援序列化操作主要是為了程序間通訊。 作業被提交到 JobTracker 之前, Client 會呼叫作業 InputFormat 中的 getSplits 函式, 並將得到的 InputSplit 序列
化到檔案中。 這樣, 當作業提交到 JobTracker 端對作業初始化時, 可直接讀取該檔案, 解析出所有 InputSplit, 並建立對應的 Map Task。而createRecordReader則根據InputSplit ,將其解析成一個個 key/value 對。

現在再來看FileInputFormat中對這兩個方法的具體實現。

先來看FileInputFormat的定義:

public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {}

FileInputFormat也是一個抽象類,繼承了InputFormat這個抽象類。
再來看看FileInputFormat關於getSplits的實現,原始碼如下:

public List<InputSplit> getSplits(JobContext job
) throws
IOException { long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); // generate splits List<InputSplit> splits = new ArrayList<InputSplit>(); List<FileStatus>files = listStatus(job); for (FileStatus file: files) { Path path = file.getPath(); FileSystem fs = path.getFileSystem(job.getConfiguration()); long length = file.getLen(); BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); if ((length != 0) && isSplitable(job, path)) { long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(new FileSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts())); } } else if (length != 0) { splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts())); } else { //Create empty hosts array for zero length files splits.add(new FileSplit(path, 0, length, new String[0])); } }

我們來一塊一塊的分析,

long minSize = Math.max(getFormatMinSplitSize(),                                                                    getMinSplitSize(job));

long maxSize = getMaxSplitSize(job);

不難理解,這裡是獲取InputSplit的size的最小值和最大值,最小值minSize是通過取getFormatMinSplitSize()和getMinSplitSize(job))中的較大的值
getFormatMinSplitSize方法的原始碼如下:

protected long getFormatMinSplitSize() {
  return 1;
}

直接返回1,單位是B.
而getMinSplitSize方法的原始碼如下:

public static long getMinSplitSize(JobContext job) {
   return job.getConfiguration().getLong("mapred.min.split.size", 1L);
}

返回的是從配置檔案中讀取“mapred.min.split.size”屬性的value值,“mapred.min.split.size”是需要使用者自己新增配置的,配置在mapred-site.xml檔案中。
這樣一來,minSize的值取決於使用者配置的mapred.min.split.size和1B中的較大值
maxSize的大小是由getMaxSplitSize方法確定的,原始碼如下:

public static long getMaxSplitSize(JobContext context) {
   return context.getConfiguration().getLong("mapred.max.split.size",Long.MAX_VALUE);
}

若“mapred.max.split.size”屬性值讀取不到,則返回Long.MAX_VALUE,否則返回“mapred.max.split.size”屬性的值。

getSplits方法中有一句程式碼值得關注:

long splitSize = computeSplitSize(blockSize, minSize, maxSize);

這裡是確定了InputSplit的大小,computeSplitSize方法原始碼如下:

protected long computeSplitSize(long blockSize, long minSize,long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}

上面說過,minSize的值取決於使用者配置的mapred.min.split.size和1B中的較大值。 maxSize的值取決於使用者配置的mapred.max.split.size和Long.MAX_VALUE中的較大值。blockSize則是HDFS的預設塊大小。

獲取到splitSize 後,檔案將被切分成大小為splitSize的InputSplit,最後剩下不足splitSize的資料塊單獨成為一個InputSplit。
那接下來毫無疑問就是按照splitSize 來切分檔案了(邏輯上的切分)。

再看getSplits方法的程式碼塊:

long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize >SPLIT_SLOP) {
   int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
   splits.add(new FileSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()));
   bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
   splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
  blkLocations[blkLocations.length-1].getHosts()));
}

這裡就是切分的核心程式碼了,bytesRemaining 表示的是切分後,剩餘的待切分的檔案大小,初始值就是檔案大小【length】,splitSize就是InputSplit的大小,SPLIT_SLOP是一個常量值,定義如下:

private static final double SPLIT_SLOP = 1.1;//10% slop

意思就是當剩餘檔案大小bytesRemaining與splitSize的比值還大於1.1的時候,就繼續切分,否則,剩下的直接作為一個InputSplit。
敲黑板,劃重點:並不一定非得bytesRemaining小於splitSize才停止劃分哦,只要bytesRemaining/splitSize<=1.1就會停止劃分,將剩下的作為一個InputSplit

我們還可以看到,

splits.add(new FileSplit(path, length-bytesRemaining, splitSize,blkLocations[blkIndex].getHosts()));

這裡四個引數表示的意思是該InputSplit所在的(路徑,起始位置,大小,所在的 host(節點) 列表)

以上就知道getSplits獲取InputSplit的過程。