1. 程式人生 > >Hadoop的JVM重用機制和小檔案解決

Hadoop的JVM重用機制和小檔案解決

Hadoop的JVM重用機制和小檔案解決

一、hadoop2.0 uber功能
  1) uber的原理:Yarn的預設配置會禁用uber元件,即不允許JVM重用。我們先看看在這種情況下,Yarn是如何執行一個MapReduce job的。首先,Resource Manager裡的Applications Manager會為每一個application(比如一個使用者提交的MapReduce Job)在NodeManager裡面申請一個container,然後在該container裡面啟動一個Application Master。container在Yarn中是分配資源的容器(記憶體、cpu、硬碟等),它啟動時便會相應啟動一個JVM。此時,Application Master便陸續為application包含的每一個task(一個Map task或Reduce task)向Resource Manager申請一個container。等每得到一個container後,便要求該container所屬的NodeManager將此container啟動,然後就在這個container裡面執行相應的task。等這個task執行完後,這個container便會被NodeManager收回,而container所擁有的JVM也相應地被退出。在這種情況下,可以看出每一個JVM僅會執行一Task, JVM並未被重用。
  2)使用者可以通過啟用uber元件來允許JVM重用——即在同一個container裡面依次執行多個task。在yarn-site.xml檔案中,改變一下幾個引數的配置即可啟用uber的方法:引數| 預設值 | 描述- mapreduce.job.ubertask.enable | (false) | 是否啟用user功能。如果啟用了該功能,則會將一個“小的application”的所有子task在同一個JVM裡面執行,達到JVM重用的目的。這個JVM便是負責該application的ApplicationMaster所用的JVM(執行在其container裡)。那具體什麼樣的application算是“小的application"呢?下面幾個引數便是用來定義何謂一個“小的application"- mapreduce.job.ubertask.maxmaps | 9 | map任務數的閥值,如果一個application包含的map數小於該值的定義,那麼該application就會被認為是一個小的application。- mapreduce.job.ubertask.maxreduces | 1 | reduce任務數的閥值,如果一個application包含的reduce數小於該值的定義,那麼該application就會被認為是一個小的application。不過目前Yarn不支援該值大於1的情況。- mapreduce.job.ubertask.maxbytes | | application的輸入大小的閥值。預設為dfs.block.size的值。當實際的輸入大小不超過該值的設定,便會認為該application為一個小的application。最後,我們來看當uber功能被啟用的時候,Yarn是如何執行一個application的。首先,Resource Manager裡的Applications Manager會為每一個application在NodeManager裡面申請一個container,然後在該container裡面啟動一個Application Master。containe啟動時便會相應啟動一個JVM。此時,如果uber功能被啟用,並且該application被認為是一個“小的application”,那麼Application Master便會將該application包含的每一個task依次在這個container裡的JVM裡順序執行,直到所有task被執行完。這樣Application Master便不用再為每一個task向Resource Manager去申請一個單獨的container,最終達到了 JVM重用(資源重用)的目的。
  3)在yarn-site.xml裡的配置示例:

	<!-- 開啟uber模式(針對小作業的優化) --> 
	<property> 
		<name>mapreduce.job.ubertask.enable</name> 
		<value>true</value> 
	</property> 
	<!-- 配置啟動uber模式的最大map數 --> 
	<property> 
		<name>mapreduce.job.ubertask.maxmaps</name> 
		<value>9</value> 
	</property> 
	<!-- 配置啟動uber模式的最大reduce數 --> 
	<property> 
		<name>mapreduce.job.ubertask.maxreduces</name> 
		<value>1</value> 
	</property> 

2.0的uber模式開啟之後,JVM重用也一定生效。生效的條件是Map任務數量小於配置的任務數量,則認為是一個小任務,如果是小任務則JVM生效。
  比如配置的Map任務=9 ,實際的map=5。如果實際的12,則認為不是一個小任務,則不開啟JVM重用機制。
  補充:uber的規定,reduce的數量必須是1,如果reduce>1,則不認為是小任務。JVM重用機制,是針對任務為單位的,即不同Task是不能共用JVM重用機制的。使用JVM重用機制,一定要注意全域性變數的使用問題。
