1. 程式人生 > >Hadoop偽分佈安裝詳解+MapReduce執行原理+基於MapReduce的KNN演算法實現

Hadoop偽分佈安裝詳解+MapReduce執行原理+基於MapReduce的KNN演算法實現

本篇部落格將圍繞Hadoop偽分佈安裝+MapReduce執行原理+基於MapReduce的KNN演算法實現這三個方面進行敘述。
(一)Hadoop偽分佈安裝

1、簡述Hadoop的安裝模式中–偽分佈模式與叢集模式的區別與聯絡.
Hadoop的安裝方式有三種:本地模式,偽分佈模式,叢集(分佈)模式,其中後兩種模式為重點,有意義
偽分佈:如果Hadoop對應的Java程序都執行在一個物理機器上,稱為偽分佈
分佈:如果Hadoop對應的Java程序執行在多臺物理機器上,稱為分佈.[叢集就是有主有從]
偽分佈模式就是在一臺伺服器上面模擬叢集環境,但僅僅是機器數量少,其通訊機制與執行過程與真正的叢集模式是一樣的.

2、簡述Hadoop偽分佈模式的安裝步驟(7步驟)
①>關閉防火牆
②>設定靜態IP地址
③>修改主機名
④>IP地址與主機名繫結
⑤>設定SSH免密碼登入
⑥>安裝JDK並配置環境變數
⑦>安裝HADOOP並配置環境變數,並修改hadoop的四個配置檔案,最後對hadoop進行格式化

3、簡述Hadoop偽分佈模式安裝步驟的具體細節
1>關閉防火牆(驗證)
service iptables status
service iptables stop(暫時關閉)
chkconfig iptables off(永久關閉)
驗證:chkconfig –list|grep iptables
關閉防火牆的原因:Hadoop叢集在通訊的時候有很多個埠要開啟使用,所以要關閉防火牆
2>設定靜態IP地址(驗證)
如果要使用host-only這種網路連線方式,虛擬機器VMWare的虛擬網絡卡Vmnet1必須要開啟
如果要使用brige橋接這種網路連線方式,虛擬機器VMWare的虛擬網絡卡Vmnet8必須要開啟
配置Ip地址時,預設閘道器為相應虛擬網絡卡的IP地址
設定完IP地址之後要重啟網絡卡:service network restart
驗證:ifconfig
3>修改主機名(驗證)
vi /etc/sysonfig/network
驗證:重啟驗證,使主機名生效
reboot -h now
hostname
或者為了防止重啟麻煩,可以這麼做:
hostname hadoop60(主機名)
exit
4>IP地址與主機名繫結(驗證)
主機名類似於我們的域名,主機名只有與Ip地址繫結,才能ping通主機名,而且之所以將主機名與IP地址繫結,是因為主機名使用方便
Linux:
vi /etc/hosts 然後新增一行 IP地址 主機名
Windows:
進入C:\Windows\System32\drivers\etc\hosts
然後新增一行 IP地址 主機名
驗證:ping 通主機名即可
5>設定SSH(secure shell)免密碼登入(驗證)
執行命令ssh-keygen -t rsa 產生祕鑰檔案:
id_rsa(私鑰) id_rsa.pub(公鑰)
在homedirectory/.ssh處執行
cp id_rsa.pub authorized_keys 產生授權檔案[SSH在驗證的時候會讀取這個授權檔案檔案的內容]
SSH免密碼登入的作用:
通過SSH免密碼登入這種機制,只要知道遠端機器的主機名(hostname)與密碼(passwd),通過secure shell就可以登入到遠端的機器;通過SSH免密碼登陸不用手動啟動相應的程序!
除此之外,也可以通過上面的祕鑰檔案進行無密碼登陸!
驗證:ssh 主機名
下面是SSH免密碼登陸原理圖:
這裡寫圖片描述


6>安裝JDK並配置環境變數(驗證)
之所以安裝JDK,是因為Hadoop是執行在Jdk之上的
安裝Jdk的時候要配置兩個系統變數:
vi /etc/profile
export JAVA_HOME=/usr/local/jdk
export PATH= . :/usr/local/jdk/bin :$PATH
PATH指向的是JAVA_HOME的bin目錄
注意:PATH是內建的環境變數
source /etc/profile
驗證:java -version java javac
7>安裝HADOOP並配置環境變數,並修改hadoop的四個配置檔案,最後對hadoop進行格式化(驗證)
安裝HADOOP並配置環境變數:
安裝Hadoop的時候要配置兩個系統變數:

vi  /etc/profile
export  HADOOP_HOME=/usr/local/hadoop
export  PATH= .:/usr/local/jdk/bin:/usr/local/hadoop/bin:$PATH

