1. 程式人生 > >雲端計算期末報告無圖 kmeans和最短路徑演算法hadoop實現詳解

雲端計算期末報告無圖 kmeans和最短路徑演算法hadoop實現詳解

《雲端計算應用開發實驗》大作業報告

一.實驗環境與實驗工具

ubuntu 16.04真機 + hadoop2.6 + 本地偽分佈
 

二.實驗原理

以下內容為科普性內容,不過裡面還是有一些關鍵的解釋在配環境的時候用得上

Hadoop是一個能夠讓使用者輕鬆架構和使用的分散式計算平臺。

使用者可以輕鬆地在Hadoop上開發和執行處理海量資料的應用程式。它主要有以下幾個優點:

  1. 高可靠性。Hadoop按位儲存和處理資料的能力值得人們信賴。
  2. 高擴充套件性。Hadoop是在可用的計算機集簇間分配資料並完成計算任務的,這些集簇可以方便地擴充套件到數以千計的節點中。
  3. 高效性。Hadoop能夠在節點之間動態地移動資料,並保證各個節點的動態平衡,因此處理速度非常快。
  4. 高容錯性。Hadoop能夠自動儲存資料的多個副本,並且能夠自動將失敗的任務重新分配
  5. 低成本。與一體機、商用資料倉庫以及QlikView、Yonghong Z-Suite等資料集市相比,Hadoop是開源的,專案的軟體成本因此會大大降低

Hadoop的核心就是HDFS和MapReduce

HDFS(Hadoop Distributed File System,Hadoop分散式檔案系統),它是一個高度容錯性的系統,適合部署在廉價的機器上。HDFS能提供高吞吐量的資料訪問,適合那些有著超大資料集(large data set)的應用程式。

HDFS的關鍵元素:

  • Block:將一個檔案進行分塊,通常是64M。
  • NameNode:儲存整個檔案系統的目錄資訊、檔案資訊及分塊資訊,這是由唯一一臺主機專門儲存,當然這臺主機如果出錯,NameNode就失效了。
  • SecondaryNameNode 它不是 namenode 的冗餘守護程序,而是提供週期檢查點和清理任務。
    • 出於對可擴充套件性和容錯性等考慮,我們一般將SecondaryNameNode執行在一臺非NameNode的機器上。
  • DataNode:它負責管理連線到節點的儲存(一個叢集中可以有多個節點)。每個儲存資料的節點執行一個 datanode 守護程序。

上面關於namenode和datanode的定義還是比較重要的

圖片名稱

MapReduce是一種程式設計模型,用於大規模資料集(大於1TB)的並行運算。概念”Map(對映)”和”Reduce(歸約)”,是它的主要思想。它極大地方便了程式設計人員在不會分散式並行程式設計的情況下,將自己的程式執行在分散式系統上。

MapReduce物理實體

  1. 客戶端(client):編寫mapreduce程式,配置作業,提交作業,這就是程式設計師完成的工作;
  2. JobTracker:初始化作業,分配作業,與TaskTracker通訊,協調整個作業的執行;
  3. TaskTracker:保持與JobTracker的通訊,在分配的資料片段上執行Map或Reduce任務,TaskTracker和JobTracker的不同有個很重要的方面,就是在執行任務時候TaskTracker可以有n多個,JobTracker則只會有一個;
  4. Hdfs:儲存作業的資料、配置資訊等等,最後的結果也是儲存在hdfs上面

MapReduce作業執行機制:

圖片名稱

  • ### 這一過程在實驗中體現得比較明顯,之後會有程式碼的詳細分析

1.在客戶端啟動一個作業。

2.向JobTracker請求一個Job ID。

3.將執行作業所需要的資原始檔複製到HDFS上,包括MapReduce程式打包的JAR檔案、配置檔案和客戶端計算所得的輸入劃分資訊。這些檔案都存放在JobTracker專門為該作業建立的資料夾中。資料夾名為該作業的Job ID。JAR檔案預設會有10個副本(mapred.submit.replication屬性控制);輸入劃分資訊告訴了JobTracker應該為這個作業啟動多少個map任務等資訊。