二、Hadoop小檔案問題
  小檔案的定義:小檔案指的是那些size比HDFS 的block size(預設64M/1.0版本,128M/2.0版本)小的多的檔案。如果在HDFS中儲存海量的小檔案,會產生很多問題。
  大量小檔案在HDFS中的問題:任何一個檔案,目錄和block,在HDFS中都會被表示為元資料資訊,每一個元資料資訊佔用150 bytes的記憶體空間。所以,如果有10million個檔案,每一個檔案對應一個block,那麼就將要消耗namenode 3G的記憶體來儲存這些block的資訊。如果規模再大一些,那麼將會超出現階段計算機硬體所能滿足的極限。不僅如此,HDFS並不是為了有效的處理大量小檔案而存在的。它主要是為了流式的訪問大檔案而設計的。對小檔案的讀取通常會造成大量從datanode到datanode的seeks和hopping來retrieve檔案,而這樣是非常的低效的一種訪問方式。
  大量小檔案在mapreduce中的問題:Map tasks通常是每次處理一個block的input(預設使用FileInputFormat)。如果檔案非常的
小,並且擁有大量的這種小檔案,那麼每一個map task都僅僅處理了非常小的input資料,並且會產生大量的map tasks,每一個map task都會消耗一定量的bookkeeping的資源。比較一個1GB的檔案,預設block size為64M,和1Gb的檔案,沒一個檔案100KB,那麼後者沒一個小檔案使用一個map task,那麼job的時間將會十倍甚至百倍慢於前者。
  hadoop中有一些特性可以用來減輕這種問題:可以在一個JVM中允許task reuse,以支援在一個JVM中執行多個map task,以此來減少一些JVM的啟動消耗。另一種方法是將多個小檔案合成一個spilt,即用一個map任務來處理。
三、Hadoop小檔案解決方案
  在使用Hadoop處理海量小檔案的應用場景中,如果你選擇使用CombineFileInputFormat,而且你是第一次使用,可能你會感到有點迷惑。雖然,從這個處理方案的思想上很容易理解,但是可能會遇到這樣那樣的問題。使用CombineFileInputFormat作為Map任務的輸入規格描述,首先需要實現一個自定義的RecordReader。CombineFileInputFormat的大致原理是,他會將輸入多個數據檔案(小檔案)的元資料全部包裝到CombineFileSplit類裡面。也就是說,因為小檔案的情況下,在HDFS中都是單Block的檔案,即一個檔案一個Block,一個CombineFileSplit包含了一組檔案Block,包括每個檔案的起始偏移(offset),長度(length),Block位置(localtions)等元資料。如果想要處理一個CombineFileSplit,很容易想到,對其包含的每個InputSplit(實際上這裡面沒有這個,你需要讀取一個小檔案塊的時候,需要構造一個FileInputSplit物件)。在執行MapReduce任務的時候,需要讀取檔案的文字行(簡單一點是文字行,也可能是其他格式資料)。那麼對於CombineFileSplit來說,你需要處理其包含的小檔案Block,就要對應設定一個RecordReader,才能正確讀取檔案資料內容。通常情況下,我們有一批小檔案,格式通常是相同的,只需要在為CombineFileSplit實現一個RecordReader的時候,內建另一個用來讀取小檔案Block的RecordReader,這樣就能保證讀取CombineFileSplit內部聚積的小檔案。
  為CombineFileSplit實現一個RecordReader,並在內部使用Hadoop自帶的
