MapReduce的典型程式設計場景2
1.MapReduce 多 Job 串聯
介紹:一個稍複雜點的處理邏輯往往需要多個 MapReduce 程式串聯處理,多 job 的串聯可以藉助MapReduce 框架的 JobControl 實現。
需求:
以下有兩個 MapReduce 任務,分別是 Flow 的 SumMR 和 SortMR,其中有依賴關係:SumMR的輸出是 SortMR 的輸入,所以 SortMR 的啟動得在 SumMR 完成之後
這兩個程式在:http://blog.51cto.com/14048416/2342024
如何實現兩個程式碼的依賴關係呢?
程式碼實現(這裡只給出多 Job 串聯的程式碼)
public class JobDecy { public static void main(String[] args) { Configuration conf = new Configuration(true); conf.set("fs.defaultFS", "hdfs://zzy:9000"); conf.addResource("core-site.xml"); conf.addResource("hdfs-site.xml"); System.setProperty("HADOOP_USER_NAME", "hadoop"); try { //job1 FlowSum Job job1 = Job.getInstance(conf); job1.setJobName("FlowSum"); //設定任務類 job1.setJarByClass(FlowSum.class); //設定Mapper Reducer Combine job1.setMapperClass(FlowSum.MyMapper.class); job1.setReducerClass(FlowSum.MyReducer.class); job1.setCombinerClass(FlowSum.FlowSumCombine.class); //設定map 和reduce 的輸入輸出型別 job1.setMapOutputKeyClass(Text.class); job1.setMapOutputValueClass(Text.class); job1.setOutputKeyClass(Text.class); job1.setMapOutputValueClass(Text.class); // 指定該 mapreduce 程式資料的輸入和輸出路徑 Path input1 = new Path("/data/input"); Path output1 = new Path("/data/output"); //一定要保證output不存在 if (output1.getFileSystem(conf).exists(output1)) { output1.getFileSystem(conf).delete(output1, true); //遞迴刪除 } FileInputFormat.addInputPath(job1, input1); FileOutputFormat.setOutputPath(job1, output1); //Job2 FlowSumSort Job job2= Job.getInstance(conf); job2.setJarByClass(FlowSumSort.class); job2.setJobName("FlowSumSort"); job2.setMapperClass(Mapper.class); job2.setReducerClass(Reducer.class); job2.setOutputKeyClass(FlowBean.class); job2.setOutputValueClass(NullWritable.class); // 指定該 mapreduce 程式資料的輸入和輸出路徑 Path input2=new Path("//data/output"); Path output2 =new Path("/data/output1"); //一定要保證output不存在 if(output2.getFileSystem(conf).exists(output2)){ output2.getFileSystem(conf).delete(output2,true); //遞迴刪除 } FileInputFormat.addInputPath(job2,input2); FileOutputFormat.setOutputPath(job2,output2); //為每個任務建立ControlledJob ControlledJob job1_cj=new ControlledJob(job1.getConfiguration()); ControlledJob job2_cj=new ControlledJob(job2.getConfiguration()); //繫結 job1_cj.setJob(job1); job2_cj.setJob(job2); // 設定作業依賴關係 job2_cj.addDependingJob(job2_cj); //job2 依賴於job1 //建立jobControl JobControl jc=new 
JobControl("sum and sort"); jc.addJob(job1_cj); jc.addJob(job2_cj); //使用執行緒開啟Job Thread jobThread=new Thread(jc); //開啟任務 jobThread.start(); //為了保證主程式不終止,沒0.5秒檢查一次是否完成作業 while(!jc.allFinished()){ try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } //罪作業完成之後,終止執行緒,釋放資源 jc.stop(); } catch (IOException e) { e.printStackTrace(); } } }
2.TopN 演算法實現(二次排序)
需求:求每個班級的總分最高的前三名
欄位:班級 姓名 數學 語文 英語 (欄位之間是製表符分割)
分析:
- 利用“班級和總分”作為 key,map 階段讀取到的所有學生成績資料會先按班級排序,班級相同時再按總分倒序排列,然後傳送到 reduce
- 在 reduce 端利用 GroupingComparator 將班級相同的 kv 聚合成組,然後取前三個即是前三名
程式碼實現:
自定義學生類:
public class Student implements WritableComparable<Student> { private String t_class; private String t_name; private int t_sumSource; public Student(){ } public void set(String t_class,String t_name,int chinese,int math,int english){ this.t_class=t_class; this.t_name=t_name; this.t_sumSource=chinese+math+english; } public String getT_class() { return t_class; } public void setT_class(String t_class) { this.t_class = t_class; } public String getT_name() { return t_name; } public void setT_name(String t_name) { this.t_name = t_name; } public int getT_sumSource() { return t_sumSource; } public void setT_sumSource(int t_sumSource) { this.t_sumSource = t_sumSource; } //比較規則 @Override public int compareTo(Student stu) { //首先根據班級比較 int result1=this.t_class.compareTo(stu.t_class); //班級相同的在根據總分比較 if(result1==0){ return stu.t_sumSource-this.t_sumSource; } return result1; } //序列化 @Override public void write(DataOutput out) throws IOException { out.writeUTF(this.t_class); out.writeUTF(this.t_name); out.writeInt(this.t_sumSource); } //反序列化 @Override public void readFields(DataInput in) throws IOException { this.t_class=in.readUTF(); this.t_name=in.readUTF(); this.t_sumSource=in.readInt(); } }
自定義分組:
//自定義分組規則 private static class MyGroupComparator extends WritableComparator{ //這句程式碼必須要加,並且要呼叫父類的構造 public MyGroupComparator(){ super(Student.class, true); } / 決定輸入到 reduce 的資料的分組規則 根據班級進行分組 / @Override public int compare(WritableComparable a, WritableComparable b) { Student stu1=(Student)a; Student stu2=(Student)a; return stu1.getTclass().compareTo(stu2.getTclass()); } }*
MR程式:
//Mapper
/**
 * Parses one score line (class, name, math, Chinese, English) into a Student
 * key; the value carries no information.
 */
private static class MyMapper extends Mapper<LongWritable, Text, Student, NullWritable> {
    /** Number of columns a well-formed line must have. */
    private static final int EXPECTED_FIELDS = 5;