4.JobTracker接收到作業後,將其放在一個作業佇列裡,等待作業排程器對其進行排程,當作業排程器根據自己的排程演算法排程到該作業時,會根據輸入劃分資訊為每個劃分建立一個map任務,並將map任務分配給TaskTracker執行。對於map和reduce任務,TaskTracker根據主機核的數量和記憶體的大小有固定數量的map槽和reduce槽。

5.TaskTracker每隔一段時間會給JobTracker傳送一個心跳,告訴JobTracker它依然在執行,同時心跳中還攜帶著很多的資訊,比如當前map任務完成的進度等資訊。當JobTracker收到作業的最後一個任務完成資訊時,便把該作業設定成“成功”。當JobClient查詢狀態時,它將得知任務已完成,便顯示一條訊息給使用者。

用最簡短的語言解釋MapReduce:
We want to count all the books in the library. You count up shelf #1, I count up shelf #2. That’s map. The more people we get, the faster it goes.
Now we get together and add our individual counts. That’s reduce.

三.實驗過程

以下內容為正題,從環境搭建開始:

重搭真機偽分佈環境

之前的實驗是在虛擬機器上跑的,利用兩臺虛擬機器一臺做主機一臺做從機,虛擬機器效能差,兩臺虛擬機器之前的連線速度極慢,map reduce的過程也非常的慢,親測是沒跑動上面的演算法,所以我重新在ubuntu16.04的真機上搭建了偽分散式的hadoop環境和hdfs分散式檔案系統,當然也順便把單機模式的hadoop也跑了一下wordcount,偽分佈和完全分佈就只是不需要配置slave檔案,即自己就是自己的從機,因為TA給了配置文件,但是是全分佈的,而且有些也不需要,我就把關鍵的地方說一下
1. java安裝環境變數啥的基本常識,注意一下要在~/.bashrc裡面新增的環境變數才會一直生效,配置完後需要source一下才能立即生效
2. 配置ssh,ssh畢竟是ubuntu最常用的功能之一,方便以後ssh自己的筆記本,這裡就不用改名為master了,好難看,還是保持的當前的名字,但是在/etc/hosts那裡我還是改成了自己的pc名,ip就是本地ip:127.0.0.1,不用新增master和slave,然後配置ssh就是讓自己的機器能直接通過賬戶密碼ssh控制,新增密匙什麼的按教程來就沒問題
3. hadoop安裝配置,並且和java一樣設定hadoop的環境變數就是在.bashrc檔案裡新增hadoop的bin路徑,然後關鍵是三個檔案的配置,注意嘗試過單機模式,就不用配置core-site檔案,然後輸入輸出就是本地的檔案,我跑了一下wordcount試了一下成功了就繼續配置偽分佈模式
* 修改core-site.xml

  <property>  
        <name>fs.defaultFS</name>  
        <value>hdfs://cx-ThinkPad-X230:9000</value>  
    </property>  
    <property>  
        <name>hadoop.tmp.dir</name>  
        <value>/usr/local/hadoop/hadoop/tmp</value>  
    </property> 

這裡第一個fs.defaultFS就是配置預設的檔案系統,主要要寫上自己主機名稱,9000為埠號,預設為這個如果不是那麼需要在程式碼中設定,
第二個是hadoop.tmp.dir,下面的路徑名就是你第一次執行hdfs namenode -format這在這個資料夾下生成hdfs系統,這是一個基目錄,所以如果你的hdfs檔案系統出了問題或者想重新配置,將這個資料夾刪掉再重新執行hdfs namenode -format即可(親測經驗之談。。。)

  • 修改hdfs-site.xml
  <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>/usr/local/hadoop/hadoop/tmp/dfs/name</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>/usr/local/hadoop/hadoop/tmp/dfs/data</value>
    </property>

