1. 程式人生 > >MapReduce 順序組合, 迭代式,組合式,鏈式

MapReduce 順序組合, 迭代式,組合式,鏈式

1、順序組合式

順序組合式就是按照指定順序執行任務如:mapreduce1 --> mapreduce2 --> mapreduce3

即:mapreduce1的輸出是mapreduce2的輸入,mapreduce2的輸出式mapreduce3的輸入

程式碼片段如下:

Java程式碼  收藏程式碼
  1. String inPath1 = "hdfs://hadoop0:9000/user/root/3D/";  
  2.         String outPath1 = "hdfs://hadoop0:9000/user/root/3DZout/";  
  3.         String outPath2 = "hdfs://hadoop0:9000/user/root/3DZout2/"
    ;  
  4.         String outPath3 = "hdfs://hadoop0:9000/user/root/3DZout3/";   
  5.         // job1配置  
  6.         Job job1 = Job.getInstance(conf);  
  7.         job1.setJarByClass(Mode.class);  
  8.         job1.setMapperClass(Map1.class);  
  9.         job1.setReducerClass(Reduce1.class);  
  10.         job1.setMapOutputKeyClass(Text.class
    );  
  11.         job1.setMapOutputValueClass(IntWritable.class);  
  12.         job1.setOutputKeyClass(Text.class);  
  13.         job1.setOutputValueClass(IntWritable.class);  
  14.         FileInputFormat.addInputPath(job1, new Path(inPath1));  
  15.         FileOutputFormat.setOutputPath(job1, new Path(outPath1));  
  16.         job1.waitForCompletion(true);  
  17.         // job2配置  
  18.         Job job2 = Job.getInstance(conf);  
  19.         job2.setJarByClass(Mode.class);  
  20.         job2.setMapperClass(Map2.class);  
  21.         job2.setReducerClass(Reduce2.class);  
  22.         job2.setMapOutputKeyClass(Text.class);  
  23.         job2.setMapOutputValueClass(IntWritable.class);  
  24.         job2.setOutputKeyClass(Text.class);  
  25.         job2.setOutputValueClass(IntWritable.class);  
  26.         FileInputFormat.addInputPath(job2, new Path(inPath1));  
  27.         FileOutputFormat.setOutputPath(job2, new Path(outPath2));  
  28.         job2.waitForCompletion(true);  
  29.         // job3配置  
  30.         Job job3 = Job.getInstance(conf);  
  31.         job3.setJarByClass(Mode.class);  
  32.         job3.setMapperClass(Map3.class);  
  33.         job3.setReducerClass(Reduce3.class);  
  34.         job3.setMapOutputKeyClass(Text.class);  
  35.         job3.setMapOutputValueClass(IntWritable.class);  
  36.         job3.setOutputKeyClass(Text.class);  
  37.         job3.setOutputValueClass(IntWritable.class);  
  38.         FileInputFormat.addInputPath(job3, new Path(outPath2));  
  39.         FileOutputFormat.setOutputPath(job3, new Path(outPath3));  
  40.         job3.waitForCompletion(true);  

子任務作業配置程式碼執行後,將按照順序逐個執行每個子任務作業。由於後一個子任務需要使用前一個子任務的輸出資料,因此,每一個子任務

都需要等前一個子任務執行執行完畢後才允許執行,這是通過job.waitForCompletion(true)方法加以保證的。

2、迭代組合式

迭代也可以理解為for迴圈或while迴圈,當滿足某些條件時,迴圈結束

mapreduce的迭代演算法正在研究中,後續提供完整原始碼....

程式碼如下:

3、複雜的依賴組合式

處理複雜的要求的時候,有時候一個mapreduce程式完成不了,往往需要多個mapreduce程式 這個時候就牽扯到各個任務之間的依賴關係,

所謂依賴就是一個M/R job的處理結果是另外一個M/R的輸入,以此類推,

這裡的順序是 job1 和 job2 單獨執行, job3依賴job1和job2執行後的結果

程式碼如下:

Java程式碼  收藏程式碼
  1. package com.hadoop.mapreduce;  
  2. import java.io.IOException;  
  3. import java.util.StringTokenizer;  
  4. import org.apache.hadoop.conf.Configuration;  
  5. import org.apache.hadoop.fs.FileSystem;  
  6. import org.apache.hadoop.fs.Path;  
  7. import org.apache.hadoop.io.IntWritable;  
  8. import org.apache.hadoop.io.Text;  
  9. import org.apache.hadoop.mapreduce.Job;  
  10. import org.apache.hadoop.mapreduce.Mapper;  
  11. import org.apache.hadoop.mapreduce.Reducer;  
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  13. import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;  
  14. import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;  
  15. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  16. import org.apache.hadoop.util.GenericOptionsParser;  
  17. public class Mode {  
  18.     // 第一個Job  
  19.     public static class Map1 extends Mapper<Object, Text, Text, IntWritable>{  
  20.         Text word = new Text();  
  21.         @Override  
  22.         protected void map(Object key, Text value,Context context)  
  23.                 throws IOException, InterruptedException {  
  24.             StringTokenizer st = new StringTokenizer(value.toString());  
  25.             while(st.hasMoreTokens()){  
  26.                 word.set(st.nextToken());  
  27.                 context.write(word, new IntWritable(1));  
  28.             }  
  29.         }  
  30.     }  
  31.     public static class Reduce1 extends Reducer<Text, IntWritable, Text, IntWritable>{  
  32.         IntWritable result = new IntWritable();  
  33.         @Override  
  34.         protected void reduce(Text key, Iterable<IntWritable> values,Context context)  
  35.                 throws IOException, InterruptedException {  
  36.             int sum = 0;  
  37.             for(IntWritable val : values){  
  38.                 sum += val.get();  
  39.             }  
  40.             result.set(sum);  
  41.             context.write(key, result);  
  42.         }  
  43.     }  
  44.     // 第二個Job  
  45.     public static class Map2 extends Mapper<Object, Text, Text, IntWritable>{  
  46.         Text word = new Text();  
  47.         @Override  
  48.         protected void map(Object key, Text value,Context context)  
  49.                 throws IOException, InterruptedException {  
  50.             StringTokenizer st = new StringTokenizer(value.toString());  
  51.             while(st.hasMoreTokens()){  
  52.                 word.set(st.nextToken());  
  53.                 context.write(word, new IntWritable(1));  
  54.             }  
  55.         }  
  56.     }  
  57.     public static class Reduce2 extends Reducer<Text, IntWritable, Text, IntWritable>{  
  58.         IntWritable result = new IntWritable();  
  59.         @Override  
  60.         protected void reduce(Text key, Iterable<IntWritable> values,Context context)  
  61.                 throws IOException, InterruptedException {  
  62.             int sum = 0;  
  63.             for(IntWritable val : values){  
  64.                 sum += val.get();  
  65.             }  
  66.             result.set(sum);  
  67.             context.write(key, result);  
  68.         }  
  69.     }  
  70.     // 第三個Job  
  71.     public static class Map3 extends Mapper<Object, Text, Text, IntWritable>{  
  72.         Text word = new Text();  
  73.         @Override  
  74.         protected void map(Object key, Text value,Context context)  
  75.                 throws IOException, InterruptedException {  
  76.             StringTokenizer st = new StringTokenizer(value.toString());  
  77.             while(st.hasMoreTokens()){  
  78.                 word.set(st.nextToken());  
  79.                 context.write(word, new IntWritable(1));  
  80.             }  
  81.         }  
  82.     }  
  83.     public static class Reduce3 extends Reducer<Text, IntWritable, Text, IntWritable>{  
  84.         IntWritable result = new IntWritable();  
  85.         @Override  
  86.         protected void reduce(Text key, Iterable<IntWritable> values,Context context)  
  87.                 throws IOException, InterruptedException {  
  88.             int sum = 0;  
  89.             for(IntWritable val : values){  
  90.                 sum += val.get();  
  91.             }  
  92.             result.set(sum);  
  93.             context.write(key, result);  
  94.         }  
  95.     }  
  96.     public static void main(String[] args) throws IOException{  
  97.         String inPath1 = "hdfs://hadoop0:9000/user/root/3D/";  
  98.         String outPath1 = "hdfs://hadoop0:9000/user/root/3DZout/";  
  99.         String outPath2 = "hdfs://hadoop0:9000/user/root/3DZout2/";  
  100.         String outPath3 = "hdfs://hadoop0:9000/user/root/3DZout3/";  
  101.         String[] inOut = {inPath1, outPath1};  
  102.         Configuration conf = new Configuration();  
  103.         String[] otherArgs = new GenericOptionsParser(conf, inOut).getRemainingArgs();  
  104.         if (otherArgs.length < 2) {  
  105.             System.err.println("Usage: wordcount <in> [<in>...] <out>");  
  106.             System.exit(2);  
  107.         }  
  108.         // 判斷輸出路徑是否存在,如存在先刪除  
  109.         FileSystem hdfs = FileSystem.get(conf);  
  110.         Path findFile = new Path(outPath1);  
  111.         boolean isExists = hdfs.exists(findFile);  
  112.         if(isExists){  
  113.             hdfs.delete(findFile, true);  
  114.         }  
  115.         if(hdfs.exists(new Path(outPath2))){  
  116.             hdfs.delete(new Path(outPath2), true);  
  117. 相關推薦

    斐波那契數列的遞迴(迴圈)通項公式三種實現

    謂Fibonacci數列是指這樣一種數列,它的前兩項均為1,從第三項開始各項均為前兩項之和。用數學公式表示出來就是:           1                            (n=1,2)fib(n)=           fib(n-1)+fib(n-

    MapReduce 順序組合 組合式

    1、順序組合式 順序組合式就是按照指定順序執行任務如:mapreduce1 --> mapreduce2 --> mapreduce3 即:mapreduce1的輸出是mapreduce2的輸入,mapreduce2的輸出式mapreduce3

    Python基礎(8):python中的特性進階篇(列表生成生成器器)

    python中還包括一些高階特性,以下簡單介紹。 迭代 定義:用for迴圈來遍歷物件的過程,叫做迭代。 作用物件:可迭代物件 如何判斷是否為可迭代物件:isinstance(xxx,Iterable),Iterable型別來源於collections模組。 應用場景: 1

    python 可序列(列表元組字串)實現鄰近去重順序不變

    碼字不易,轉載請標明出處… 鄰近去重程式碼實現如下: def special_func_order(seq): list_ = [] # 定義一個空列表,用來儲存判斷後的資料 for i in range(len(seq) - 1): # 假如 le

    模版方法模式器模式組合模式狀態模式代理模式

    1.模版方法模式:在一個方法中定義一個演算法的骨架,而將一些步驟延遲到子類中,模版方法使得子類可以在不改變演算法結構的情況下,重新定義演算法中的某些步驟,還可以提供hook()讓子類決定是否執行某些步驟。比如sort中的Comparable介面。 2.迭代器模式就是集合的迭代器 3.組合模式

    推導模組

    推導表示式 推導表示式相對於for迴圈來處理資料,要更加的方便 列表推導表示式使用更加的廣泛 列表推導 迴圈新增: li = [ ] for i in range(1,11): #左閉右開 li.append(i) #[1,2,3,4,5,6,7,8,

    python的切片列表生成

    python的程式碼需要有優雅,明確,簡單的特性。代表著需要程式碼越少越好,越簡單越好。為此,python提供了許多高階特性,如切片,迭代,列表生成式等等,可以有效的用簡單程式碼實現複雜功能。 1,切片 適用型別:list, tuple, str 切片功能簡單說就是實現擷取,不管是列表list,不可變列

    Python五種方式 for迴圈列表推導內建函式map()生成器推導生成器函式 速度對比

    對比了Python3的五種迭代方式進行函式簡單計算的花費時間 五種迭代分別是,for迴圈,列表推導式,內建函式map(),生成器推導式,生成器函式 簡單計算以add()加10操作和abs()絕對值舉例

    python列表生成生成器

    列表生成式:也叫推導,使程式碼更簡潔1.列表推導式:      a= [x for x in range(100) if x % 3 == 0]2.字典推導式     #快速更換key和value  字典推導和列表推導的使用方法是類似的,中括號該改成大括號。      y =

    微信約戰炸金花棋牌平臺出租Java普通碼塊構造碼塊靜態碼塊區別執行順序碼實例

    屬性 java 對象 ... 沒有 每次 class string eat 除了說微信約戰炸金花棋牌平臺出租( h5.super-mans.com Q:2012035031)普通代碼塊,靜態代碼塊,構造代碼塊的執行順序外,還有靜態方法,靜態變量等,都放在一起的話,這個

    用程式碼來解釋可生成器的區別

    一. 創造器(creator) 這是我自己造的一個名詞,因為在python術語中,對只實現了__next__()方法的物件,好像沒有任何名分,為了說明,我將只實現了__next__()方法的物件稱為創造器(creator)。 class O_Next: def __init__(se

    day011 函式名的運用閉包

    主要內容: 1.函式名的使用以及第一類物件 2.閉包 3.迭代器一、函式名的運用 函式名就是變數名,命名規則與變數名一樣。 函式名儲存的是函式的記憶體地址。 1、檢視函式名的記憶體地址 """python def inf(): print("疏影"

    C++ 使用Vector容器查詢插入去重 用法總結

    返回最後一個元素: return v.back(); 迭代器: for (std::vector<int>::iterator it = v.begin(); it != v.end(); it++) {

    遞迴和遍歷

      遞迴   如果一個函式在內部呼叫自身本身,這個函式就是遞迴函式。   條件:必須要有收斂條件和遞迴公式。   特性:1.必須有一個明確的結束條件。      2.每次進入更深一層遞迴時,問題規模相比賞析遞迴都應有所減少。      3.遞迴效率不高,遞迴層次過多會導致棧溢位(遞迴最大999層)。

    Python生成器器,可物件

                  在瞭解Python的資料結構時,容器(container)、可迭代物件(iterable)、迭代器(iterator)、生成器(generator)、列表/集合/字典推導式(list,set,dic

    python3.5進階(三)-------------實現多工之協程(生成器器)

    1.迭代器:迭代是訪問集合元素的一種方式,迭代器是可以記住遍歷的位置的物件,迭代器物件從集合的第一個元素開始訪問,直到所有訪問結束,迭代器只能前進不能後退。判斷一個數據型別是否可以迭代,看是否能for迴圈。如(字串,列表,元祖...)序列可以迭代,數字不能迭代,或通過isintance([11,12

    vector容器空間介面卡三個類方法的實現

    C++的STL庫有一個容器叫vector,這個容器底層的資料結構是一個記憶體可以自動增長的陣列,每次當陣列儲存滿了以後,記憶體可以自動增加兩倍,請完成vector容器、迭代器和空間配置器三個類方法的實現。 #include<iostream> using namespace

    空間配置器容器介面卡

    一、空間配置器 下面先總體介紹一下空間配置器。空間配置器的作用是在底層為上層的各種容器提供儲存空間,需要多少分配多少,一般分配的比你需要的更多。打個不恰當的比喻,空間配置器和容器之間的關係,相當於將軍和糧草的關係。當然了,容器相當於將軍,它在陣前殺敵,衝鋒陷陣,處理各種事情;而空間配置器

    生成器器和裝飾器

    1.生成器 解析器在實時生成資料,資料不會駐留在記憶體中。因此,其執行效率很高! yield 是一個類似 return 的關鍵字,只是這個函式返回的是個生成器 當你呼叫這個函式的時候,函式內部的程式碼並不立即執行 ,這個函式只是返回一個生成器物件 當你使用for進行迭代的時候,函式中的程