    Student bean = new Student();
    NullWritable mv = NullWritable.get();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // trim() so leading whitespace does not produce an empty first column.
        String[] fields = value.toString().trim().split("\\s+");
        if (fields.length < EXPECTED_FIELDS) {
            // Blank or truncated line: count it and move on instead of failing the task.
            context.getCounter("TopN", "MALFORMED_RECORDS").increment(1);
            return;
        }

        String t_clas = fields[0];
        String t_name = fields[1];
        int math;
        int chinese;
        int english;
        try {
            // Column order: class, name, math, Chinese, English.
            math = Integer.parseInt(fields[2]);
            chinese = Integer.parseInt(fields[3]);
            english = Integer.parseInt(fields[4]);
        } catch (NumberFormatException e) {
            // Non-numeric score (e.g. a header row).
            context.getCounter("TopN", "MALFORMED_RECORDS").increment(1);
            return;
        }

        bean.set(t_clas, t_name, chinese, math, english);
        context.write(bean, mv);
    }
}
//Reducer
/**
 * Writes at most the first three records of each group and ignores the rest.
 */
private static class MyReducer extends Reducer<Student, NullWritable, Student, NullWritable> {
    @Override
    protected void reduce(Student key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Budget of records still allowed for this group.
        int remaining = 3;
        for (NullWritable placeholder : values) {
            if (remaining == 0) {
                break;
            }
            context.write(key, placeholder);
            remaining--;
        }
    }
}
job:
public class ClazzScoreGroupComparator {
/**
 * Configures and runs the per-class top-3 job, exiting with status 0 on
 * success and 1 on failure.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Configuration conf = new Configuration(true);
    conf.set("fs.defaultFS", "hdfs://zzy:9000");
    conf.addResource("core-site.xml");
    conf.addResource("hdfs-site.xml");
    System.setProperty("HADOOP_USER_NAME", "hadoop");
    try {
        Job job = Job.getInstance(conf);
        job.setJarByClass(ClazzScoreGroupComparator.class);
        job.setJobName("ClazzScoreGroupComparator");
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // Use the custom grouping comparator on the reduce side.
        job.setGroupingComparatorClass(MyGroupComparator.class);
        job.setOutputKeyClass(Student.class);
        job.setOutputValueClass(NullWritable.class);

        // Input and output locations. A leading "//" would make Path treat
        // "data" as the URI authority, so a single slash is used.
        Path input = new Path("/data/student.txt");
        Path output = new Path("/data/output2");
        // The output directory must not exist before the job starts.
        if (output.getFileSystem(conf).exists(output)) {
            output.getFileSystem(conf).delete(output, true); // recursive delete
        }
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    } catch (Exception e) {
        e.printStackTrace();
        // Report the failure to the caller instead of returning with status 0.
        System.exit(1);
    }
}
3. MapReduce 全域性計數器
介紹:計數器是用來記錄 job 的執行進度和狀態的。它的作用可以理解為日誌。我們可以在程式的某個位置插入計數器,記錄資料或者進度的變化情況,MapReduce 自帶了許多預設 Counter,現在我們來分析這些預設 Counter 的含義,方便大家觀察 Job 結果,如輸入的位元組數、輸出的位元組數、Map 端輸入/輸出的位元組數和條數、Reduce 端的輸入/輸出的位元組數和條數等。
需求:利用全域性計數器來統計一個目錄下所有檔案出現的單詞總數和總行數
程式碼實現:
/**
 * Map-only job that counts lines and words of every file under the input
 * directory using two custom counters.
 */
public class CounterWordCount {