dfs.replication是副本數,即block的備份數
後面兩個就是節點的路徑,需要在第一個配置檔案的子目錄下面,這點十分重要,非常之坑,親測我這裡的路徑和上面的路徑名不一致,導致我後面只要一跑mapreduce導致系統直接登出,迷了我很久,後來從偽分佈又換成單機,又換成偽分佈,配了幾次才發現問題
NameNode 和Datanode在前面已經介紹得很清楚,就不過多介紹

這裡在hdfs檔案系統format之後我嘗試在/usr/local/hadoop/hadoop/tmp即我指定的目錄中檢視我put上去的東西,結果並不能找到檔案,而是一些奇怪的檔案,看來hdfs檔案系統將傳上去的檔案處理過,要訪問只能通過hdfs方式檢視

然後配置檔案還要配置(其實偽分佈模式可以不用搞這個的)
YARN 是從 MapReduce 中分離出來的,負責資源管理與任務排程。YARN 運行於 MapReduce 之上,提供了高可用性、高擴充套件性,如果配了用sbin/start-yarn.sh啟動即可,會看到多的nodemanager和recoursemanager,配置和教程一樣就不多說了

上面的配置過程還是折騰了許久,因為沒有按照TA的教材自己研究,網上教程版本太多,參差不齊,不過有問題才能學到東西,配置過程也加深了對hadoop和hdfs的理解,這個東西還是很強大的,也激起了繼續折騰的興趣

演算法實現

1.Kmeans

這裡先介紹一下Kmeans演算法
K-means演算法是硬聚類演算法,是典型的基於原型的目標函式聚類方法的代表,採用距離作為相似性的評價指標,即認為兩個物件的距離越近,其相似度就越大。該演算法認為簇是由距離靠近的物件組成的,因此把得到緊湊且獨立的簇作為最終目標。
圖片名稱

演算法過程如下:

  • 從N個文件隨機選取K個文件作為質心
  • 對剩餘的每個文件測量其到每個質心的距離,並把它歸到最近的質心的類
  • 重新計算已經得到的各個類的質心
  • 迭代2~3步直至新的質心與原質心相等或小於指定閾值,演算法結束

圖片名稱 圖片名稱 圖片名稱 圖片名稱 圖片名稱

從 K-means 演算法框架可以看出,該演算法需要不斷地進行樣本分類調整,不斷地計算調整後的新的聚類中心,因此當資料量非常大時,演算法的時間開銷是非常大的。而Hadoop的分散式計算平臺恰好可以彌補這一缺點,所以K-means演算法也十分適合hadoop

其實這個演算法在hadoop實現非常簡單粗暴,感覺也是hadoop和機器學習結合最通俗易懂的例子,個人覺得就像機器學習和hadoop結合的”wordcount”,這裡主要是對程式碼的詳細分析,包括了對mapreduce的理解

先介紹map和reduce過程
* map過程:
首先從HDFS上讀取中心點,然後讓當前文件與這k箇中心點進行距離運算,找出距離最近的的一箇中心點,以中心點為key值,將記錄原樣輸出。

  public static class Map extends Mapper<LongWritable, Text, IntWritable, Text>{
        //中心集合
        ArrayList<ArrayList<Double>> centers = null;      
        int k = 0;   
        //讀取中心
        protected void setup(Context context) throws IOException,
                InterruptedException {
            //從HDFS上讀取資料
            centers = Utils.getCentersFromHDFS(context.getConfiguration().get("centersPath"),false);

            k = centers.size();//獲得中心點的數目 
        }
        /**
         * 1.每次讀取一條要分類的條記錄與中心做對比,歸類到對應的中心
         * 2.以中心ID為key,中心包含的記錄為value輸出(例如: 1 0.2 。  1為聚類中心的ID,0.2為靠近聚類中心的某個值)
         */
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            //讀取一行資料
            ArrayList<Double> fileds = Utils.textToArray(value);
            int sizeOfFileds = fileds.size();//維度 

            double mindistance = 1999999999;
            int centerIndex = 0;           
            //依次取出k箇中心點與當前讀取的記錄做計算
            for(int i=0;i<k;i++){
                double currentdistance = 0;
                for(int j=1;j<sizeOfFileds;j++){ 
                    double centerPoint = Math.abs(centers.get(i).get(j));
                    double filed = Math.abs(fileds.get(j));
                    currentdistance += Math.pow((centerPoint - filed), 2);
                }
                //迴圈找出距離該記錄最接近的中心點的ID
                if(currentdistance<mindistance){
                    mindistance = currentdistance;
                    centerIndex = i;//記錄下是第幾個中心點 
                }
            }
            //以中心點為Key 將記錄原樣輸出
            context.write(new IntWritable(centerIndex+1), value);
        }      
    }

