Implementing multiple jobs in Hadoop MapReduce (iterative, dependent, and chained)

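1. Iterative. In the iterative pattern, each job takes the previous job's output as its input, and the last job produces the final result. Only the key code is shown here.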

// Uses the new API: org.apache.hadoop.mapreduce.Job and
// org.apache.hadoop.mapreduce.lib.{input.FileInputFormat, output.FileOutputFormat}
Job job = Job.getInstance(new Configuration(), "test");
job.setJobName("hadoopJoinTask");

// Set the input path of job
FileInputFormat.setInputPaths(job, inputPath);
// Set the output path of job
FileOutputFormat.setOutputPath(job, outPath);

// job2 reads job's output, so job has to finish successfully first
if (!job.waitForCompletion(true)) {
    System.exit(1);
}

Job job2 = Job.getInstance(new Configuration(), "test2");
job2.setJobName("hadoopJoinTask");

// Set the input path of job2: the output path of job
FileInputFormat.setInputPaths(job2, outPath);
// Set the output path of job2
FileOutputFormat.setOutputPath(job2, outPath2);
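
When the number of rounds is not fixed at two, the same input/output wiring can be generated in a loop. The sketch below is plain Java with no Hadoop dependency: it only computes which directory each round would read and write. The class name IterationPlan, the iter-N directory naming, and the example paths are all hypothetical and not part of the original code. In a real driver, each round would configure a Job with these paths and wait for it to complete before starting the next one.

```java
import java.util.ArrayList;
import java.util.List;

/** Plans the input/output directories for a sequence of iterative rounds. */
final class IterationPlan {

    /** One round: reads from input and writes to output. */
    static final class Round {
        final String input;
        final String output;

        Round(String input, String output) {
            this.input = input;
            this.output = output;
        }

        @Override
        public String toString() {
            return input + " -> " + output;
        }
    }

    /** Builds the plan; round i+1 always reads what round i wrote. */
    static List<Round> plan(String firstInput, String workDir, int rounds) {
        List<Round> plan = new ArrayList<>();
        String input = firstInput;
        for (int i = 1; i <= rounds; i++) {
            String output = workDir + "/iter-" + i; // hypothetical naming scheme
            plan.add(new Round(input, output));
            input = output; // the next round consumes this round's output
        }
        return plan;
    }

    public static void main(String[] args) {
        for (Round r : plan("/data/in", "/tmp/work", 3)) {
            System.out.println(r);
        }
    }
}
```

Keeping the path planning separate from job submission makes it easy to check the wiring before any cluster time is spent.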


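2. Dependent. A common situation at work: job3 may only run after job1, job2, and so on have finished, so job3 depends on the completion of those other jobs.
// Hadoop 2: the JobControl source shows a ControlledJob class, which provides the dependency method addDependingJob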

        Job job = Job.getInstance(new Configuration(), "job1");
        Job job2 = Job.getInstance(new Configuration(), "job2");

        ControlledJob controlledJob=new ControlledJob(job.getConfiguration());

        // Attach the job
        controlledJob.setJob(job);

        ControlledJob controlledJob2=new ControlledJob(job2.getConfiguration());
        controlledJob2.setJob(job2);
        // This is the key call that declares the dependency: controlledJob2 depends on controlledJob
        controlledJob2.addDependingJob(controlledJob);

        JobControl jc=new JobControl("jc");
        jc.addJob(controlledJob);
        jc.addJob(controlledJob2);
        // JobControl implements Runnable, and Runnable only has a run method and no way to end it, so a helper thread is needed

        Thread jcThread = new Thread(jc);
        jcThread.start();
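        while(true){
            // Once every job in the pool has finished, move on to the next step
            if(jc.allFinished()){
                System.out.println(jc.getSuccessfulJobList());
                jc.stop();
                break; // leave the loop, otherwise it spins forever
            }
            // Get the list of failed jobs
            if(jc.getFailedJobList().size() > 0){
                System.out.println(jc.getFailedJobList());
                jc.stop();
                break;
            }
            // Poll at intervals instead of busy-waiting (throws InterruptedException)
            Thread.sleep(500);
        }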
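
The ordering rule behind addDependingJob can be illustrated without a cluster. The sketch below is a plain-Java model with no Hadoop dependency, and it is not JobControl's actual implementation: it only shows the idea that a job becomes ready once every job it depends on has finished. The class DependencyOrder and its runOrder method are hypothetical names introduced for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of dependency-driven job ordering. */
final class DependencyOrder {

    /**
     * Returns the job names in an order in which every job appears
     * after all of the jobs it depends on.
     */
    static List<String> runOrder(Map<String, Set<String>> dependsOn) {
        List<String> done = new ArrayList<>();
        Set<String> pending = new LinkedHashSet<>(dependsOn.keySet());
        while (!pending.isEmpty()) {
            // A job is ready when all of its dependencies are already done
            List<String> ready = new ArrayList<>();
            for (String job : pending) {
                if (done.containsAll(dependsOn.get(job))) {
                    ready.add(job);
                }
            }
            if (ready.isEmpty()) {
                // Nothing can make progress: a cycle or an unknown job name
                throw new IllegalStateException("unsatisfiable dependencies: " + pending);
            }
            done.addAll(ready);
            pending.removeAll(ready);
        }
        return done;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> deps = new LinkedHashMap<>();
        deps.put("job3", new HashSet<>(Arrays.asList("job1", "job2")));
        deps.put("job1", new HashSet<>());
        deps.put("job2", new HashSet<>());
        System.out.println(runOrder(deps)); // prints [job1, job2, job3]
    }
}
```

In this model a job that lists itself as a dependency can never become ready, which is why the dependency has to be declared on the downstream job and point at the upstream one.
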
3. Chained
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJobName("ChainJob");
        // With a Job (org.apache.hadoop.mapreduce.lib.chain) these methods take no byValue flag,
        // and each stage's input types must match the previous stage's output types
        // Add Map1 to the ChainMapper
        Configuration map1Conf = new Configuration(false);
        ChainMapper.addMapper(job, Map1.class, LongWritable.class, Text.class,
                Text.class, Text.class, map1Conf);
        // Set the Reducer on the ChainReducer, then append Map2 after it
        Configuration reduceConf = new Configuration(false);
        ChainReducer.setReducer(job, Reduce.class, Text.class,
                Text.class, Text.class, Text.class, reduceConf);
        Configuration map2Conf = new Configuration(false);
        ChainReducer.addMapper(job, Map2.class, Text.class, Text.class,
                Text.class, Text.class, map2Conf);