    /**
     * Configures and runs the job, prints the job-wide counter totals on
     * success, and exits with status 0 on success or 1 on failure.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration(true);
        conf.set("fs.defaultFS", "hdfs://zzy:9000");
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(CounterWordCount.class);
            job.setJobName("CounterWordCount");
            job.setMapperClass(MyMapper.class);
            // Map-only job: no reduce tasks.
            job.setNumReduceTasks(0);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            // A leading "//" would make Path treat "data" as the URI authority.
            // NOTE(review): the output directory lives under the input directory;
            // confirm that the input format tolerates sub-directories here.
            Path input = new Path("/data/");
            Path output = new Path("/data/output3");
            // The output directory must not exist before the job starts.
            if (output.getFileSystem(conf).exists(output)) {
                output.getFileSystem(conf).delete(output, true); // recursive delete
            }
            FileInputFormat.addInputPath(job, input);
            FileOutputFormat.setOutputPath(job, output);

            boolean success = job.waitForCompletion(true);
            if (success) {
                // Totals aggregated over the whole job.
                System.out.println("total lines: "
                        + job.getCounters().findCounter(CouterWordsCounts.COUNT_LINES).getValue());
                System.out.println("total words: "
                        + job.getCounters().findCounter(CouterWordsCounts.COUNT_WORDS).getValue());
            }
            System.exit(success ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
            // Report the failure instead of returning with status 0.
            System.exit(1);
        }
    }

    /** Counter identifiers: total words and total lines. */
    enum CouterWordsCounts { COUNT_WORDS, COUNT_LINES }

    /**
     * Increments the line counter once per input record and the word counter
     * once per non-empty whitespace-separated token.
     */
    private static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        Text mk = new Text();
        LongWritable mv = new LongWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // map() is invoked once per record, so each call is one line.
            context.getCounter(CouterWordsCounts.COUNT_LINES).increment(1);

            // split() yields an empty token for a blank line or leading whitespace;
            // those must not be counted as words.
            long wordsInLine = 0;
            for (String word : value.toString().split("\\s+")) {
                if (!word.isEmpty()) {
                    wordsInLine++;
                }
            }
            if (wordsInLine > 0) {
                context.getCounter(CouterWordsCounts.COUNT_WORDS).increment(wordsInLine);
            }
        }

        /**
         * Writes the counter values visible through this context as two records.
         * NOTE(review): these are presumably per-task values, so with several map
         * tasks each output file holds partial totals - confirm.
         */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mk.set("行數:");
            mv.set(context.getCounter(CouterWordsCounts.COUNT_LINES).getValue());
            context.write(mk, mv);
            mk.set("單詞數:");
            mv.set(context.getCounter(CouterWordsCounts.COUNT_WORDS).getValue());
            context.write(mk, mv);
        }
    }
}
4.MapReduce Join
介紹:在各種實際業務場景中,按照某個關鍵字對兩份資料進行連線是非常常見的。如果兩份資料都比較小,那麼可以直接在記憶體中完成連線。如果是大資料量的呢?顯然,在記憶體中進行連線會發生 OOM。MapReduce 可以用來解決大資料量的連線。在MapReduce join分兩種,map join和reduce join
map join
介紹:MapJoin 適用於有一份資料較小的連線情況。做法是把該小份資料全部載入到記憶體當中,按連結關鍵字建立索引。然後大份資料作為 MapTask 的輸入,map() 方法每處理一條輸入,都去記憶體當中匹配連線,最後把連線結果按 key 輸出。
資料介紹:
movies.dat:1::Toy Story (1995)::Animation|Children's|Comedy
欄位含義:movieid, moviename, movietype
ratings.dat:1::1193::5::978300760
欄位含義:userid, movieid, rate, timestamp
程式碼實現:
/**
 * Map-side join: the small movies file is shipped as a cache file and held in
 * memory, and each rating record is joined against it by movie id.
 */
public class MovieRatingMapJoinMR {