從HDFS讀取檔案函式getCentersFromHDFS():根據路徑讀取檔案,轉換為ArrayList

  public static ArrayList<ArrayList<Double>> getCentersFromHDFS(String centersPath,boolean isDirectory) 
    throws IOException{

        ArrayList<ArrayList<Double>> result = new ArrayList<ArrayList<Double>>();
        Path path = new Path(centersPath);
        Configuration conf = new Configuration();

        FileSystem fileSystem = path.getFileSystem(conf);
        if(isDirectory){   //判斷是否是目錄 
            FileStatus[] listFile = fileSystem.listStatus(path);
            for (int i = 0; i < listFile.length; i++) {
                result.addAll(getCentersFromHDFS(listFile[i].getPath().toString(),false));
            }
            return result;
        }

        FSDataInputStream fsis = fileSystem.open(path);
        LineReader lineReader = new LineReader(fsis, conf);
        Text line = new Text();

        while(lineReader.readLine(line) > 0){//讀取中心點檔案,將Text轉換為ArrayList<ArrayList<Double>>返回
            ArrayList<Double> tempList = textToArray(line);
            result.add(tempList);
        }
        lineReader.close();
        return result;
    }

 
* reduce過程
以key值分簇,計算出該簇的下一輪的中心點。之後又回到main函式中,呼叫比較函式compareCenters()判斷是否已經收斂。

    //利用reduce的歸併功能以中心為Key將記錄歸併到一起
  public static class Reduce extends Reducer<IntWritable, Text, Text, Text>{
        /**
         * 1.Key為聚類中心的ID value為以該中心點的邊緣點的集合
         * 2.計數所有記錄元素的平均值,求出新的中心
         */
        protected void reduce(IntWritable key, Iterable<Text> value,Context context)
                throws IOException, InterruptedException {
            ArrayList<ArrayList<Double>> filedsList = new ArrayList<ArrayList<Double>>();            
            //依次讀取記錄集,每行為一個ArrayList<Double>
            for(Iterator<Text> it =value.iterator();it.hasNext();){
                ArrayList<Double> tempList = Utils.textToArray(it.next());
                filedsList.add(tempList);
            }
            //計算新的中心
            //每行的元素個數
            int filedSize = filedsList.get(0).size();
            double[] avg = new double[filedSize];
            for(int i=1;i<filedSize;i++){
                //求每列的平均值
                double sum = 0;
                int size = filedsList.size();
                for(int j=0;j<size;j++){
                    sum += filedsList.get(j).get(i);
                }
                avg[i] = sum / size;
            }
            context.write(new Text("") , new Text(Arrays.toString(avg).replace("[", "").replace("]", "")));
        }     
    }

 
* 主函式
讀入輸入的4個引數,執行選取初始中心點的函式getinitcenter(),初始化完成後進入第一次運算。

    public static void main(String[] args) 
        throws ClassNotFoundException, IOException, InterruptedException {        
        Configuration conf = new Configuration();
        if (args.length != 4) {
            System.err.println("Usage: InvertedIndex <centerPath> <dataPath> <newCenterPath>");
            System.exit(2);
        }

        args = new GenericOptionsParser(conf, args).getRemainingArgs();//獲取執行命令時輸入的引數 
        String centerPath = args[0];
        String dataPath = args[1];
        String newCenterPath = args[2];
        int NumOfCenter=Integer.parseInt(args[3]);

        Utils.getinitcenter(centerPath,dataPath,NumOfCenter);//隨機選取初始點 

        int count = 0;
        while(true){
            run(centerPath,dataPath,newCenterPath,true);
            System.out.println(" 第 " + ++count + " 次計算 ");
            if(Utils.compareCenters(centerPath,newCenterPath )){//判斷是否收斂,如果收斂則不用再進行運算 
                //run(centerPath,dataPath,newCenterPath,false);
                break;
            }

        }
    }

