
[Apache Hadoop] MapReduce Programming Summary: Running Multiple MapReduce Jobs

To learn Hadoop you cannot avoid writing MapReduce programs. For simple analyses a single MapReduce job is enough; that case is well covered by examples online (a quick Baidu or Google search will turn up plenty), so it is not discussed here. For more complex analyses, we may need multiple jobs, or multiple map or reduce stages, to do the computation.

There are several ways to organise a program with multiple jobs or multiple MapReduce stages:

1. Iterative MapReduce

In the iterative pattern, the output of one MapReduce job becomes the input of the next. Only the final result has to be kept; the intermediate data can be deleted or retained, depending on your needs.

Sample code:

// Imports assumed: org.apache.hadoop.conf.Configuration, org.apache.hadoop.mapreduce.Job,
// org.apache.hadoop.mapreduce.lib.input.FileInputFormat,
// org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
Configuration conf = new Configuration();
// first job
Job job1 = new Job(conf, "job1");
.....
FileInputFormat.addInputPath(job1, inputPath1);
FileOutputFormat.setOutputPath(job1, outputPath1);
job1.waitForCompletion(true);
// second job: reads the output of job1
Job job2 = new Job(conf, "job2");
.....
FileInputFormat.addInputPath(job2, outputPath1);
FileOutputFormat.setOutputPath(job2, outputPath2);
job2.waitForCompletion(true);
// third job: reads the output of job2
Job job3 = new Job(conf, "job3");
.....
FileInputFormat.addInputPath(job3, outputPath2);
FileOutputFormat.setOutputPath(job3, outputPath3);
job3.waitForCompletion(true);
.....
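
As noted above, the intermediate directories only exist to pass data from one stage to the next, so once the final job succeeds the driver can remove them. A minimal sketch, reusing the path variables from the snippet above and assuming org.apache.hadoop.fs.FileSystem and Path are imported:

// Delete intermediate output once the last job in the chain has succeeded.
FileSystem fs = FileSystem.get(conf);
if (job3.isSuccessful()) {
    fs.delete(outputPath1, true); // true = delete the directory recursively
    fs.delete(outputPath2, true);
}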

Below is an example of how Mahout uses iterative MapReduce. The code that follows comes from Mahout's k-means implementation: the main function drives the MapReduce iterations with a while loop, and runIteration() performs one MapReduce pass.

Personally, though, I find the current design for iterative MapReduce unsatisfactory in two respects:

1. In every iteration, all jobs (tasks) are created from scratch, which is very expensive.

2. In every iteration, data is written to and read back from storage, so the I/O and network-transfer costs are high.

Models such as Twister and HaLoop seem to handle these problems better, but their level of abstraction is not high enough and the computations they support are limited.

Hopefully a future Hadoop release will support iterative algorithms better.

//main function
while (!converged && iteration <= maxIterations) {
      log.info("K-Means Iteration {}", iteration);
      // point the output to a new directory per iteration
      Path clustersOut = new Path(output, AbstractCluster.CLUSTERS_DIR + iteration);
      converged = runIteration(conf, input, clustersIn, clustersOut, measure.getClass().getName(), delta);
      // now point the input to the old output directory
      clustersIn = clustersOut;
      iteration++;
}

  private static boolean runIteration(Configuration conf,
                                      Path input,
                                      Path clustersIn,
                                      Path clustersOut,
                                      String measureClass,
                                      String convergenceDelta)
    throws IOException, InterruptedException, ClassNotFoundException {

    conf.set(KMeansConfigKeys.CLUSTER_PATH_KEY, clustersIn.toString());
    conf.set(KMeansConfigKeys.DISTANCE_MEASURE_KEY, measureClass);
    conf.set(KMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, convergenceDelta);

    Job job = new Job(conf, "KMeans Driver running runIteration over clustersIn: " + clustersIn);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(ClusterObservations.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Cluster.class);

    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setMapperClass(KMeansMapper.class);
    job.setCombinerClass(KMeansCombiner.class);
    job.setReducerClass(KMeansReducer.class);

    FileInputFormat.addInputPath(job, input);
    FileOutputFormat.setOutputPath(job, clustersOut);

    job.setJarByClass(KMeansDriver.class);
    HadoopUtil.delete(conf, clustersOut);
    if (!job.waitForCompletion(true)) {
      throw new InterruptedException("K-Means Iteration failed processing " + clustersIn);
    }
    FileSystem fs = FileSystem.get(clustersOut.toUri(), conf);

    return isConverged(clustersOut, conf, fs);
  }

2. Dependency-driven MapReduce: JobControl

The dependency pattern is implemented with JobControl, which involves two classes: Job and JobControl. The Job class wraps a MapReduce job together with its dependencies; it monitors the state of the jobs it depends on and updates its own state accordingly.

JobControl contains a thread that periodically monitors and updates the state of every job, schedules jobs whose dependencies have all completed, and submits jobs in the READY state. It also provides APIs to suspend, resume, and stop that thread.

Sample code:

// JobControl with the new API (org.apache.hadoop.mapreduce.lib.jobcontrol.*):
// each Job is wrapped in a ControlledJob, which carries the dependency information.
Configuration job1conf = new Configuration();
Job job1 = new Job(job1conf, "Job1");
.........// other job1 settings
Configuration job2conf = new Configuration();
Job job2 = new Job(job2conf, "Job2");
.........// other job2 settings
Configuration job3conf = new Configuration();
Job job3 = new Job(job3conf, "Job3");
.........// other job3 settings

ControlledJob cjob1 = new ControlledJob(job1conf);
cjob1.setJob(job1);
ControlledJob cjob2 = new ControlledJob(job2conf);
cjob2.setJob(job2);
ControlledJob cjob3 = new ControlledJob(job3conf);
cjob3.setJob(job3);
cjob3.addDependingJob(cjob1); // job3 depends on job1 and job2
cjob3.addDependingJob(cjob2);

JobControl jc = new JobControl("123");
jc.addJob(cjob1); // add all three jobs to the JobControl
jc.addJob(cjob2);
jc.addJob(cjob3);
jc.run(); // note: run() keeps monitoring until stop() is called
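
One practical note: JobControl implements Runnable, and its run() method loops until stop() is called rather than returning when the jobs finish. The usual pattern is therefore to start it on its own thread and poll allFinished(). A sketch of that usage, in place of the direct jc.run() call above:

// Run the JobControl monitor on a background thread and wait for all jobs to finish.
Thread controlThread = new Thread(jc, "jobcontrol");
controlThread.setDaemon(true);
controlThread.start();

while (!jc.allFinished()) {
    Thread.sleep(1000); // poll once per second
}
System.out.println("failed jobs: " + jc.getFailedJobList().size());
jc.stop(); // tell the monitoring loop to exit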

3. Linear chained MapReduce: ChainMapper/ChainReducer

ChainMapper and ChainReducer were introduced for linear chains of Mappers. The map or reduce phase can contain several Mappers that behave like a Linux pipe: the output of one Mapper is fed directly into the input of the next, forming a pipeline.

Note that in any single MapReduce job, the map and reduce phases may contain any number of Mappers but only one Reducer; a job that needs more than one Reducer therefore cannot be expressed with ChainMapper/ChainReducer.

Sample code:

// Old-API chain (org.apache.hadoop.mapred.lib.ChainMapper/ChainReducer);
// conf is a JobConf, and Mapper1/Mapper2/Mapper3/Reducer1 are the user's classes.
...
conf.setJobName("chain");
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);

JobConf mapper1Conf = new JobConf(false);
JobConf mapper2Conf = new JobConf(false);
JobConf reduce1Conf = new JobConf(false);
JobConf mapper3Conf = new JobConf(false);
...
// map phase: Mapper1 -> Mapper2; reduce phase: Reducer1 -> Mapper3
ChainMapper.addMapper(conf, Mapper1.class, LongWritable.class, Text.class, Text.class, Text.class, true, mapper1Conf);
ChainMapper.addMapper(conf, Mapper2.class, Text.class, Text.class, LongWritable.class, Text.class, false, mapper2Conf);
ChainReducer.setReducer(conf, Reducer1.class, LongWritable.class, Text.class, Text.class, Text.class, true, reduce1Conf);
ChainReducer.addMapper(conf, Mapper3.class, Text.class, Text.class, LongWritable.class, Text.class, true, mapper3Conf);
JobClient.runJob(conf);
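
The snippet above uses the old mapred API (JobConf and JobClient). For reference, the newer mapreduce API provides equivalent classes under org.apache.hadoop.mapreduce.lib.chain; here is a sketch of the same chain with them, assuming the same placeholder Mapper1/Mapper2/Reducer1/Mapper3 classes:

// New-API chain: Mapper1 -> Mapper2 form the map phase,
// then Reducer1 -> Mapper3 form the reduce phase of a single job.
// Input/output formats and paths are omitted for brevity.
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "chain");

ChainMapper.addMapper(job, Mapper1.class, LongWritable.class, Text.class,
        Text.class, Text.class, new Configuration(false));
ChainMapper.addMapper(job, Mapper2.class, Text.class, Text.class,
        LongWritable.class, Text.class, new Configuration(false));
ChainReducer.setReducer(job, Reducer1.class, LongWritable.class, Text.class,
        Text.class, Text.class, new Configuration(false));
ChainReducer.addMapper(job, Mapper3.class, Text.class, Text.class,
        LongWritable.class, Text.class, new Configuration(false));

job.waitForCompletion(true);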

4. Sub-job MapReduce

The sub-job pattern is really a variant of the iterative pattern; I list it separately here. Put simply, one parent job drives a number of child jobs.

In Nutch, Crawler is such a parent job: its run method calls the runTool helper to launch each child job, and runTool uses reflection to instantiate and run the child job.
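
Before diving into the actual Nutch code, here is a stripped-down sketch of that reflection pattern (the ParentDriver and SubTool names are hypothetical, not Nutch's real classes):

import java.util.Map;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.ReflectionUtils;

public class ParentDriver extends Configured {

    // Hypothetical contract for a child job: it is Configurable and exposes run(args).
    public interface SubTool extends Configurable {
        Map<String, Object> run(Map<String, Object> args) throws Exception;
    }

    // Instantiate the child tool by class via reflection and run it;
    // ReflectionUtils.newInstance also injects the parent's Configuration.
    private Map<String, Object> runTool(Class<? extends SubTool> toolClass,
                                        Map<String, Object> args) throws Exception {
        SubTool tool = ReflectionUtils.newInstance(toolClass, getConf());
        return tool.run(args);
    }
}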

Let's look at how Nutch actually implements this:

....
private NutchTool currentTool = null;
....
private Map<String, Object> runTool(Class<? extends NutchTool> toolClass,
			Map<String, Object> args) throws Exception {
		currentTool = (NutchTool) ReflectionUtils.newInstance(toolClass,
				getConf());
		return currentTool.run(args);
	}
...
@Override
	public Map<String, Object> run(Map<String, Object> args) throws Exception {
		results.clear();
		status.clear();
		String crawlId = (String) args.get(Nutch.ARG_CRAWL);
		if (crawlId != null) {
			getConf().set(Nutch.CRAWL_ID_KEY, crawlId);
		}
		String seedDir = null;
		String seedList = (String) args.get(Nutch.ARG_SEEDLIST);
		if (seedList != null) { // takes precedence
			String[] seeds = seedList.split("\\s+");
			// create tmp. dir
			String tmpSeedDir = getConf().get("hadoop.tmp.dir") + "/seed-"
					+ System.currentTimeMillis();
			FileSystem fs = FileSystem.get(getConf());
			Path p = new Path(tmpSeedDir);
			fs.mkdirs(p);
			Path seedOut = new Path(p, "urls");
			OutputStream os = fs.create(seedOut);
			for (String s : seeds) {
				os.write(s.getBytes());
				os.write('\n');
			}
			os.flush();
			os.close();
			cleanSeedDir = true;
			seedDir = tmpSeedDir;
		} else {
			seedDir = (String) args.get(Nutch.ARG_SEEDDIR);
		}
		Integer depth = (Integer) args.get(Nutch.ARG_DEPTH);
		if (depth == null)
			depth = 1;
		boolean parse = getConf().getBoolean(FetcherJob.PARSE_KEY, false);
		String solrUrl = (String) args.get(Nutch.ARG_SOLR);
		int onePhase = 3;
		if (!parse)
			onePhase++;
		float totalPhases = depth * onePhase;
		if (seedDir != null)
			totalPhases++;
		float phase = 0;
		Map<String, Object> jobRes = null;
		LinkedHashMap<String, Object> subTools = new LinkedHashMap<String, Object>();
		status.put(Nutch.STAT_JOBS, subTools);
		results.put(Nutch.STAT_JOBS, subTools);
		// inject phase
		if (seedDir != null) {
			status.put(Nutch.STAT_PHASE, "inject");
			jobRes = runTool(InjectorJob.class, args);
			if (jobRes != null) {
				subTools.put("inject", jobRes);
			}
			status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
			if (cleanSeedDir && tmpSeedDir != null) {
				LOG.info(" - cleaning tmp seed list in " + tmpSeedDir);
				FileSystem.get(getConf()).delete(new Path(tmpSeedDir), true);
			}
		}
		if (shouldStop) {
			return results;
		}
		// run "depth" cycles
		for (int i = 0; i < depth; i++) {
			
			status.put(Nutch.STAT_PHASE, "generate " + i);
			jobRes = runTool(GeneratorJob.class, args);
			if (jobRes != null) {
				subTools.put("generate " + i, jobRes);
			}

			status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
			if (shouldStop) {
				return results;
			}
			status.put(Nutch.STAT_PHASE, "fetch " + i);
			jobRes = runTool(FetcherJob.class, args);
			if (jobRes != null) {
				subTools.put("fetch " + i, jobRes);
			}
			status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
			if (shouldStop) {
				return results;
			}
			if (!parse) {
				status.put(Nutch.STAT_PHASE, "parse " + i);
				jobRes = runTool(ParserJob.class, args);
				if (jobRes != null) {
					subTools.put("parse " + i, jobRes);
				}
				status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
				if (shouldStop) {
					return results;
				}
			}
			status.put(Nutch.STAT_PHASE, "updatedb " + i);
			jobRes = runTool(DbUpdaterJob.class, args);
			if (jobRes != null) {
				subTools.put("updatedb " + i, jobRes);
			}
			status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
			if (shouldStop) {
				return results;
			}
		}
		if (solrUrl != null) {
			status.put(Nutch.STAT_PHASE, "index");
			jobRes = runTool(SolrIndexerJob.class, args);
			if (jobRes != null) {
				subTools.put("index", jobRes);
			}
		}
		return results;
	}