    /**
     * Runs the map-only join job and exits with 0 on success, 1 on failure.
     *
     * @param args {@code args[0]} small (movies) input, {@code args[1]} large
     *             (ratings) input, {@code args[2]} output directory
     */
    public static void main(String[] args) {
        if (args.length < 3) {
            System.err.println("Usage: MovieRatingMapJoinMR <moviesFile> <ratingsInput> <output>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(MovieRatingMapJoinMR.class);
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            // The join completes in the mapper; no reduce phase is needed.
            job.setNumReduceTasks(0);

            String minInput = args[0];
            String maxInput = args[1];
            String output = args[2];
            FileInputFormat.setInputPaths(job, new Path(maxInput));
            Path outputPath = new Path(output);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }
            FileOutputFormat.setOutputPath(job, outputPath);

            // Register the small table as a cache file.
            URI uri = new Path(minInput).toUri();
            job.addCacheFile(uri);

            boolean status = job.waitForCompletion(true);
            System.exit(status ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
            // Report the failure instead of returning with status 0.
            System.exit(1);
        }
    }

    /**
     * Loads the movies file into memory in setup() and joins each rating
     * record against it in map().
     */
    private static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        Text mk = new Text();
        Text mv = new Text();
        // movieid -> "moviename<TAB>movietype", parsed from the small file.
        private final Map<String, String> movieMap = new HashMap<String, String>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            URI[] cacheFiles = context.getCacheFiles();
            if (cacheFiles == null || cacheFiles.length == 0) {
                throw new IOException("No cache file registered for the movies table");
            }
            // NOTE(review): the cache URI is opened as a local file name, as in the
            // original; confirm this resolves on the cluster's task nodes.
            String myfilePath = cacheFiles[0].toString();
            // try-with-resources closes the reader even when a read fails.
            try (BufferedReader br = new BufferedReader(new FileReader(myfilePath))) {
                String line;
                while ((line = br.readLine()) != null) {
                    // movieid::moviename::movietype
                    String[] fields = line.split("::");
                    if (fields.length < 3) {
                        continue; // skip blank or malformed lines
                    }
                    // "\t" is a real tab; the original "\\t" emitted backslash + 't'.
                    movieMap.put(fields[0], fields[1] + "\t" + fields[2]);
                }
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // userid::movieid::rate::timestamp
            String[] fields = value.toString().split("::");
            if (fields.length < 4) {
                context.getCounter("MapJoin", "MALFORMED_RATINGS").increment(1);
                return;
            }
            String userid = fields[0];
            String movieid = fields[1];
            String rate = fields[2];
            String timestamp = fields[3];

            // Join on movieid: movieMap is keyed by movie id, so the original
            // lookup by userid matched unrelated records.
            String movieFileds = movieMap.get(movieid);
            if (movieFileds != null) {
                mk.set(movieid);
                mv.set(movieFileds + "\t" + userid + "\t" + rate + "\t" + timestamp);
                context.write(mk, mv);
            }
        }
    }
}
reduce join
介紹:
- map 階段,兩份資料 data1 和 data2 會被 map 分別讀入,解析成以連結欄位為 key 以查詢欄位為 value 的 key-value 對,並標明資料來源是 data1 還是 data2。
- reduce 階段,reducetask 會接收來自 data1 和 data2 的相同 key 的資料,並在 reduce 端對兩邊的資料做乘積(笛卡兒積)連結。由於需要先把同一 key 的資料快取在記憶體中,這種做法很消耗記憶體,資料量大時可能導致 OOM
資料介紹:
movies.dat:1::Toy Story (1995)::Animation|Children's|Comedy
欄位含義:movieid, moviename, movietype
ratings.dat:1::1193::5::978300760
欄位含義:userid, movieid, rate, timestamp
程式碼實現:
/**
 * Reduce-side join of movies and ratings on movie id: the mapper tags each
 * record with its source file and the reducer pairs the two sides per key.
 */
public class MovieRatingReduceJoinMR {

