1. 程式人生 > >MapReduce資料處理兩表join連線

MapReduce資料處理兩表join連線

現在這裡有兩個text文件,需要把它合併成一個文件,並且裡面的資料不能有冗餘..

  1. <strong><span style="color:#FF0000;">info.txt檔案:</span></strong>  
  2. 1003 kaka  
  3. 1004 da  
  4. 1005 jue  
  5. 1006 zhao  
  1. <span style="color:#FF0000;"><strong>cpdata.txt檔案:</strong></span>  
  2. 201001 1003 abc  
  3. 201002 1005 def  
  4. 201003 1006 ghi  
  5. 201004 1003 jkl  
  6. 201005 1004 mno  
  7. 201006 1005 pqr  
  8. 201001 1003 abc  
  9. 201004 1003 jkl  
  10. 201006 1005 mno  
  11. 200113 1007 zkl  

生成檔案:

  1. 1003    201001 abc kaka  
  2. 1003    201004 jkl kaka  
  3. 1004    201005 mno da  
  4. 1005    201002 def jue  
  5. 1005    201006 pqr jue  
  6. 1005    201006 mno jue  
  7. 1006    201003 ghi zhao  

這裡先申明下,這個純屬個人想法,如果有跟好的方法可以告訴我

   因為info.txt文件的第一個欄位與cpdata.txt的第二個欄位是相同的,所以我把他們做為key值,這樣通過Map他們就會組合了.去冗餘,

              主要是用了個List記錄已經讀取過的變數,如果有一樣的就不讀取了.

程式碼如下:

  1. publicclass Advanced extends Configured implements Tool {  
  2.     publicstaticclass AdMap extends Mapper<LongWritable, Text, Text, TextPair>{  
  3.         @Override
  4.         protectedvoid map(LongWritable key, Text value, Context context)  
  5.                 throws IOException, InterruptedException {  
  6.             // TODO Auto-generated method stub
  7. //          String filePath = ((FileSplit)context.getInputSplit()).getPath().toString();
  8.             Text word = new Text();  
  9.             String line = value.toString();  
  10.             String[] childline = line.split(" ");    //以空格擷取
  11.                    //判斷是哪一張表,其實個人覺得這樣判斷還不合理,可以使用上面注視掉的獲取路徑值來判斷
  12.                                if(childline.length == 3){  
  13.                      TextPair pair = new TextPair();  
  14.                      pair.setFlag("0");         //這是個標識   0.表示 cpdata.txt     1表示info.txt
  15.                      pair.setKey(childline[1]);  
  16.                      pair.setValue(childline[0]+" "+childline[2]);  
  17.                      pair.setContent(pair.toString());  
  18.                      word.clear();  
  19.                      word.set(pair.getKey());  
  20.                                          context.write(word, pair);        //傳遞一個物件要實現WritableComparable介面
  21.                 }else{  
  22.                     TextPair pair = new  TextPair();  
  23.                     pair.setFlag("1");  
  24.                     pair.setKey(childline[0]);  
  25.                     pair.setValue(childline[1]);  
  26.                     pair.setContent(pair.toString());  
  27.                     word.clear();  
  28.                     word.set(pair.getKey());  
  29.                     context.write(word, pair);  
  30.                 }  
  31.         }  
  32.     }  
  33.     publicstaticclass AdReduce extends Reducer<Text, TextPair, Text, Text>{  
  34.         @Override
  35.         publicvoid reduce(Text key, Iterable<TextPair> values,  
  36.                 Context context)  
  37.                 throws IOException, InterruptedException {  
  38.             // TODO Auto-generated method stub
  39.                         //list0裝載的都是cpdata的資料,list1裝載的是info的資料
  40.                         List<Text> list0 = new ArrayList<Text>();       
  41.             List<Text> list1 = new ArrayList<Text>();  
  42.             Iterator<TextPair> it = values.iterator();  
  43.             TextPair pair = new TextPair();  
  44.             while(it.hasNext()){  
  45.                     pair = it.next();  
  46.                 if("1".equals(pair.getFlag()))  
  47.                     list1.add(new Text(pair.getValue()));  
  48.                 else
  49.                     list0.add(new Text(pair.getValue()));  
  50.             }  
  51.             List<Text> sublist = new ArrayList<Text>();       //sublist用來新增已經寫過的資料,然後再判斷,如果存在就不用操作
  52.             for(int i = 0 ; i<list1.size(); i++){  
  53.                 for(int j = 0 ;j<list0.size();j++){  
  54.                     if(!sublist.contains(list0.get(j))){  
  55.                         sublist.add(list0.get(j));  
  56.                         context.write(key, new Text(list0.get(j)+" " +list1.get(i)));  
  57.                     }  
  58.                 }  
  59.             }  
  60.         }  
  61.     }  
  62.     /** 
  63.      * @param args 
  64.      */
  65.     publicstaticvoid main(String[] args) {  
  66.         try {  
  67.                        int res = ToolRunner.run(new Configuration(), new Advanced(), args);  
  68.             System.exit(res);  
  69.         } catch (Exception e) {  
  70.             // TODO Auto-generated catch block
  71.             e.printStackTrace();  
  72.         }  
  73.     }  
  74.     @Override
  75.     publicint run(String[] args) throws Exception {  
  76.         Configuration conf = new Configuration();  
  77.         FileSystem fs = FileSystem.get(conf);  
  78.         if(fs.exists(new Path(args[2]))){  
  79.                         //如果檔案已近存在就刪除檔案
  80. //          System.out.println("error : file is exists");
  81. //          System.exit(-1);
  82.             fs.delete(new Path(args[2]), true);  
  83.         }  
  84.         Job job = new Job(conf , "Advanced");  
  85.         job.setJarByClass(Advanced.class