PATH指向的是Hadoop_HOME的bin目錄
source /etc/profile
修改Hadoop的四個配置檔案:
預設的hadoop是一種本地執行模式,而在這裡講的是偽分佈,所以要修改配置檔案,適合偽分佈模式
這四個配置檔案分別是:
hadoop-env.sh(1)、core-site.xml(2)、hdfs-site.xml(2)、mapred-site.xml(1)
最後對hadoop進行格式化:
啟動hadoop之前要對hadoop格式化,因為新買的行動硬碟使用之前要進行格式化,格式化之後才能使用,格式化的目的是為了對磁碟上的空間按照一定的檔案格式進行處理
hadoop namenode -format
最後一步:消除警告:
export HADOOP_HOME_WARN_SUPPRESS=1
驗證:start-all.sh 之後會啟動5個java 程序
8>檢驗是否安裝成功
方式1:執行命令jps,看是否啟動5個程序
方式2:通過瀏覽器http://hadoop:50070http://hadoop:50030 分別檢視NameNode與JobTracker是否正常啟動。
4、簡述四個配置檔案中配置的內容
預設的hadoop是一種本地執行模式,通過修改配置檔案可以變成偽分佈模式.
四個配置檔案分別是:
第一個是hadoop環境變數指令碼檔案hadoop-env.sh
在這裡面需要指定Java_HOME(jdk)的具體路徑,因為hadoop是執行在JDK之上的.
第二個是hadoop的核心配置檔案core-site.xml
在這裡面需要指定HDFS(NameNode)的訪問路徑(fs.default.name)以及NameNode、DataNode等存放資料的公共目錄(hadoop.tmp.dir)
第三個是HDFS的配置檔案hdfs-site.xml
在這裡面需要指定DataNode存放block塊的副本數(dfs.replication),預設是3個.以及對HDFS的訪問許可權(dfs.permissions).
第四個是MapReduce的配置檔案mapred-site.xml
在這裡面需要指定MapReduce中JobTracker的訪問路徑
(mapred.job.tracker)
5、簡述宿主機(windows)與客戶機(安裝在虛擬機器中的linux)網路連線方式
首先要明確:之所以使用虛擬機器,是因為通過虛擬機器可以模擬出一臺機器,因為現有環境機器數量不夠
網路連線方式:
1>host-only
這種網路連線方式的特點是宿主機與客戶機單獨組網,客戶機(虛擬機器)與宿主機所在的區域網中的其它電腦之間不能夠相互通訊。
如果要使用host-only這種網路連線方式,虛擬機器VMWare的虛擬網絡卡Vmnet1必須要開啟
Vmnet1:192.168.80.1
Linux:192.168.80.100
Windows:192.168.70.1
優點:隔離網路,安全
缺點:虛擬機器與其它伺服器之間是不能通訊的
2>bridge(橋接)
這種網路連線方式的特點是客戶機(虛擬機器)與宿主機所在的區域網中的其它電腦之間能夠相互通訊。
如果要使用橋接這種網路連線方式,虛擬機器VMWare的虛擬網絡卡Vmnet8必須要開啟
Vmnet1:192.168.70.1
Linux:192.168.70.100
Windows:192.168.70.1
優點:因為同在一個區域網中,彼此之間資料的傳輸方便
缺點:不安全
3>NAT方式(略)
6、簡述Hadoop的目錄結構
bin目錄:用於存放hadoop常用命令的資料夾
conf目錄:用於存放hadoop相關配置檔案的資料夾
src目錄:用於存放hadoop原始碼的資料夾
docs目錄:用於存放於hadoop相關的文件與api
等等
7、簡述hadoop中常用的重要命令
start-all.sh:啟動hadoop
stop-all.sh :關閉hadoop
分別啟動:
啟動hdfs:start-dfs.sh
啟動mapreduce:starr-mapred.sh
分別啟動各個程序:
hadoop-daemon.sh start 程序名稱
這種啟動方式適用於單獨增加,刪除節點的情況,在hadoop叢集搭建的過程中經常使用.

(二)MapReduce執行原理  
執行原理圖示:
這裡寫圖片描述