LineRecordReader來讀取小檔案的文字行資料程式碼實現:

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class CombineSmallfileRecordReader extends RecordReader<LongWritable,BytesWritable>{
	private CombineFileSplit combineFileSplit;
	private LineRecordReader lineRecordReader=new LineRecordReader();
	private Path[] paths;
	private int totalLength;
	private int currentIndex;
	private float currentProgress=0;
	private LongWritable currentKey;
	private BytesWritable currentValue=new BytesWritable();
	
	public CombineSmallfileRecordReader(CombineFileSplit combineFileSplit,TaskAttemptContext context,Integer index) {
		super();
		this.combineFileSplit=combineFileSplit;
		this.currentIndex=index;//當前要處理的小檔案Block在CombineFileSpilt中的索引
	}
	
	@Override
	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
		this.combineFileSplit = (CombineFileSplit) split;
		// 處理CombineFileSplit中的一個小檔案Block,因為使用LineRecordReader,需要構造一個FileSplit物件,然後才能夠讀取資料
		FileSplit fileSplit =new FileSplit(combineFileSplit.getPath(currentIndex),
											combineFileSplit.getOffset(currentIndex),
											combineFileSplit.getLength(currentIndex),
											combineFileSplit.getLocations());
		lineRecordReader.initialize(fileSplit, context);
		this.paths = combineFileSplit.getPaths();
		totalLength = paths.length;
		context.getConfiguration().set("map.input.file.name",combineFileSplit.getPath(currentIndex).getName());
	}
	
	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		if (currentIndex >=0&&currentIndex< totalLength){
			return lineRecordReader.nextKeyValue();
		}else{
			return false;
		}
	}
	
	@Override
	public BytesWritable getCurrentValue() throws IOException,InterruptedException {
		byte[] content = lineRecordReader.getCurrentValue().getBytes();
		currentValue.set(content,0,content.length);
		return currentValue;
	}
	@Override
	public float getProgress() throws IOException, InterruptedException {
		if(currentIndex >=0&& currentIndex < totalLength){
			currentProgress = (float) currentIndex / totalLength;
			return currentProgress;
		}
		return currentProgress;
	}
	
	@Override
	public void close() throws IOException {
		lineRecordReader.close();
	}

如果存在這樣的應用場景,你的小檔案具有不同的格式,那麼就需要考慮對不同型別的小檔案,使用不同的內建RecordReader,具體邏輯也是在上面的類中實現。
我們已經為CombineFileSplit實現了一個RecordReader,然後需要在一個
CombineFileInputFormat中注入這個RecordReader類實現類
CombineSmallfileRecordReader的物件。這時,需要實現一個CombineFileInputFormat的子類,可以重寫createRecordReader方法。我們實現的CombineSmallfileInputFormat,程式碼如下所示:

CombineSmallfileInputFormat類

import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class CombineSmallfileInputFormat extends CombineFileInputFormat<LongWritable, BytesWritable>{
	@Override
	public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
		CombineFileSplit combineFileSplit = (CombineFileSplit) split;
		//這裡比較重要的是,一定要通過CombineFileRecordReader來建立一個RecordReader
		//而且它的構造方法的引數必須是上面的定義的型別和順序。
		//構造方法包含3個引數:第一個是CombineFileSplit型別,第二個是TaskAttemptContext型別,第三個是Class<? extends RecordReader>型別。
		CombineFileRecordReader<LongWritable, BytesWritable> recordReader =new CombineFileRecordReader<LongWritable, BytesWritable>(combineFileSplit, context, CombineSmallfileRecordReader.class);
			try {
				recordReader.initialize(combineFileSplit, context);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			return recordReader;
		}
	}

CombineSmallfileMapper類

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CombineSmallfileMapper extends Mapper<LongWritable, BytesWritable, Text,Text>{
	private Text file=new Text();
	@Override
	Mapper<LongWritable, BytesWritable, Text, Text>.Context context)throws IOException,InterruptedException {
		String fileName=context.getConfiguration().get("map.input.file.name");
		file.set(fileName);
		context.write(file, new Text(new String(value.getBytes()).trim()));
	}
}

CombineSmallfiles類

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class CombineSmallfiles {
	public static void main(String[] args) throws Exception {
		Configuration conf=new Configuration();
		Job job =Job.getInstance(conf);
		job.setJarByClass(CombineSmallfiles.class);
		job.setMapperClass(CombineSmallfileMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(BytesWritable.class);
		job.setInputFormatClass(CombineSmallfileInputFormat.class);
		FileInputFormat.setInputPaths(job, new 
		Path("hdfs://192.168.234.21:9000/score"));
		FileOutputFormat.setOutputPath(job, new 
		Path("hdfs://192.168.234.21:9000/score/result"));
		job.waitForCompletion(true);
	}
}