還有其他的一些通用函式這裡就不介紹了,在附件的程式碼裡都有詳細的註釋

Kmeans演算法執行結果
圖片名稱

圖片名稱

上圖是執行兩次之後成功分簇,然後檢視結果和原始的資料點,可以看到二維點集

(0,2),(0,0),(1.5,0),(5,0),(5,2)

已經分成了0,1兩簇,第一簇包括(0,2),(0,0),(1.5,0),第二簇包括(5,0),(5,2)
 
 

2.shortestPath 最短路徑演算法

沒錯,我們雖然小組只有兩個人但是就是折騰了兩個出來,好不容易在真機上搭出來環境,跑得賊快,一個演算法不過癮,
還有原因是因為上面的例子其實基本就是部落格寫的,我們自己就加了個隨機生成簇首點,估計寫的同學都很多(估計TA也發現了大片程式碼雷同),
於是想重新搞一個難一點的,然後自己寫,於是就選了這個演算法

這個演算法的思路也就是map和reduce的流程是參照部落格上面的,但是實現都是自己敲的,算是比較深刻的實踐了mapreduce程式設計,自我感覺要比kmeans難那麼一丟丟,雖然做完了發現也很簡單= =
首先說一下map和reduce的過程

檔案的輸入格式是

A   B,12 E,5 D,1
B   C,2 D,3
C   E,1
D   B,3 C,1 E,5
E   A,1
K   a,n1, b,n2 ... //表示節點到K 到 後面鄰居節點的距離是n

然後在實驗中發現下面的規則
注意節點與鄰居表之間的分隔符是'\t',非常重要,因為在map的過程中會自動把一行文字按y遇到的第一個’\t’分隔成map的key和value,如果沒有’\t’,那麼會全部存為value,而key沒有值(列印沒有值,但是實際上我覺得應該是按行數來作為一個key value的)


  • map過程
第一次將A B,12 E,5 D,1分解成為
Node <當前與源點距離> <鄰接表>
A 0 B,12 E,5 D,1(源節點)
或者
B inf C,2 D,3(中間節點)
壓入的時候就Node作為key,<當前距離> <鄰接表>一同作為value
之後就是
如果當前與源點距離不為inf就把節點的的所有鄰接表距離算出來(包含源點),並按照
Node作為key,當前距離+下一節點距離作為value
那麼在reduce階段,同一個key也就是每個node節點獲得的values資料存的就是
Nodei    <當前與源點距離> <鄰接表>
Nodei    dis1
Nodei    dis2
Nodei    dis3

reduce會自動按key值排序,所以我們獲得到的nodei是按A,B,C..順序得到的,所以也就類似BFS了


  • reduce過程
    key會排序,但是注意values陣列並不會排序,而是按壓入的順序儲存,

reduce過程任務是更新到當前節點的最短路徑
從上面的Nodei dis中選一個最小的值與下面的距離比較,如果更小則更新
Nodei <當前與源點距離> <鄰接表>
然後一直執行這個過程,直到一輪mapreduce之後所有節點都沒有更新,則停止,最後輸出源節點到所有節點的最短路徑

程式碼+註釋:
* map程式碼

    public static class ShortestPathMapper extends Mapper<Text, Text, Text, Text> {
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {      

            String name=key.toString();
            //configuration 裡可以存一下控制變數,可在mapreduce過程中的獲取
            int conuter = context.getConfiguration().getInt("Mapcount", 1);

            String adj = "";
            //第一次就加上去初始的距離,源點為0其他為不可達
            if (conuter == 1) {
                if (name.equals("A")) {
                    adj = "0"+ " " +value.toString();
                } else {
                    adj = "inf"+ " " +value.toString();
                }
            } 
            else {
                adj = value.toString();
            }

            Node node = new Node();
            node.getFormatString(adj);
            if (!node.getDistance().equals("inf")){
                //當前節點可達,就計算其所有鄰接點距離
                for (int i = 0; i < node.getNodeNum(); i++) {
                    String k = node.getName(i);
                    double new_dis=Double.parseDouble(node.getValue(i)) + Double.parseDouble(node.getDistance());
                    //System.out.println(k+" "+new_dis+"\n");
                    context.write(new Text(k), new Text(new_dis+""));
                }
            }
            context.write(key, new Text(adj));
        }
    }

 