MapReduce程式的執行過程分為兩個階段:Mapper階段和Reducer階段。
其中Mapper階段可以分為6個步驟:
第一階段:先將HDFS中的輸入檔案file按照一定的標準進行切片,預設切片的類為FileInputFormat,通過切片輸入檔案將會變成split1、split2、split3……;隨後對輸入切片split按照一定的規則解析成鍵值對<k1,v1>,預設處理的類為TextInputFormat。其中k1就是我們常說的起始偏移量,v1就是行文字的內容。
第二階段:呼叫自己編寫的map邏輯,將輸入的鍵值對<k1,v1>變成<k2,v2>。在這裡要注意:每一個鍵值對<k1,v1>都會呼叫一次map函式。
第三階段:按照一定的規則對輸出的鍵值對<k2,v2>進行分割槽:分割槽的規則是針對k2進行的,比如說k2如果是省份的話,那麼就可以按照不同的省份進行分割槽,同一個省份的k2劃分到一個區。注意:預設分割槽的類是HashPartitioner類,這個類預設只分為一個區,因此Reducer任務的數量預設也是1.
第四階段:對每個分割槽中的鍵值對進行排序。注意:所謂排序是針對k2進行的,v2是不參與排序的,如果要讓v2也參與排序,需要自定義排序的類,具體過程可以參看博主文章。
第五階段:排序完之後要進行分組,即相同key的value放到同一個集合當中,例如在WordCount程式中的<hello,{1,1}>執行的就是這個步驟,但是要注意:分組也是針對key進行的,經過分組完之後,就得到了我們熟悉的鍵值對<k2,v2s>.
第六階段(可選):對分組後的資料進行歸約處理。通過歸約處理鍵值對<k2,v2s>變成了<k2,v2>,經過這一階段,傳送到Reducer任務端的資料量會減少。但是規約的使用是有條件的,所以這一階段是可以選擇的。

Mapper任務處理完之後,就進入到了我們的Reducer階段:
Reducer任務的執行過程可以分為3個階段:
第一階段:對多個Mapper任務的輸出,按照不同的分割槽,通過網路拷貝到不同的Reducer節點上進行處理,將資料按照分割槽拷貝到不同的Reducer節點之後,對多個Mapper任務的輸出在進行合併,排序。例如:在WordCount程式中,若一個Mapper任務輸出了<hello,{1,1}>,另外一個Mapper任務的輸出為<hello,{1,1,1}>,經過在次合併之後變為<hello,{1,1,1,1,1}>.
第二階段:呼叫自己的reduce邏輯,將鍵值對<k2,v2s>變為<k3,v3>.在這裡注意:每一個鍵值對<k2,v2s>都會呼叫一次reduce函式。
第三階段:將Reducer任務的輸出儲存到指定的檔案中。

(三)基於MapReduce的KNN演算法實現
1、KNN演算法簡介
K近鄰演算法的思想較為簡單—–簡單來說就是“近朱者赤,近墨者黑”,KNN演算法將沒有分類標籤的資料與樣本集合中的所有的資料進行距離計算,然後提取出最相似的K個數據.K個數據中分類標籤出現最多的分類就是新資料的分類標籤.
總結下來:就是KNN演算法很容易理解,同時易於實現。但是KNN演算法的計算複雜度較高,計算量較大,對於每一個待分類的樣本都要計算它到全體已知樣本的距離,才能求得它的K個最近鄰點。因此在大資料環境下,尋找準確的近鄰需要花費較多的響應時間,限制了在實際中的一些應用—-此時我們的Hadoop平臺就得到了應用。
2、map函式和reduce函式虛擬碼實現
這裡寫圖片描述
這裡寫圖片描述
3、基於MapReduce的KNN演算法實現程式碼

package IT002;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;


