1. 程式人生 > >基於hadoop的推薦演算法-mahout版

基於hadoop的推薦演算法-mahout版

基於hadoop的推薦演算法,講其中mahout實現的基於專案的推薦演算法

分為4步:

1.獲得人-物 使用者矩陣

    輸入為所有人對物品的評價或關聯

    map端輸出key為人,value為物品+傾好度

    reeduce端輸出key為人,vallue為多個物品+傾好度

2.獲得物-物 專案矩陣

   輸入為“使用者矩陣”,講每一行人-物資料中的物品做笛卡爾積,生產成物-物的關聯

   map端輸出為key為物,value為關聯度

   reduce端輸出key為物,value為多個物的關聯度

(可以根據各種規則生成專案相似度矩陣表,此處演算法帶過)

修改:

求專案相似矩陣是基於專案的協同過濾演算法的核心

公式有很多種,核心是物品i和物品j相關使用者的交集與並集的商

mahout使用的公式是1.dot(i,j) = sum(Pi(u)*Pi(u))

                  2.norms(i) = sum(Pi(u)^2)

                  3.simi(i,j) = 1/(1+(norms(i)-2*dot(i,j)+noorm(i))^1/2)

mahout的實現方法是

第一個job,用物品-人的矩陣,求得norms,即物品的使用者平方和,輸出是物-norms

第二個job,Map:用人-物的矩陣,求Pi(u)*Pi(u),即相同使用者的物品的評價的乘機,輸出物-多個對端物品的Pi(u)*Pi(u)

           Reduce:用物-多個對端物品的Pi(u)*Pi(u)和物-norms,求得物品的相似矩陣(因為這個時候可以彙總所有和這個物品相關的物品的dot)

第三個job,補全物品的相似矩陣

3.獲得使用者-專案相似矩陣

輸入為人-物 使用者矩陣 和 物-物 專案矩陣

Map端輸出key為物,value為類VectorOrPrefWritable,是包含物與人的傾好度,或是物與物的相似度

reduce端輸出key為物,value為類VectorAndPrefWritable,是彙總當個物品到所有人的傾好度和到所有物品的相似度

4.獲得使用者推薦矩陣

輸入為VectorAndPrefWritable

Map端輸出為key:人,value:物+係數(map端根據單個物品貢獻的係數生成推薦係數,也就是人到物品A的傾好度*物品A到其他物品的相似度)

reduce端輸出為key:人,,value:推薦專案+係數(reduce端使用自定公式,彙總所有單物品貢獻的四叔,求人到其他專案的傾好度,取topn作為當前使用者的推薦專案)

再在這裡貼幾個mahout推薦演算法分析的帖子:

http://eric-gcm.iteye.com/blog/1817822

http://eric-gcm.iteye.com/blog/1818033

http://eric-gcm.iteye.com/blog/1820060

以下是mahout程式碼:

 ItemSimilarityJob類是mahout使用hadoop做推薦引擎的主要實現類,下面開始分析。

run()函式是啟動函式:

Java程式碼  收藏程式碼
  1. public final class RecommenderJob extends AbstractJob {  
  2.   public static final String BOOLEAN_DATA = "booleanData";  
  3.   private static final int DEFAULT_MAX_SIMILARITIES_PER_ITEM = 100;  
  4.   private static final int DEFAULT_MAX_PREFS_PER_USER = 1000;  
  5.   private static final int DEFAULT_MIN_PREFS_PER_USER = 1;  
  6.   @Override  
  7.   public int run(String[] args) throws Exception {  
  8.     //這裡原來有大一堆程式碼,都是用來載入配置項,不用管它  
  9.     //第一步:準備矩陣,將原始資料轉換為一個矩陣,在PreparePreferenceMatrixJob這個類中完成  
  10.     if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
  11.       ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]{  
  12.               "--input", getInputPath().toString(),  
  13.               "--output", prepPath.toString(),  
  14.               "--maxPrefsPerUser", String.valueOf(maxPrefsPerUserInItemSimilarity),  
  15.               "--minPrefsPerUser", String.valueOf(minPrefsPerUser),  
  16.               "--booleanData", String.valueOf(booleanData),  
  17.               "--tempDir", getTempPath().toString()});  
  18.       numberOfUsers = HadoopUtil.readInt(new Path(prepPath, PreparePreferenceMatrixJob.NUM_USERS), getConf());  
  19.     }  
  20.     //第二步:計算協同矩陣  
  21.     if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
  22.       /* special behavior if phase 1 is skipped */  
  23.       if (numberOfUsers == -1) {  
  24.         numberOfUsers = (int) HadoopUtil.countRecords(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),  
  25.                 PathType.LIST, null, getConf());  
  26.       }  
  27.       /* Once DistributedRowMatrix uses the hadoop 0.20 API, we should refactor this call to something like 
  28.        * new DistributedRowMatrix(...).rowSimilarity(...) */  
  29.       //calculate the co-occurrence matrix  
  30.       ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{  
  31.               "--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),  
  32.               "--output", similarityMatrixPath.toString(),  
  33.               "--numberOfColumns", String.valueOf(numberOfUsers),  
  34.               "--similarityClassname", similarityClassname,  
  35.               "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),  
  36.               "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),  
  37.               "--threshold", String.valueOf(threshold),  
  38.               "--tempDir", getTempPath().toString()});  
  39.     }  
  40.     //start the multiplication of the co-occurrence matrix by the user vectors  
  41.     if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
  42.       Job prePartialMultiply1 = prepareJob(  
  43.               similarityMatrixPath, prePartialMultiplyPath1, SequenceFileInputFormat.class,  
  44.               SimilarityMatrixRowWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,  
  45.               Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,  
  46.               SequenceFileOutputFormat.class);  
  47.       boolean succeeded = prePartialMultiply1.waitForCompletion(true);  
  48.       if (!succeeded)   
  49.         return -1;  
  50.       //continue the multiplication  
  51.       Job prePartialMultiply2 = prepareJob(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),  
  52.               prePartialMultiplyPath2, SequenceFileInputFormat.class, UserVectorSplitterMapper.class, VarIntWritable.class,  
  53.               VectorOrPrefWritable.class, Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,  
  54.               SequenceFileOutputFormat.class);  
  55.       if (usersFile != null) {  
  56.         prePartialMultiply2.getConfiguration().set(UserVectorSplitterMapper.USERS_FILE, usersFile);  
  57.       }  
  58.       prePartialMultiply2.getConfiguration().setInt(UserVectorSplitterMapper.MAX_PREFS_PER_USER_CONSIDERED,  
  59.               maxPrefsPerUser);  
  60.       succeeded = prePartialMultiply2.waitForCompletion(true);  
  61.       if (!succeeded)   
  62.         return -1;  
  63.       //finish the job  
  64.       Job partialMultiply = prepareJob(  
  65.               new Path(prePartialMultiplyPath1 + "," + prePartialMultiplyPath2), partialMultiplyPath,  
  66.               SequenceFileInputFormat.class, Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,  
  67.               ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,  
  68.               SequenceFileOutputFormat.class);  
  69.       setS3SafeCombinedInputPath(partialMultiply, getTempPath(), prePartialMultiplyPath1, prePartialMultiplyPath2);  
  70.       succeeded = partialMultiply.waitForCompletion(true);  
  71.       if (!succeeded)   
  72.         return -1;  
  73.     }  
  74.     if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
  75.       //filter out any users we don't care about  
  76.       /* convert the user/item pairs to filter if a filterfile has been specified */  
  77.       if (filterFile != null) {  
  78.         Job itemFiltering = prepareJob(new Path(filterFile), explicitFilterPath, TextInputFormat.class,  
  79.                 ItemFilterMapper.class, VarLongWritable.class, VarLongWritable.class,  
  80.                 ItemFilterAsVectorAndPrefsReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,  
  81.                 SequenceFileOutputFormat.class);  
  82.         boolean succeeded = itemFiltering.waitForCompletion(true);  
  83.         if (!succeeded)   
  84.           return -1;  
  85.       }  
  86.       String aggregateAndRecommendInput = partialMultiplyPath.toString();  
  87.       if (filterFile != null) {  
  88.         aggregateAndRecommendInput += "," + explicitFilterPath;  
  89.       }  
  90.       //extract out the recommendations  
  91.       Job aggregateAndRecommend = prepareJob(  
  92.               new Path(aggregateAndRecommendInput), outputPath, SequenceFileInputFormat.class,  
  93.               PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,  
  94.               AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,  
  95.               TextOutputFormat.class);  
  96.       Configuration aggregateAndRecommendConf = aggregateAndRecommend.getConfiguration();  
  97.       if (itemsFile != null) {  
  98.         aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMS_FILE, itemsFile);  
  99.       }  
  100.       if (filterFile != null) {  
  101.         setS3SafeCombinedInputPath(aggregateAndRecommend, getTempPath(), partialMultiplyPath, explicitFilterPath);  
  102.       }  
  103.       setIOSort(aggregateAndRecommend);  
  104.       aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH,  
  105.               new Path(prepPath, PreparePreferenceMatrixJob.ITEMID_INDEX).toString());  
  106.       aggregateAndRecommendConf.setInt(AggregateAndRecommendReducer.NUM_RECOMMENDATIONS, numRecommendations);  
  107.       aggregateAndRecommendConf.setBoolean(BOOLEAN_DATA, booleanData);  
  108.       boolean succeeded = aggregateAndRecommend.waitForCompletion(true);  
  109.       if (!succeeded)   
  110.         return -1;  
  111.     }  
  112.     return 0;  
  113.   }  

       第二步,計算協同矩陣,主要在RowSimilarityJob 這個類中完成

