1. 程式人生 > >一個基於Mahout與hadoop的聚類搭建

一個基於Mahout與hadoop的聚類搭建

 mahout是基於hadoop的資料探勘工具,因為有了hadoop,所以進行海量資料的挖掘工作顯得更為簡單。但是因為演算法需要支援M/R,所以不是所有常用的資料探勘演算法都會支援。這篇文章會告訴你,如何使用hadoop + mahout搭出一個簡易的聚類工具。 

第一步:搭建hadoop平臺。

我使用的是ubuntu 11.04,如果沒有ubuntu的開發環境,就參考我的帖子《Ubuntu 10.10 java 開發環境》

#1 在ubuntu下面建立一個使用者組與使用者
Java程式碼  收藏程式碼
  1. beneo@ubuntu:~$ sudo addgroup hadoop  
  2. beneo@ubuntu:~$ sudo adduser --ingroup hadoop hduser  


#2 安裝ssh-server
Java程式碼  收藏程式碼
  1. beneo@ubuntu:~$ sudo apt-get install ssh  
  2. beneo@ubuntu:~$ su - hduser  
  3. hduser@ubuntu:~$ ssh-keygen -t rsa -P ""  
  4. hduser@ubuntu:~$ cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys  


#3 驗證ssh通訊
Java程式碼  收藏程式碼
  1. hduser@ubuntu:ssh localhost  


ssh localhost 後,選擇 yes,如果沒有問題,就可以安裝hadoop了 


#4 新增java_home
修改conf/hadoop-env.sh檔案,讓JAVA_HOME指向正確的地址 

#5 修改下面的配置
conf/core-site.xml: 
Java程式碼  收藏程式碼
  1. <configuration>  
  2.      <property>  
  3.          <name>fs.default.name</name>  
  4.          <value>hdfs://localhost:9000</value>  
  5.      </property>  
  6. </configuration>  

conf/hdfs-site.xml: 

Java程式碼  收藏程式碼
  1. <configuration>  
  2.      <property>  
  3.          <name>dfs.replication</name>  
  4.          <value>1</value>  
  5.      </property>  
  6. </configuration>  

conf/mapred-site.xml: 
Java程式碼  收藏程式碼
  1. <configuration>  
  2.      <property>  
  3.          <name>mapred.job.tracker</name>  
  4.          <value>localhost:9001</value>  
  5.      </property>  
  6. </configuration>  


#6 Format a new distributed-filesystem:
Java程式碼  收藏程式碼
  1. $ bin/hadoop namenode -format  


#7 Start the hadoop daemons:
Java程式碼  收藏程式碼
  1. $ bin/start-all.sh  


#8 驗證啟動成功沒有
Java程式碼  收藏程式碼
  1. $ jps  

數一下有沒有6個,沒有的話,刪除logs下面的檔案,然後從#6開始 

#9 別慌,先開啟網頁,打不開,等!!!
Java程式碼  收藏程式碼
  1. NameNode - http://localhost:50070/  
  2. JobTracker - http://localhost:50030/  


第一步搭建hadoop結束 

第二步,Mahout的配置

#1 下載Mahout,解壓
#2 .bash_profile裡面設定HADOOP_HOME
#3 mahout/bin/mahout 看看列印結果

第三步,做一個聚類的demo吧

我的聚類是文字 -> lucene index -> mahout -> clustering dumper 
可以選擇的是 sequeneceFile -> mahout -> clustering dumper 

我直接貼程式碼吧,用的是groovy,可能寫的不好 
#1 text -> lucene index
Java程式碼  收藏程式碼
  1. def assembleDoc = {  
  2.     label, content ->  
  3.     assert !label.toString().trim().empty  
  4.     assert !content.toString().trim().empty  
  5.     def doc = new Document()  
  6.     doc.add(new Field("label", label, Field.Store.YES, Field.Index.NOT_ANALYZED))  
  7.     doc.add(new Field("content", content, Field.Store.NO, Field.Index.ANALYZED, TermVector.YES))  
  8.     doc  
  9. }  
  10. def mockContent = {  
  11.     def set = []  
  12.     new File("""/home/beneo/text.txt""").newReader().eachLine {  
  13.         String line ->  
  14.         set << line  
  15.     }  
  16.     set  
  17. }  
  18. def mockExpandoSet = {  
  19.     def lst = []  
  20.     mockContent()?.each {  
  21.         content ->  
  22.         // 過濾掉所有非中文字元  
  23.         def line = content.replaceAll("[^\u4e00-\u9fa5]+""")  
  24.         if (line != null && line.trim().length() > 2) {  
  25.             println(content)  
  26.             def expando = new Expando()  
  27.             expando.label = content  
  28.             expando.content = line  
  29.             lst << expando  
  30.         }  
  31.     }  
  32.     lst  
  33. }  
  34. //建立一個dic  
  35. def directory = FSDirectory.open(new File("""/home/beneo/index"""), NoLockFactory.getNoLockFactory())  
  36. // 用的是 IK分詞  
  37. def analyzer = new IKAnalyzer()  
  38. //建立一個indexWriter,這個wirter就是用來產生出index  
  39. def indexWriter = new IndexWriter(directory, analyzer, true, IndexWriter.MaxFieldLength.UNLIMITED)  
  40. //從本地獲得文字  
  41. mockExpandoSet().each {  
  42.     expando ->  
  43.     indexWriter.addDocument(assembleDoc(expando.label, expando.content))  
  44. }  
  45. indexWriter.commit()  
  46. indexWriter.close()  
  47. directory.close()  


