雲端計算期末報告無圖 kmeans和最短路徑演算法hadoop實現詳解
《雲端計算應用開發實驗》大作業報告
一.實驗環境與實驗工具
ubuntu 16.04真機 + hadoop2.6 + 本地偽分佈
二.實驗原理
以下內容為科普性內容,不過裡面還是有一些關鍵的解釋在配環境的時候用得上
Hadoop是一個能夠讓使用者輕鬆架構和使用的分散式計算平臺。
使用者可以輕鬆地在Hadoop上開發和執行處理海量資料的應用程式。它主要有以下幾個優點:
- 高可靠性。Hadoop按位儲存和處理資料的能力值得人們信賴。
- 高擴充套件性。Hadoop是在可用的計算機集簇間分配資料並完成計算任務的,這些集簇可以方便地擴充套件到數以千計的節點中。
- 高效性。Hadoop能夠在節點之間動態地移動資料,並保證各個節點的動態平衡,因此處理速度非常快。
- 高容錯性。Hadoop能夠自動儲存資料的多個副本,並且能夠自動將失敗的任務重新分配
- 低成本。與一體機、商用資料倉庫以及QlikView、Yonghong Z-Suite等資料集市相比,Hadoop是開源的,專案的軟體成本因此會大大降低
Hadoop的核心就是HDFS和MapReduce
HDFS(Hadoop Distributed File System,Hadoop分散式檔案系統),它是一個高度容錯性的系統,適合部署在廉價的機器上。HDFS能提供高吞吐量的資料訪問,適合那些有著超大資料集(large data set)的應用程式。
HDFS的關鍵元素:
- Block:將一個檔案進行分塊,通常是64M。
- NameNode:儲存整個檔案系統的目錄資訊、檔案資訊及分塊資訊,這是由唯一一臺主機專門儲存,當然這臺主機如果出錯,NameNode就失效了。
- SecondaryNameNode 它不是 namenode 的冗餘守護程序,而是提供週期檢查點和清理任務。
- 出於對可擴充套件性和容錯性等考慮,我們一般將SecondaryNameNode執行在一臺非NameNode的機器上。
- DataNode:它負責管理連線到節點的儲存(一個叢集中可以有多個節點)。每個儲存資料的節點執行一個 datanode 守護程序。
上面關於namenode和datanode的定義還是比較重要的
MapReduce是一種程式設計模型,用於大規模資料集(大於1TB)的並行運算。概念”Map(對映)”和”Reduce(歸約)”,是它的主要思想。它極大地方便了程式設計人員在不會分散式並行程式設計的情況下,將自己的程式執行在分散式系統上。
MapReduce物理實體
- 客戶端(client):編寫mapreduce程式,配置作業,提交作業,這就是程式設計師完成的工作;
- JobTracker:初始化作業,分配作業,與TaskTracker通訊,協調整個作業的執行;
- TaskTracker:保持與JobTracker的通訊,在分配的資料片段上執行Map或Reduce任務,TaskTracker和JobTracker的不同有個很重要的方面,就是在執行任務時候TaskTracker可以有n多個,JobTracker則只會有一個;
- Hdfs:儲存作業的資料、配置資訊等等,最後的結果也是儲存在hdfs上面
MapReduce作業執行機制:
- ### 這一過程在實驗中體現得比較明顯,之後會有程式碼的詳細分析
1.在客戶端啟動一個作業。
2.向JobTracker請求一個Job ID。
3.將執行作業所需要的資原始檔複製到HDFS上,包括MapReduce程式打包的JAR檔案、配置檔案和客戶端計算所得的輸入劃分資訊。這些檔案都存放在JobTracker專門為該作業建立的資料夾中。資料夾名為該作業的Job ID。JAR檔案預設會有10個副本(mapred.submit.replication屬性控制);輸入劃分資訊告訴了JobTracker應該為這個作業啟動多少個map任務等資訊。
4.JobTracker接收到作業後,將其放在一個作業佇列裡,等待作業排程器對其進行排程,當作業排程器根據自己的排程演算法排程到該作業時,會根據輸入劃分資訊為每個劃分建立一個map任務,並將map任務分配給TaskTracker執行。對於map和reduce任務,TaskTracker根據主機核的數量和記憶體的大小有固定數量的map槽和reduce槽。
5.TaskTracker每隔一段時間會給JobTracker傳送一個心跳,告訴JobTracker它依然在執行,同時心跳中還攜帶著很多的資訊,比如當前map任務完成的進度等資訊。當JobTracker收到作業的最後一個任務完成資訊時,便把該作業設定成“成功”。當JobClient查詢狀態時,它將得知任務已完成,便顯示一條訊息給使用者。
用最簡短的語言解釋MapReduce:
We want to count all the books in the library. You count up shelf #1, I count up shelf #2. That’s map. The more people we get, the faster it goes.
Now we get together and add our individual counts. That’s reduce.
三.實驗過程
以下內容為正題,從環境搭建開始:
重搭真機偽分佈環境
之前的實驗是在虛擬機器上跑的,利用兩臺虛擬機器一臺做主機一臺做從機,虛擬機器效能差,兩臺虛擬機器之前的連線速度極慢,map reduce的過程也非常的慢,親測是沒跑動上面的演算法,所以我重新在ubuntu16.04的真機上搭建了偽分散式的hadoop環境和hdfs分散式檔案系統,當然也順便把單機模式的hadoop也跑了一下wordcount,偽分佈和完全分佈就只是不需要配置slave檔案,即自己就是自己的從機,因為TA給了配置文件,但是是全分佈的,而且有些也不需要,我就把關鍵的地方說一下
1. java安裝環境變數啥的基本常識,注意一下要在~/.bashrc
裡面新增的環境變數才會一直生效,配置完後需要source一下才能立即生效
2. 配置ssh,ssh畢竟是ubuntu最常用的功能之一,方便以後ssh自己的筆記本,這裡就不用改名為master了,好難看,還是保持的當前的名字,但是在/etc/hosts那裡我還是改成了自己的pc名,ip就是本地ip:127.0.0.1,不用新增master和slave,然後配置ssh就是讓自己的機器能直接通過賬戶密碼ssh控制,新增密匙什麼的按教程來就沒問題
3. hadoop安裝配置,並且和java一樣設定hadoop的環境變數就是在.bashrc檔案裡新增hadoop的bin路徑,然後關鍵是三個檔案的配置,注意嘗試過單機模式,就不用配置core-site檔案,然後輸入輸出就是本地的檔案,我跑了一下wordcount試了一下成功了就繼續配置偽分佈模式
* 修改core-site.xml
<property>
<name>fs.defaultFS</name>
<value>hdfs://cx-ThinkPad-X230:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/usr/local/hadoop/hadoop/tmp</value>
</property>
這裡第一個fs.defaultFS就是配置預設的檔案系統,主要要寫上自己主機名稱,9000為埠號,預設為這個如果不是那麼需要在程式碼中設定,
第二個是hadoop.tmp.dir,下面的路徑名就是你第一次執行hdfs namenode -format
這在這個資料夾下生成hdfs系統,這是一個基目錄,所以如果你的hdfs檔案系統出了問題或者想重新配置,將這個資料夾刪掉再重新執行hdfs namenode -format
即可(親測經驗之談。。。)
- 修改hdfs-site.xml
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>/usr/local/hadoop/hadoop/tmp/dfs/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/usr/local/hadoop/hadoop/tmp/dfs/data</value>
</property>
dfs.replication是副本數,即block的備份數
後面兩個就是節點的路徑,需要在第一個配置檔案的子目錄下面,這點十分重要,非常之坑,親測我這裡的路徑和上面的路徑名不一致,導致我後面只要一跑mapreduce導致系統直接登出,迷了我很久,後來從偽分佈又換成單機,又換成偽分佈,配了幾次才發現問題
NameNode 和Datanode在前面已經介紹得很清楚,就不過多介紹
這裡在hdfs檔案系統format之後我嘗試在/usr/local/hadoop/hadoop/tmp
即我指定的目錄中檢視我put上去的東西,結果並不能找到檔案,而是一些奇怪的檔案,看來hdfs檔案系統將傳上去的檔案處理過,要訪問只能通過hdfs方式檢視
然後配置檔案還要配置(其實偽分佈模式可以不用搞這個的)
YARN 是從 MapReduce 中分離出來的,負責資源管理與任務排程。YARN 運行於 MapReduce 之上,提供了高可用性、高擴充套件性,如果配了用sbin/start-yarn.sh
啟動即可,會看到多的nodemanager和recoursemanager,配置和教程一樣就不多說了
上面的配置過程還是折騰了許久,因為沒有按照TA的教材自己研究,網上教程版本太多,參差不齊,不過有問題才能學到東西,配置過程也加深了對hadoop和hdfs的理解,這個東西還是很強大的,也激起了繼續折騰的興趣
演算法實現
1.Kmeans
這裡先介紹一下Kmeans演算法
K-means演算法是硬聚類演算法,是典型的基於原型的目標函式聚類方法的代表,採用距離作為相似性的評價指標,即認為兩個物件的距離越近,其相似度就越大。該演算法認為簇是由距離靠近的物件組成的,因此把得到緊湊且獨立的簇作為最終目標。
演算法過程如下:
- 從N個文件隨機選取K個文件作為質心
- 對剩餘的每個文件測量其到每個質心的距離,並把它歸到最近的質心的類
- 重新計算已經得到的各個類的質心
- 迭代2~3步直至新的質心與原質心相等或小於指定閾值,演算法結束
從 K-means 演算法框架可以看出,該演算法需要不斷地進行樣本分類調整,不斷地計算調整後的新的聚類中心,因此當資料量非常大時,演算法的時間開銷是非常大的。而Hadoop的分散式計算平臺恰好可以彌補這一缺點,所以K-means演算法也十分適合hadoop
其實這個演算法在hadoop實現非常簡單粗暴,感覺也是hadoop和機器學習結合最通俗易懂的例子,個人覺得就像機器學習和hadoop結合的”wordcount”,這裡主要是對程式碼的詳細分析,包括了對mapreduce的理解
先介紹map和reduce過程
* map過程:
首先從HDFS上讀取中心點,然後讓當前文件與這k箇中心點進行距離運算,找出距離最近的的一箇中心點,以中心點為key值,將記錄原樣輸出。
public static class Map extends Mapper<LongWritable, Text, IntWritable, Text>{
//中心集合
ArrayList<ArrayList<Double>> centers = null;
int k = 0;
//讀取中心
protected void setup(Context context) throws IOException,
InterruptedException {
//從HDFS上讀取資料
centers = Utils.getCentersFromHDFS(context.getConfiguration().get("centersPath"),false);
k = centers.size();//獲得中心點的數目
}
/**
* 1.每次讀取一條要分類的條記錄與中心做對比,歸類到對應的中心
* 2.以中心ID為key,中心包含的記錄為value輸出(例如: 1 0.2 。 1為聚類中心的ID,0.2為靠近聚類中心的某個值)
*/
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//讀取一行資料
ArrayList<Double> fileds = Utils.textToArray(value);
int sizeOfFileds = fileds.size();//維度
double mindistance = 1999999999;
int centerIndex = 0;
//依次取出k箇中心點與當前讀取的記錄做計算
for(int i=0;i<k;i++){
double currentdistance = 0;
for(int j=1;j<sizeOfFileds;j++){
double centerPoint = Math.abs(centers.get(i).get(j));
double filed = Math.abs(fileds.get(j));
currentdistance += Math.pow((centerPoint - filed), 2);
}
//迴圈找出距離該記錄最接近的中心點的ID
if(currentdistance<mindistance){
mindistance = currentdistance;
centerIndex = i;//記錄下是第幾個中心點
}
}
//以中心點為Key 將記錄原樣輸出
context.write(new IntWritable(centerIndex+1), value);
}
}
從HDFS讀取檔案函式getCentersFromHDFS():根據路徑讀取檔案,轉換為ArrayList
public static ArrayList<ArrayList<Double>> getCentersFromHDFS(String centersPath,boolean isDirectory)
throws IOException{
ArrayList<ArrayList<Double>> result = new ArrayList<ArrayList<Double>>();
Path path = new Path(centersPath);
Configuration conf = new Configuration();
FileSystem fileSystem = path.getFileSystem(conf);
if(isDirectory){ //判斷是否是目錄
FileStatus[] listFile = fileSystem.listStatus(path);
for (int i = 0; i < listFile.length; i++) {
result.addAll(getCentersFromHDFS(listFile[i].getPath().toString(),false));
}
return result;
}
FSDataInputStream fsis = fileSystem.open(path);
LineReader lineReader = new LineReader(fsis, conf);
Text line = new Text();
while(lineReader.readLine(line) > 0){//讀取中心點檔案,將Text轉換為ArrayList<ArrayList<Double>>返回
ArrayList<Double> tempList = textToArray(line);
result.add(tempList);
}
lineReader.close();
return result;
}
* reduce過程
以key值分簇,計算出該簇的下一輪的中心點。之後又回到main函式中,呼叫比較函式compareCenters()判斷是否已經收斂。
//利用reduce的歸併功能以中心為Key將記錄歸併到一起
public static class Reduce extends Reducer<IntWritable, Text, Text, Text>{
/**
* 1.Key為聚類中心的ID value為以該中心點的邊緣點的集合
* 2.計數所有記錄元素的平均值,求出新的中心
*/
protected void reduce(IntWritable key, Iterable<Text> value,Context context)
throws IOException, InterruptedException {
ArrayList<ArrayList<Double>> filedsList = new ArrayList<ArrayList<Double>>();
//依次讀取記錄集,每行為一個ArrayList<Double>
for(Iterator<Text> it =value.iterator();it.hasNext();){
ArrayList<Double> tempList = Utils.textToArray(it.next());
filedsList.add(tempList);
}
//計算新的中心
//每行的元素個數
int filedSize = filedsList.get(0).size();
double[] avg = new double[filedSize];
for(int i=1;i<filedSize;i++){
//求每列的平均值
double sum = 0;
int size = filedsList.size();
for(int j=0;j<size;j++){
sum += filedsList.get(j).get(i);
}
avg[i] = sum / size;
}
context.write(new Text("") , new Text(Arrays.toString(avg).replace("[", "").replace("]", "")));
}
}
* 主函式
讀入輸入的4個引數,執行選取初始中心點的函式getinitcenter(),初始化完成後進入第一次運算。
public static void main(String[] args)
throws ClassNotFoundException, IOException, InterruptedException {
Configuration conf = new Configuration();
if (args.length != 4) {
System.err.println("Usage: InvertedIndex <centerPath> <dataPath> <newCenterPath>");
System.exit(2);
}
args = new GenericOptionsParser(conf, args).getRemainingArgs();//獲取執行命令時輸入的引數
String centerPath = args[0];
String dataPath = args[1];
String newCenterPath = args[2];
int NumOfCenter=Integer.parseInt(args[3]);
Utils.getinitcenter(centerPath,dataPath,NumOfCenter);//隨機選取初始點
int count = 0;
while(true){
run(centerPath,dataPath,newCenterPath,true);
System.out.println(" 第 " + ++count + " 次計算 ");
if(Utils.compareCenters(centerPath,newCenterPath )){//判斷是否收斂,如果收斂則不用再進行運算
//run(centerPath,dataPath,newCenterPath,false);
break;
}
}
}
還有其他的一些通用函式這裡就不介紹了,在附件的程式碼裡都有詳細的註釋
Kmeans演算法執行結果
上圖是執行兩次之後成功分簇,然後檢視結果和原始的資料點,可以看到二維點集
(0,2),(0,0),(1.5,0),(5,0),(5,2)
已經分成了0,1兩簇,第一簇包括(0,2),(0,0),(1.5,0)
,第二簇包括(5,0),(5,2)
2.shortestPath 最短路徑演算法
沒錯,我們雖然小組只有兩個人但是就是折騰了兩個出來,好不容易在真機上搭出來環境,跑得賊快,一個演算法不過癮,
還有原因是因為上面的例子其實基本就是部落格寫的,我們自己就加了個隨機生成簇首點,估計寫的同學都很多(估計TA也發現了大片程式碼雷同),
於是想重新搞一個難一點的,然後自己寫,於是就選了這個演算法
這個演算法的思路也就是map和reduce的流程是參照部落格上面的,但是實現都是自己敲的,算是比較深刻的實踐了mapreduce程式設計,自我感覺要比kmeans難那麼一丟丟,雖然做完了發現也很簡單= =
首先說一下map和reduce的過程
檔案的輸入格式是
A B,12 E,5 D,1
B C,2 D,3
C E,1
D B,3 C,1 E,5
E A,1
K a,n1, b,n2 ... //表示節點到K 到 後面鄰居節點的距離是n
然後在實驗中發現下面的規則
注意節點與鄰居表之間的分隔符是'\t'
,非常重要,因為在map的過程中會自動把一行文字按y遇到的第一個’\t’分隔成map的key和value,如果沒有’\t’,那麼會全部存為value,而key沒有值(列印沒有值,但是實際上我覺得應該是按行數來作為一個key value的)
- map過程
第一次將A B,12 E,5 D,1分解成為
Node <當前與源點距離> <鄰接表>
A 0 B,12 E,5 D,1(源節點)
或者
B inf C,2 D,3(中間節點)
壓入的時候就Node作為key,<當前距離> <鄰接表>一同作為value
之後就是
如果當前與源點距離不為inf就把節點的的所有鄰接表距離算出來(包含源點),並按照
Node作為key,當前距離+下一節點距離作為value
那麼在reduce階段,同一個key也就是每個node節點獲得的values資料存的就是
Nodei <當前與源點距離> <鄰接表>
Nodei dis1
Nodei dis2
Nodei dis3
reduce會自動按key值排序,所以我們獲得到的nodei是按A,B,C..順序得到的,所以也就類似BFS了
- reduce過程
key會排序,但是注意values陣列並不會排序,而是按壓入的順序儲存,
reduce過程任務是更新到當前節點的最短路徑
從上面的Nodei dis中選一個最小的值與下面的距離比較,如果更小則更新
Nodei <當前與源點距離> <鄰接表>
然後一直執行這個過程,直到一輪mapreduce之後所有節點都沒有更新,則停止,最後輸出源節點到所有節點的最短路徑
程式碼+註釋:
* map程式碼
public static class ShortestPathMapper extends Mapper<Text, Text, Text, Text> {
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
String name=key.toString();
//configuration 裡可以存一下控制變數,可在mapreduce過程中的獲取
int conuter = context.getConfiguration().getInt("Mapcount", 1);
String adj = "";
//第一次就加上去初始的距離,源點為0其他為不可達
if (conuter == 1) {
if (name.equals("A")) {
adj = "0"+ " " +value.toString();
} else {
adj = "inf"+ " " +value.toString();
}
}
else {
adj = value.toString();
}
Node node = new Node();
node.getFormatString(adj);
if (!node.getDistance().equals("inf")){
//當前節點可達,就計算其所有鄰接點距離
for (int i = 0; i < node.getNodeNum(); i++) {
String k = node.getName(i);
double new_dis=Double.parseDouble(node.getValue(i)) + Double.parseDouble(node.getDistance());
//System.out.println(k+" "+new_dis+"\n");
context.write(new Text(k), new Text(new_dis+""));
}
}
context.write(key, new Text(adj));
}
}
* reduce程式碼
public static class ShortestPathReducer extends Reducer<Text, Text, Text, Text> {
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Node node = new Node();
boolean f=false; //標記該節點是否有更新最短距離
double min=9999999;//存到這個節點最短的距離
int i = 0;
//遍歷每個節點的values,其中一個包含其鄰接點資訊,要單獨取出來
for (Text t : values) {
String[] strs = StringUtils.split(t.toString(), ' ');
System.out.println(t.toString());
if (strs.length>1 ) {
//包含鄰接點的資訊的那條資訊單獨處理
node.getFormatString(t.toString());
}
else {
//否則取出距離
double dis_new=Double.parseDouble(strs[0]);
//和最短的比較,
if (dis_new<min){
min=dis_new;
}
}
i++;
}
//獲取這一次reduce的最短距離和儲存的最短距離
String dis_new=min+"";
String dis_old=node.getDistance();
//如果沒有訪問過又有新的距離,那麼直接更新
if(dis_old.equals("inf")&&min!=9999999){
node.setDistance(dis_new);
f=true;
}
//否則需要判斷距離是不是比當前的小,距離更近才更新
else if (min < Double.parseDouble(dis_old)){
node.setDistance(dis_new);
f=true;
}
////如果有更新最短距離,那麼更改停止標記
if (f==true){
context.getCounter(Stop_flag.Flag).increment(1L);
//context.getConfiguration().setBoolean("stop", true);
}
//獲得stop標誌,用於判斷是否是最後一次輸出
boolean stop=context.getConfiguration().getBoolean("stop", false);
if (stop==true)
context.write(key, new Text(node.getDistance()));
else
context.write(key, new Text(node.toString()));
}
}
* 主函式
static enum Stop_flag {Flag}
public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException{
Configuration conf = new Configuration();
if (args.length != 2) {
System.err.println("Usage: ShortestPath <dataPath> <outPath>");
System.exit(2);
}
args = new GenericOptionsParser(conf, args).getRemainingArgs();
String dataPath = args[0];
String outPath = args[1];
FileSystem fs = FileSystem.get(conf);
int count = 1;
while (true) {
//System.out.println(conf.getInt("Mapcount", 0)+"\n");
conf.setInt("Mapcount", count);
conf.setBoolean("stop",false);
Job job = Job.getInstance(conf);
//jar包名
job.setJarByClass(ShortestPath.class);
//設定對應的class,在打包的jar裡面
job.setMapperClass(ShortestPathMapper.class);
job.setReducerClass(ShortestPathReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
//每次的輸出檔案要為新的,這樣可以看到更新的過程
Path outPath1 = new Path(outPath+"tmp" + count);
if (fs.exists(outPath1)) {
fs.delete(outPath1, true);
}
FileOutputFormat.setOutputPath(job, outPath1);
//輸入檔案第一次為輸入的路徑,後面就是上一次reduce生成的檔案
if (count == 1)
FileInputFormat.addInputPath(job, new Path(dataPath));
else
FileInputFormat.addInputPath(job, new Path(outPath+"tmp" + (count - 1)));
if (job.waitForCompletion(true)) {
System.out.println("run time: "+count);
//if (conf.getBoolean("stop", false)) break;
long num = job.getCounters().findCounter(Stop_flag.Flag).getValue();
//如果所有節點都沒有更新距離,那麼演算法結束
if (num==0) {
//結束的mapreduce輸出只輸出最短路徑
conf.setInt("Mapcount", count);
//設定stop標記為true
conf.setBoolean("stop",true);
Job job1 = Job.getInstance(conf);
job1.setJarByClass(ShortestPath.class);
job1.setMapperClass(ShortestPathMapper.class);
job1.setReducerClass(ShortestPathReducer.class);
job1.setMapOutputKeyClass(Text.class);
job1.setMapOutputValueClass(Text.class);
job1.setInputFormatClass(KeyValueTextInputFormat.class);
//輸入檔案為最後的節點,輸出檔案為final檔案
FileInputFormat.addInputPath(job1, new Path(outPath+"tmp" + count));
FileOutputFormat.setOutputPath(job1,new Path(outPath+"final"));
job1.waitForCompletion(true);
System.out.println("end ");
break;
}
}
count++;
}
}
ShortestPath演算法執行結果
測試資料集
A B,3 D,1
B A,3 E,7 F,2
C D,2 E,5 F,3
D A,1 C,2 E,5
E B,7 C,5 D,5
F B,2 C,3
執行結果
上圖紅框為執行完成後檢視生產的所有中間檔案tmp1-3
和最終的執行結果final
A 0
B 3.0
C 3.0
D 1.0
E 6.0
F 5.0
表示從A點到各個節點的最短距離,結合上面的給出的資料圖很容易驗證是正確的,
然後黃色框的是把中間執行結果打印出來了,可以看到運行了三次,然後各點到A點的距離也在不斷的減少,下面再貼一些mapreduce執行的中間過程截圖
這列印的是reduce階段接收到的每個節點的現在計算出的距離和鄰接表,從上往下依次是A,B,C…原因就是因為reduce會按照key值排序,可以看出values列表是無序的,這就是之前也說過的問題。
左圖為執行之後可以看到有些節點還是不可達的,距離為10^7或者9999999,(因為java的double精度丟失,很奇怪,竟然10^7就丟失精度了。。),右圖為已經完成最短路徑計算的。
實驗部分至此結束
四.實驗總結
這次大作業肯定是收穫很大的,以後還是可以吹噓自己是學過hadoop並且親自寫過mapreduce的,並且通過自己的實驗從搭環境到wordcount程式跑起來,然後是學習別人的程式碼,從簡單的例子,比如統計,排序,學習了mapreduce的框架和一些特性,這裡總結一下框架,也就是每個hadoop程式都要走的
//配置檔案,可以儲存一些配置
Configuration conf = new Configuration();
//獲取一個job
Job job = Job.getInstance(conf);
job.setJarByClass(ShortestPath.class);
//設定map和reduce的類,就自己寫的map和reduce過程
job.setMapperClass(ShortestPathMapper.class);
job.setReducerClass(ShortestPathReducer.class);
//設定輸入輸出個格式,這裡用到text
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
//輸入輸出檔案,在hdfs檔案系統上
FileInputFormat.addInputPath(job, new Path());
FileOutputFormat.setOutputPath(job, new Path());
然後還可以提前設定一些在mapreduce過程中用到的變數
//設定
conf.setInt("Mapcount", count);
conf.setBoolean("stop",true);
//在mapreduce過程中獲取
boolean stop=context.getConfiguration().getBoolean("stop", false);
注意這些配置變數是不能在map和reduce過程中修改的,我在實驗中嘗試修改結果失敗了
總之,實驗過程雖然艱辛但是還是比較好玩的,hadoop這個平臺以後有需要還會來繼續研究研究,爭取弄點更有意思的東西。