Java程式碼  收藏程式碼
  1. ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{  
  2.               "--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),  
  3.               "--output", similarityMatrixPath.toString(),  
  4.               "--numberOfColumns", String.valueOf(numberOfUsers),  
  5.               "--similarityClassname", similarityClassname,  
  6.               "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),  
  7.               "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),  
  8.               "--threshold", String.valueOf(threshold),  
  9.               "--tempDir", getTempPath().toString()});  
  10.     }  

   可以看到這個job的輸入路徑就是上一篇中,PreparePreferenceMatrixJob中最後一個reducer的輸出路徑。

下邊詳細分析RowSimilarityJob類的實現:

Java程式碼  收藏程式碼
  1. public class RowSimilarityJob extends AbstractJob {  
  2.   @Override  
  3.   public int run(String[] args) throws Exception {  
  4.     //一大堆載入引數的程式碼,忽略  
  5.     //第一個MapReduce  
  6.     if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
  7.       Job normsAndTranspose = prepareJob(getInputPath(), weightsPath, VectorNormMapper.class, IntWritable.class,  
  8.           VectorWritable.class, MergeVectorsReducer.class, IntWritable.class, VectorWritable.class);  
  9.       normsAndTranspose.setCombinerClass(MergeVectorsCombiner.class);  
  10.       Configuration normsAndTransposeConf = normsAndTranspose.getConfiguration();  
  11.       normsAndTransposeConf.set(THRESHOLD, String.valueOf(threshold));  
  12.       normsAndTransposeConf.set(NORMS_PATH, normsPath.toString());  
  13.       normsAndTransposeConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());  
  14.       normsAndTransposeConf.set(MAXVALUES_PATH, maxValuesPath.toString());  
  15.       normsAndTransposeConf.set(SIMILARITY_CLASSNAME, similarityClassname);  
  16.       boolean succeeded = normsAndTranspose.waitForCompletion(true);  
  17.       if (!succeeded) {  
  18.         return -1;  
  19.       }  
  20.     }  
  21.     //第二個MapReduce  
  22.     if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
  23.       Job pairwiseSimilarity = prepareJob(weightsPath, pairwiseSimilarityPath, CooccurrencesMapper.class,  
  24.           IntWritable.class, VectorWritable.class, SimilarityReducer.class, IntWritable.class, VectorWritable.class);  
  25.       pairwiseSimilarity.setCombinerClass(VectorSumReducer.class);  
  26.       Configuration pairwiseConf = pairwiseSimilarity.getConfiguration();  
  27.       pairwiseConf.set(THRESHOLD, String.valueOf(threshold));  
  28.       pairwiseConf.set(NORMS_PATH, normsPath.toString());  
  29.       pairwiseConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());  
  30.       pairwiseConf.set(MAXVALUES_PATH, maxValuesPath.toString());  
  31.       pairwiseConf.set(SIMILARITY_CLASSNAME, similarityClassname);  
  32.       pairwiseConf.setInt(NUMBER_OF_COLUMNS, numberOfColumns);  
  33.       pairwiseConf.setBoolean(EXCLUDE_SELF_SIMILARITY, excludeSelfSimilarity);  
  34.       boolean succeeded = pairwiseSimilarity.waitForCompletion(true);  
  35.       if (!succeeded) {  
  36.         return -1;  
  37.       }  
  38.     }  
  39.     //第三個MapReduce  
  40.     if (shouldRunNextPhase(parsedArgs, currentPhase)) {  
  41.       Job asMatrix = prepareJob(pairwiseSimilarityPath, getOutputPath(), UnsymmetrifyMapper.class,  
  42.           IntWritable.class, VectorWritable.class, MergeToTopKSimilaritiesReducer.class, IntWritable.class,  
  43.           VectorWritable.class);  
  44.       asMatrix.setCombinerClass(MergeToTopKSimilaritiesReducer.class);  
  45.       asMatrix.getConfiguration().setInt(MAX_SIMILARITIES_PER_ROW, maxSimilaritiesPerRow);  
  46.       boolean succeeded = asMatrix.waitForCompletion(true);  
  47.       if (!succeeded) {  
  48.         return -1;  
  49.       }  
  50.     }  
  51.     return 0;  
  52.   }  

 可以看到RowSimilityJob也是分成三個MapReduce過程:

1、Mapper :VectorNormMapper類,輸出 ( userid_index, <itemid_index, pref> )型別