
2018-08-04 installment: MapReduce Inverted Index Programming Case 2 (the JobControl approach)


1. The stage-one MapReduce job

package cn.itcast.bigdata.index;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Uses MapReduce to count, across several input files, how many times each word occurs
 * in each file. The final result looks like:
 *   hello (a.txt 2,b.txt 1,c.txt 4)
 *   tom   (a.txt 5,b.txt 3)
 * Approach: an inverted-index algorithm, with the two stages chained via JobControl.
 * All Mappers and Reducers in this case are implemented as static nested classes.
 * @author songjq
 */
public class IndexStepOne {

    /**
     * The stage-one Mapper emits one record per word occurrence, keyed by "word-->file":
     *   <k2>              <v2>
     *   <hello-->a.txt>   <1>
     *   <hello-->a.txt>   <1>
     *   <hello-->b.txt>   <1>
     * @author songjq
     */
    static class IndexStepOneMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        // Output key is "word-->fileName"; output value is always 1
        private Text tkey = new Text();
        private IntWritable tvalue = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // The input split tells us which file the current line belongs to
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            String fileName = inputSplit.getPath().getName();
            String line = value.toString();
            String[] split = line.split(" ");
            for (String val : split) {
                tkey.set(val + "-->" + fileName);
                context.write(tkey, tvalue);
            }
        }
    }

    /**
     * The stage-one Reducer receives
     *   <k2>              <v2>
     *   <hello-->a.txt>   <1>
     *   <hello-->a.txt>   <1>
     *   <hello-->b.txt>   <1>
     * and writes to HDFS one count per (word, file) pair:
     *   <k3>              <v3>
     *   <hello-->a.txt>   <2>
     *   <hello-->b.txt>   <1>
     * @author songjq
     */
    static class IndexStepOneReducer extends Reducer<Text, IntWritable, Text, LongWritable> {

        private LongWritable tvalue = new LongWritable(0);

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context ctx)
                throws IOException, InterruptedException {
            // Every value is 1, so the number of values is the occurrence count
            // of this word in this file
            long count = 0;
            for (IntWritable value : values) {
                count++;
            }
            tvalue.set(count);
            ctx.write(key, tvalue);
        }
    }
}
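For illustration, suppose the input directory holds two small files (file names and contents below are made up for this example, not taken from the original article). Stage one then writes one tab-separated "word-->file" and count pair per line:

    a.txt:  hello tom hello
    b.txt:  hello jerry

    Stage-one output:
    hello-->a.txt    2
    hello-->b.txt    1
    jerry-->b.txt    1
    tom-->a.txt      1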

2. The stage-two MapReduce job

package cn.itcast.bigdata.index;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Stage two of the inverted index: regroups the per-file counts produced by stage one
 * by word, so that the final result looks like:
 *   hello (a.txt 2,b.txt 1,c.txt 4)
 *   tom   (a.txt 5,b.txt 3)
 * Approach: an inverted-index algorithm, with the two stages chained via JobControl.
 * All Mappers and Reducers in this case are implemented as static nested classes.
 * @author songjq
 */
public class IndexStepTwo {

    /**
     * The stage-two Mapper reads the stage-one Reducer output from HDFS, e.g.
     *   hello-->a.txt    2
     *   hello-->b.txt    1
     * and re-keys it by word, emitting
     *   <k2>     <v2>
     *   <hello>  <a.txt-->2>
     *   <hello>  <b.txt-->1>
     * @author songjq
     */
    static class IndexStepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {

        private Text tkey = new Text();
        private Text tvalue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // A stage-one output line looks like "hello-->a.txt\t2"
            String line = value.toString();
            String[] split = line.split("\t");
            if (split.length > 1) {
                String[] split2 = split[0].split("-->");
                tkey.set(split2[0]);
                if (split2.length > 1) {
                    tvalue.set(split2[1] + "-->" + split[1]);
                    context.write(tkey, tvalue);
                }
            }
        }
    }

    /**
     * The stage-two Reducer concatenates all "file-->count" values of a word into the
     * final output record:
     *   <k4>     <v4>
     *   <hello>  <a.txt-->2 b.txt-->1>
     * @author songjq
     */
    static class IndexStepTwoReducer extends Reducer<Text, Text, Text, Text> {

        private Text tval = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context ctx)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text value : values) {
                sb.append(value).append(" ");
            }
            tval.set(sb.toString().trim());
            ctx.write(key, tval);
        }
    }
}
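Continuing the made-up example from stage one, the final stage-two output would look roughly like the lines below; note that the order of the "file-->count" entries within one line is not guaranteed, since MapReduce does not sort values within a group by default:

    hello    a.txt-->2 b.txt-->1
    jerry    b.txt-->1
    tom      a.txt-->1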

3. Submitting the dependent jobs with JobControl

package cn.itcast.bigdata.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.itcast.bigdata.index.IndexStepOne.IndexStepOneMapper;
import cn.itcast.bigdata.index.IndexStepOne.IndexStepOneReducer;
import cn.itcast.bigdata.index.IndexStepTwo.IndexStepTwoMapper;
import cn.itcast.bigdata.index.IndexStepTwo.IndexStepTwoReducer;

/**
 * Simple job chaining can be done with JobControl; more complex scheduling is better
 * handled with shell scripts, or with a ready-made workflow scheduler such as Oozie.
 * Here JobControl submits the two dependent MapReduce stages in one go.
 * Because the stage-two Mapper reads the output of the stage-one Reducer, JobControl
 * holds back the second job: it is only submitted once the first job has finished
 * writing its output.
 * The key call is:
 *   controlledJob2.addDependingJob(controlledJob1);
 * @author songjq
 */
public class OnceSubmitClient {

    public static void main(String[] args) throws Exception {

        // Build the plain Job object for stage one
        Configuration conf1 = new Configuration();
        Job job1 = Job.getInstance(conf1, "indexStepOne");
        job1.setJarByClass(OnceSubmitClient.class);
        job1.setMapperClass(IndexStepOneMapper.class);
        job1.setReducerClass(IndexStepOneReducer.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(IntWritable.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));

        // Build the plain Job object for stage two
        Configuration conf2 = new Configuration();
        Job job2 = Job.getInstance(conf2, "indexStepTwo");
        job2.setJarByClass(OnceSubmitClient.class);
        job2.setMapperClass(IndexStepTwoMapper.class);
        job2.setReducerClass(IndexStepTwoReducer.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        // The first job's output directory (args[1]) is the second job's input
        FileInputFormat.setInputPaths(job2, new Path(args[1]));
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));

        // ControlledJob wraps a plain Job
        ControlledJob controlledJob1 = new ControlledJob(conf1);
        controlledJob1.setJob(job1);
        ControlledJob controlledJob2 = new ControlledJob(conf2);
        controlledJob2.setJob(job2);

        // Build the job controller
        JobControl jobControl = new JobControl("index");

        // Declare the dependency: job2 is submitted only after job1 succeeds
        controlledJob2.addDependingJob(controlledJob1);

        // Register both jobs with the controller
        jobControl.addJob(controlledJob1);
        jobControl.addJob(controlledJob2);

        // JobControl is a Runnable, so start it on its own thread
        Thread thread = new Thread(jobControl);
        thread.start();

        // The main thread waits until both jobs have finished, then stops the controller
        while (!jobControl.allFinished()) {
            Thread.sleep(500);
        }
        jobControl.stop();

        // Exit code 0 means both jobs succeeded, 1 means something failed
        int succeedSize = jobControl.getSuccessfulJobList().size();
        System.exit(succeedSize == 2 ? 0 : 1);
    }
}
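To run the chained jobs, package the three classes into a jar and submit the driver once; it takes three arguments: the input directory, the intermediate stage-one output directory, and the final output directory. A minimal invocation might look like the line below (the jar name and HDFS paths are placeholders, not from the original article):

    hadoop jar index.jar cn.itcast.bigdata.index.OnceSubmitClient /index/input /index/step1-out /index/final-out

Both output directories must not already exist, otherwise FileOutputFormat will fail the corresponding job.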