* reduce程式碼

    public static class ShortestPathReducer extends Reducer<Text, Text, Text, Text> {
        protected void reduce(Text key, Iterable<Text> values, Context context) 
            throws IOException, InterruptedException {

            Node node = new Node();
            boolean f=false; //標記該節點是否有更新最短距離
            double min=9999999;//存到這個節點最短的距離
            int i = 0;
            //遍歷每個節點的values,其中一個包含其鄰接點資訊,要單獨取出來
            for (Text t : values) {
                String[] strs = StringUtils.split(t.toString(), ' ');
                System.out.println(t.toString());

                if (strs.length>1 ) {
                    //包含鄰接點的資訊的那條資訊單獨處理
                    node.getFormatString(t.toString());
                }
                else {
                    //否則取出距離
                    double dis_new=Double.parseDouble(strs[0]);
                    //和最短的比較,
                    if (dis_new<min){
                        min=dis_new;
                    }
                }
                i++;
            }
            //獲取這一次reduce的最短距離和儲存的最短距離
            String dis_new=min+"";
            String dis_old=node.getDistance();
            //如果沒有訪問過又有新的距離,那麼直接更新
            if(dis_old.equals("inf")&&min!=9999999){
                node.setDistance(dis_new);
                f=true;
            } 
            //否則需要判斷距離是不是比當前的小,距離更近才更新
            else if (min < Double.parseDouble(dis_old)){
                node.setDistance(dis_new);
                f=true;
            }

            ////如果有更新最短距離,那麼更改停止標記
            if (f==true){
                context.getCounter(Stop_flag.Flag).increment(1L);
                //context.getConfiguration().setBoolean("stop", true);
            }
            //獲得stop標誌,用於判斷是否是最後一次輸出
            boolean stop=context.getConfiguration().getBoolean("stop", false);
            if (stop==true)
                context.write(key, new Text(node.getDistance()));
            else
                context.write(key, new Text(node.toString()));
        }
    }

 