#2 lucene index -> mahout vector 
Java程式碼  收藏程式碼
  1. mahout/bin/mahout lucene.vector -d index/ -i label -o tmp/vector/vector -f content -t tmp/vector/dict -n 2  


#3 mahout vector -> mahout canopy
Java程式碼  收藏程式碼
  1. mahout/bin/mahout canopy -i tmp/vector/vector -o tmp/canopy/ -dm org.apache.mahout.common.distance.CosineDistanceMeasure -t1 0.32 -t2 0.08 -ow  


#4 mahout canopy -> mahout kmeans
Java程式碼  收藏程式碼
  1. mahout/bin/mahout kmeans -i tmp/vector/vector -c tmp/canopy/clusters-0/ -dm org.apache.mahout.common.distance.CosineDistanceMeasure -o tmp/kmeans/ -cd 0.001 -x 10 -ow -cl  


#5 mahout keamns -> 結果分析
Java程式碼  收藏程式碼
  1. String seqFileDir = "/home/hduser/tmp/kmeans/clusters-2/"  
  2. String pointsDir = "/home/hduser/tmp/kmeans/clusteredPoints/"  
  3. def conf = new Configuration()  
  4. FileSystem fs = new Path(seqFileDir).getFileSystem(conf)  
  5. Map<Integer, List<WeightedVectorWritable>> clusterIdToPoints = readPoints(new Path(pointsDir), new Configuration());  
  6. for (FileStatus seqFile: fs.globStatus(new Path(seqFileDir, "part-*"))) {  
  7.     Path path = seqFile.getPath()  
  8.     SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);  
  9.     org.apache.hadoop.io.Writable key = reader.getKeyClass().asSubclass(org.apache.hadoop.io.Writable.class).newInstance();  
  10.     org.apache.hadoop.io.Writable value = reader.getValueClass().asSubclass(org.apache.hadoop.io.Writable.class).newInstance();  
  11.     while (reader.next(key, value)) {  
  12.         Cluster cluster = (Cluster) value;  
  13.         int id = cluster.getId()  
  14.         int np = cluster.getNumPoints()  
  15.         List<WeightedVectorWritable> points = clusterIdToPoints.get(cluster.getId());  
  16.         if (points != null && points.size() > 4) {  
  17.             for (Iterator<WeightedVectorWritable> iterator = points.iterator(); iterator.hasNext();) {  
  18.                 println(((NamedVector) iterator.next().getVector()).getName())  
  19.             }  
  20.             println "======================================"  
  21.         }  
  22.     }  
  23. }  
  24. private static Map<Integer, List<WeightedVectorWritable>> readPoints(Path pointsPathDir, Configuration conf)  
  25. throws IOException {  
  26.     Map<Integer, List<WeightedVectorWritable>> result = new TreeMap<Integer, List<WeightedVectorWritable>>();  
  27.     FileSystem fs = pointsPathDir.getFileSystem(conf);  
  28.     FileStatus[] children = fs.listStatus(pointsPathDir, new PathFilter() {  
  29.         @Override  
  30.         public boolean accept(Path path) {  
  31.             String name = path.getName();  
  32.             return !(name.endsWith(".crc") || name.startsWith("_"));  
  33.         }  
  34.     });  
  35.     for (FileStatus file: children) {  
  36.         Path path = file.getPath();  
  37.         SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);  
  38.         IntWritable key = reader.getKeyClass().asSubclass(IntWritable.class).newInstance();  
  39.         WeightedVectorWritable value = reader.getValueClass().asSubclass(WeightedVectorWritable.class).newInstance();  
  40.         while (reader.next(key, value)) {  
  41.             // value is the cluster id as an int, key is the name/id of the  
  42.             // vector, but that doesn't matter because we only care about printing  
  43.             // it  
  44.             // String clusterId = value.toString();  
  45.             List<WeightedVectorWritable> pointList = result.get(key.get());  
  46.             if (pointList == null) {  
  47.                 pointList = new ArrayList<WeightedVectorWritable>();  
  48.                 result.put(key.get(), pointList);  
  49.             }  
  50.             pointList.add(value);  
  51.             value = reader.getValueClass().asSubclass(WeightedVectorWritable.class).newInstance();  
  52.         }  
  53.     }  
  54.     return result;  
  55. }  


效果我就不展示了