1. 程式人生 > >並行作業3:在eclipse中開發MapReduce程式

並行作業3:在eclipse中開發MapReduce程式

在eclipse中開發MapReduce程式

系統採用vm下ubuntu16.04

一、eclipse安裝(參考我的其它部落格)

二、eclipse配置

1、下載hadoop-eclipse-plugin-2.7.3.jar外掛,並將其拖到虛擬機器桌面

2、將其移動到/usr/local/java/ide/eclipse/plugins目錄下

cd ~/桌面
sudo mv hadoop-eclipse-plugin-2.7.3.jar /usr/local/java/ide/eclipse/plugins

3、重啟eclipse

4、切換檢視,點選右上角小象

5、點選右下帶加號小象

6、點選配置檔案進行配置

Localtion name:hadoopTest
Map/Reduce Master:
    host:localhost
    Port:9001
    
//這裡選中Use M/R Master host按鈕
DFS Master:
    host:localhost
    Port:9000

username:hk

7、左側專案欄

DFS Locations
    |--hadoopTest
        |--資料夾(0)

8、命令列輸入,離開安全模式,以便在eclipse內可以直接懟hdfs目錄下檔案進行操作。

hadoop dfsadmin  -safemode leave

9、新建上傳程式碼相關檔案

DFS Locations
    |--hadoopTest
        |--(1)
            |--user(1)
                |--hk(1)
                    |--sort_in(3)
                        |--file1.txt
                        |--file2.txt
                        |--file3.txt

三、MapReduce程式

1、例項描述

對輸入檔案中資料進行排序。輸入檔案中的每行內容均為一個數字,即一個數據。要求在輸出中每行有兩個間隔的數字,其中,第一個代表原始資料在原始資料集中的位次,第二個代表原始資料。

(1)file1.txt:
2
32
654
32
15
756
65223
(2)file2.txt:
5956
22
650
92
(3)file3.txt:
26
54
6
期望輸出
1    2
2    6
3    15
4    22
5    26
6    32
7    32
8    54
9    92
10    650
11    654
12    756
13    5956
14    65223

2、設計思路

這個例項僅僅要求對輸入資料進行排序,熟悉MapReduce過程的讀者會很快想到在MapReduce過程中就有排序,是否可以利用這個預設的排序,而不需要自己再實現具體的排序呢?答案是肯定的。
但是在使用之前首先需要了解它的預設排序規則。它是按照key值進行排序的,如果key為封裝int的IntWritable型別,那麼MapReduce按照數字大小對key排序,如果key為封裝為String的Text型別,那麼MapReduce按照字典順序對字串排序。
瞭解了這個細節,我們就知道應該使用封裝int的IntWritable型資料結構了。也就是在map中將讀入的資料轉化成 IntWritable型,然後作為key值輸出(value任意)。reduce拿到<key,value-list>之後,將輸入的 key作為value輸出,並根據value-list中元素的個數決定輸出的次數。輸出的key(即程式碼中的linenum)是一個全域性變數,它統計當前key的位次。需要注意的是這個程式中沒有配置Combiner,也就是在MapReduce過程中不使用Combiner。這主要是因為使用map和reduce就已經能夠完成任務了。

3、程式程式碼

package edu.hk.sort;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
 
public class Sort {
    //map將輸入中的value化成IntWritable型別,作為輸出的key
    public static class Map extends
    	Mapper<Object,Text,IntWritable,IntWritable>{
        private static IntWritable data=new IntWritable();
        //實現map函式
        public void map(Object key,Text value,Context context)
                throws IOException,InterruptedException{
            String line=value.toString();
            data.set(Integer.parseInt(line));
            context.write(data, new IntWritable(1));
        }
    }
    //reduce將輸入中的key複製到輸出資料的key上,
    //然後根據輸入的value-list中元素的個數決定key的輸出次數
    //用全域性linenum來代表key的位次
    public static class Reduce extends
            Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{
        private static IntWritable linenum = new IntWritable(1);
        //實現reduce函式
        public void reduce(IntWritable key,Iterable<IntWritable> values,Context context)
                throws IOException,InterruptedException{
            for(IntWritable val:values){
                context.write(linenum, key);
                linenum = new IntWritable(linenum.get()+1);
            }
        }
    }
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        //這句話很關鍵
        conf.set("fs.default.name", "hdfs://localhost:9000");
        String[] ioArgs=new String[]{"sort_in","sort_out"};
        String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
     if (otherArgs.length != 2) {
     System.err.println("Usage: Data Sort <in> <out>");
         System.exit(2);
     }
     Job job = new Job(conf, "Data Sort");
     job.setJarByClass(Sort.class);
     //設定Map和Reduce處理類
     job.setMapperClass(Map.class);
     job.setReducerClass(Reduce.class);
     //設定輸出型別
     job.setOutputKeyClass(IntWritable.class);
     job.setOutputValueClass(IntWritable.class);
     //設定輸入和輸出目錄
     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
     System.exit(job.waitForCompletion(true) ? 0 : 1);
     }
}

4、結果

1    2
2    6
3    15
4    22
5    26
6    32
7    32
8    54
9    92
10    650
11    654
12    756
13    5956
14    65223