* 主函式

    static enum Stop_flag {Flag}
    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException{
        Configuration conf = new Configuration();

        if (args.length != 2) {
            System.err.println("Usage: ShortestPath <dataPath> <outPath>");
            System.exit(2);
        }
        args = new GenericOptionsParser(conf, args).getRemainingArgs();
        String dataPath = args[0];
        String outPath = args[1];

        FileSystem fs = FileSystem.get(conf);
        int count = 1;
        while (true) {
            //System.out.println(conf.getInt("Mapcount", 0)+"\n");

            conf.setInt("Mapcount", count);
            conf.setBoolean("stop",false);
            Job job = Job.getInstance(conf);
            //jar包名
            job.setJarByClass(ShortestPath.class);
            //設定對應的class,在打包的jar裡面
            job.setMapperClass(ShortestPathMapper.class);
            job.setReducerClass(ShortestPathReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setInputFormatClass(KeyValueTextInputFormat.class);

            //每次的輸出檔案要為新的,這樣可以看到更新的過程
            Path outPath1 = new Path(outPath+"tmp" + count);
            if (fs.exists(outPath1)) {
                fs.delete(outPath1, true);
            }
            FileOutputFormat.setOutputPath(job, outPath1);

            //輸入檔案第一次為輸入的路徑,後面就是上一次reduce生成的檔案
            if (count == 1)
                FileInputFormat.addInputPath(job, new Path(dataPath));
            else
                FileInputFormat.addInputPath(job, new Path(outPath+"tmp" + (count - 1)));

            if (job.waitForCompletion(true)) {
                System.out.println("run time: "+count);                
                //if (conf.getBoolean("stop", false)) break;
                long num = job.getCounters().findCounter(Stop_flag.Flag).getValue();

                //如果所有節點都沒有更新距離,那麼演算法結束
                if (num==0) {
                    //結束的mapreduce輸出只輸出最短路徑
                    conf.setInt("Mapcount", count);
                    //設定stop標記為true
                    conf.setBoolean("stop",true);
                    Job job1 = Job.getInstance(conf);
                    job1.setJarByClass(ShortestPath.class);
                    job1.setMapperClass(ShortestPathMapper.class);
                    job1.setReducerClass(ShortestPathReducer.class);
                    job1.setMapOutputKeyClass(Text.class);
                    job1.setMapOutputValueClass(Text.class);
                    job1.setInputFormatClass(KeyValueTextInputFormat.class);
                    //輸入檔案為最後的節點,輸出檔案為final檔案
                    FileInputFormat.addInputPath(job1, new Path(outPath+"tmp" + count));
                    FileOutputFormat.setOutputPath(job1,new Path(outPath+"final"));
                    job1.waitForCompletion(true);
                    System.out.println("end ");
                    break;
                }
            }
            count++;
        }


    }

ShortestPath演算法執行結果
測試資料集
圖片名稱

A   B,3 D,1
B   A,3 E,7 F,2
C   D,2 E,5 F,3
D   A,1 C,2 E,5
E   B,7 C,5 D,5 
F   B,2 C,3

執行結果
圖片名稱

上圖紅框為執行完成後檢視生產的所有中間檔案tmp1-3和最終的執行結果final

A   0
B   3.0
C   3.0
D   1.0
E   6.0
F   5.0

表示從A點到各個節點的最短距離,結合上面的給出的資料圖很容易驗證是正確的,
然後黃色框的是把中間執行結果打印出來了,可以看到運行了三次,然後各點到A點的距離也在不斷的減少,下面再貼一些mapreduce執行的中間過程截圖

圖片名稱 圖片名稱
這列印的是reduce階段接收到的每個節點的現在計算出的距離和鄰接表,從上往下依次是A,B,C…原因就是因為reduce會按照key值排序,可以看出values列表是無序的,這就是之前也說過的問題。
左圖為執行之後可以看到有些節點還是不可達的,距離為10^7或者9999999,(因為java的double精度丟失,很奇怪,竟然10^7就丟失精度了。。),右圖為已經完成最短路徑計算的。
實驗部分至此結束

四.實驗總結

這次大作業肯定是收穫很大的,以後還是可以吹噓自己是學過hadoop並且親自寫過mapreduce的,並且通過自己的實驗從搭環境到wordcount程式跑起來,然後是學習別人的程式碼,從簡單的例子,比如統計,排序,學習了mapreduce的框架和一些特性,這裡總結一下框架,也就是每個hadoop程式都要走的

//配置檔案,可以儲存一些配置
Configuration conf = new Configuration();
//獲取一個job
Job job = Job.getInstance(conf);

job.setJarByClass(ShortestPath.class);

//設定map和reduce的類,就自己寫的map和reduce過程
job.setMapperClass(ShortestPathMapper.class);
job.setReducerClass(ShortestPathReducer.class);

//設定輸入輸出個格式,這裡用到text
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);

//輸入輸出檔案,在hdfs檔案系統上
FileInputFormat.addInputPath(job, new Path());
FileOutputFormat.setOutputPath(job, new Path());

然後還可以提前設定一些在mapreduce過程中用到的變數

//設定
conf.setInt("Mapcount", count);
conf.setBoolean("stop",true);

//在mapreduce過程中獲取
boolean stop=context.getConfiguration().getBoolean("stop", false);

注意這些配置變數是不能在map和reduce過程中修改的,我在實驗中嘗試修改結果失敗了
總之,實驗過程雖然艱辛但是還是比較好玩的,hadoop這個平臺以後有需要還會來繼續研究研究,爭取弄點更有意思的東西。