//本程式的目的是實現MR-K-NN演算法
public class KNN0
{   
      public static String path1 = "hdfs://hadoop60:9000/test.txt";//讀取HDFS中的測試集
      public static String path2 = "hdfs://hadoop60:9000/testdir";
      public static void main(String[] args) throws Exception
      {
          FileSystem fileSystem = FileSystem.get(new Configuration());

          if(fileSystem.exists(new Path(path2)))
          {
              fileSystem.delete(new Path(path2), true);
          }

          Job job = new Job(new Configuration(),"KNN");
          job.setJarByClass(KNN0.class);
          FileInputFormat.setInputPaths(job, new Path(path1));//在這裡指定輸入檔案的父目錄即可,MapReduce會自動讀取輸入目錄下所有的檔案
          job.setInputFormatClass(TextInputFormat.class);
          job.setMapperClass(MyMapper.class);
          job.setMapOutputKeyClass(Text.class);
          job.setMapOutputValueClass(Text.class);

          job.setNumReduceTasks(1);
          job.setPartitionerClass(HashPartitioner.class);


          job.setReducerClass(MyReducer.class);
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(NullWritable.class);
          job.setOutputFormatClass(TextOutputFormat.class);
          FileOutputFormat.setOutputPath(job, new Path(path2));
          job.waitForCompletion(true);
          //檢視執行結果
          FSDataInputStream fr = fileSystem.open(new Path("hdfs://hadoop60:9000/testdir/part-r-00000"));
          IOUtils.copyBytes(fr, System.out, 1024, true);
      }
     public static class MyMapper extends Mapper<LongWritable, Text, Text, Text>
     {
           public ArrayList<Instance> trainSet = new ArrayList<Instance>();
           public int k = 9;//k在這裡可以根據KNN演算法實際要求取值
           protected void setup(Context context)throws IOException, InterruptedException
           {
                 FileSystem fileSystem = null;  
                 try  
                {  
                   fileSystem = FileSystem.get(new URI("hdfs://hadoop60:9000/"), new Configuration());      
                 } catch (Exception e){}  
                FSDataInputStream fr0 = fileSystem.open(new Path("hdfs://hadoop60:9000/trainData.txt"));   
                BufferedReader fr1 = new BufferedReader(new InputStreamReader(fr0));   

                String str = fr1.readLine();  
                 while(str!=null)  
                 {  
                     Instance trainInstance = new Instance(str);
                     trainSet.add(trainInstance);
                     str = fr1.readLine();  
                 }   
           }
          protected void map(LongWritable k1, Text v1,Context context)throws IOException, InterruptedException
          {
                 ArrayList<Double> distance = new ArrayList<Double>(k);
                 ArrayList<String>  trainLable = new ArrayList<String>(k);
                 for(int i=0;i<k;i++)
                 {
                      distance.add(Double.MAX_VALUE);
                      trainLable.add(String.valueOf(-1.0));
                 }

                 Instance testInstance = new Instance(v1.toString());
                 for(int i=0;i<trainSet.size();i++)
                 {
                        double dis = Distance.EuclideanDistance(trainSet.get(i).getAttributeset(),testInstance.getAttributeset());

                        for(int j=0;j<k;j++)
                        {
                            if(dis <(Double) distance.get(j))
                            {
                                distance.set(j, dis);
                                trainLable.set(j,trainSet.get(i).getLable()+"");
                                break;
                            }
                        } 
                 }  
                 for(int i=0;i<k;i++)
                 {
                       context.write(new Text(v1.toString()),new Text(trainLable.get(i)+""));
                 }
          }
     }
     public static class MyReducer  extends Reducer<Text, Text, Text, NullWritable>
     {
            protected void reduce(Text k2, Iterable<Text> v2s,Context context)throws IOException, InterruptedException
            {
                  String predictlable ="";  
                  ArrayList<String> arr = new ArrayList<String>();
                  for (Text v2 : v2s)
                  { 
                      arr.add(v2.toString());   
                  }
                  predictlable = MostFrequent(arr);
                  String  preresult = k2.toString()+"\t"+predictlable;//**********根據實際情況進行修改**************
                  context.write(new Text(preresult),NullWritable.get());
            } 
            public String MostFrequent(ArrayList arr)
            {
                   HashMap<String, Double> tmp = new HashMap<String,Double>();
                   for(int i=0;i<arr.size();i++)
                   {
                        if(tmp.containsKey(arr.get(i)))
                        {
                             double frequence = tmp.get(arr.get(i))+1;
                             tmp.remove(arr.get(i));
                             tmp.put((String) arr.get(i),frequence);
                        }
                        else
                            tmp.put((String) arr.get(i),new Double(1));
                   }
                   Set<String> s = tmp.keySet();

                   Iterator it = s.iterator();
                   double lablemax=Double.MIN_VALUE;
                   String predictlable = null;
                   while(it.hasNext())
                   {
                       String key = (String) it.next();
                       Double lablenum = tmp.get(key);
                       if(lablenum > lablemax)
                       {
                            lablemax = lablenum;
                            predictlable = key;
                       }
                   }
                   return predictlable;
            }
     }
}
class Distance
{
      public static double EuclideanDistance(double[] a,double[] b)
      {
           double sum = 0.0;
           for(int i=0;i<a.length;i++)
           {
               sum +=Math.pow(a[i]-b[i],2);   
           }    
           return Math.sqrt(sum);//計算測試樣本與訓練樣本之間的歐式距離
      }
}
class Instance
{
      public double[] attributeset;//存放樣例屬性
      public double lable;//存放樣例標籤
      public  Instance(String line)
      {
            String[] splited = line.split("\t");
            attributeset = new double[splited.length-1];
            for(int i=0;i<attributeset.length;i++)
            {
                  attributeset[i] = Double.parseDouble(splited[i]);  
            }
            lable = Double.parseDouble(splited[splited.length-1]);      
      }
      public double[] getAttributeset()
      {
           return attributeset; 
      }
      public double getLable()
      {
            return lable;
      }
}