    /** Separator between a record's payload and its source tag. */
    private static final String TAG_SEPARATOR = "_";
    /** Source tag for records read from the movies file. */
    private static final String MOVIES_TAG = "movies";
    /** Source tag for records read from the ratings file. */
    private static final String RATINGS_TAG = "ratings";

    /**
     * Runs the join job and exits with 0 on success, 1 on failure.
     *
     * @param args {@code args[0]} input path holding both files,
     *             {@code args[1]} output directory
     */
    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("Usage: MovieRatingReduceJoinMR <input> <output>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://zzy:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(MovieRatingReduceJoinMR.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            // Declare the reducer's output types explicitly.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            String input = args[0];
            String output = args[1];
            FileInputFormat.setInputPaths(job, new Path(input));
            Path outputPath = new Path(output);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }
            FileOutputFormat.setOutputPath(job, outputPath);

            boolean status = job.waitForCompletion(true);
            System.exit(status ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
            // Report the failure instead of returning with status 0.
            System.exit(1);
        }
    }

    /** Case-insensitive {@code endsWith}, so "Ratings.dat" and "ratings.dat" both match. */
    private static boolean endsWithIgnoreCase(String text, String suffix) {
        return text.regionMatches(true, text.length() - suffix.length(), suffix, 0, suffix.length());
    }

    /**
     * Emits (movieid, payload + "_" + source tag) for every record, where the
     * source is decided by the name of the file the split belongs to.
     */
    private static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        private String name;
        Text mk = new Text();
        Text mv = new Text();

        /** Remembers the name of the file this task's split comes from. */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // InputSplit is abstract; the concrete FileSplit exposes the path.
            FileSplit split = (FileSplit) context.getInputSplit();
            name = split.getPath().getName();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // movies.dat  movieid::moviename::movietype
            // ratings.dat userid::movieid::rate::timestamp
            String[] fields = value.toString().split("::");
            String outputKey;
            String outputValue;
            if (endsWithIgnoreCase(name, "movies.dat") && fields.length >= 3) {
                outputKey = fields[0];
                outputValue = fields[1] + "\t" + fields[2] + TAG_SEPARATOR + MOVIES_TAG;
            } else if (endsWithIgnoreCase(name, "ratings.dat") && fields.length >= 4) {
                outputKey = fields[1];
                outputValue = fields[0] + "\t" + fields[2] + "\t" + fields[3]
                        + TAG_SEPARATOR + RATINGS_TAG;
            } else {
                // Unknown file or malformed record: the original passed null to
                // Text.set here, which throws. Count and skip instead.
                context.getCounter("ReduceJoin", "SKIPPED_RECORDS").increment(1);
                return;
            }
            mk.set(outputKey);
            mv.set(outputValue);
            context.write(mk, mv);
        }
    }

    /**
     * Splits the values of one movie id by source tag and writes every
     * movie/rating combination.
     */
    private static class MyReducer extends Reducer<Text, Text, Text, Text> {
        Text rv = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> movies = new ArrayList<>();
            List<String> ratings = new ArrayList<>();
            // Sort the values into the two sides of the join.
            for (Text value : values) {
                String tagged = value.toString();
                // The tag follows the LAST separator; titles may contain "_" themselves.
                int tagStart = tagged.lastIndexOf(TAG_SEPARATOR);
                if (tagStart < 0) {
                    continue;
                }
                String payload = tagged.substring(0, tagStart);
                String tag = tagged.substring(tagStart + TAG_SEPARATOR.length());
                if (MOVIES_TAG.equals(tag)) {
                    movies.add(payload);
                } else if (RATINGS_TAG.equals(tag)) {
                    ratings.add(payload);
                }
            }
            // Inner join: when either side is empty the loops write nothing.
            for (String movie : movies) {
                for (String rate : ratings) {
                    rv.set(movie + "\t" + rate);
                    context.write(key, rv);
                }
            }
        }
    }
}