Hadoop-Based Recommendation Algorithms: the Mahout Version
This post covers recommendation on Hadoop, focusing on the item-based recommendation algorithm as implemented in Mahout.
The algorithm runs in four steps:
1. Build the user-item matrix
The input is every user's rating of, or association with, the items.
The map side emits key = user, value = item + preference.
The reduce side emits key = user, value = a list of item + preference pairs.
2. Build the item-item matrix
The input is the user matrix above. For each user-item row, take the Cartesian product of that user's items to generate item-item associations.
The map side emits key = item, value = association strength to another item.
The reduce side emits key = item, value = association strengths to multiple items.
(The item-similarity matrix can be generated under many different rules; the algorithm is only sketched here.)
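Before turning to Mahout's version, the two steps above can be illustrated with a toy, non-distributed sketch (the class name and sample data are made up for illustration; this is not Mahout code):

```java
import java.util.*;

// Toy sketch of steps 1-2: build user -> item lists, then count item-item
// co-occurrence by pairing every two items rated by the same user.
public class CooccurrenceSketch {

  // Step 1: user -> list of items that user rated (preferences omitted for brevity)
  static Map<String, List<String>> userVectors(List<String[]> prefs) {
    Map<String, List<String>> byUser = new HashMap<>();
    for (String[] p : prefs) { // p = {user, item}
      byUser.computeIfAbsent(p[0], u -> new ArrayList<>()).add(p[1]);
    }
    return byUser;
  }

  // Step 2: Cartesian product of each user's items -> co-occurrence counts
  static Map<String, Integer> cooccurrence(Map<String, List<String>> byUser) {
    Map<String, Integer> counts = new HashMap<>();
    for (List<String> items : byUser.values()) {
      for (String a : items) {
        for (String b : items) {
          if (!a.equals(b)) {
            counts.merge(a + "," + b, 1, Integer::sum);
          }
        }
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    List<String[]> prefs = Arrays.asList(
        new String[]{"u1", "iA"}, new String[]{"u1", "iB"},
        new String[]{"u2", "iA"}, new String[]{"u2", "iB"}, new String[]{"u2", "iC"});
    Map<String, Integer> c = cooccurrence(userVectors(prefs));
    System.out.println(c.get("iA,iB")); // 2 -- both users rated A and B together
  }
}
```

In the MapReduce version, the user grouping is what the step-1 reducer produces, and the inner double loop is what the step-2 mapper emits pair by pair.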
Correction:
Computing the item-similarity matrix is the core of item-based collaborative filtering.
There are many candidate formulas; at their core they relate the intersection of the user sets of items i and j to their union.
The formulas Mahout uses (for Euclidean distance similarity) are:
1. dot(i,j) = sum( Pi(u) * Pj(u) )
2. norms(i) = sum( Pi(u)^2 )
3. sim(i,j) = 1 / (1 + sqrt( norms(i) - 2*dot(i,j) + norms(j) ))
where Pi(u) is user u's preference for item i.
Mahout implements this as follows:
Job 1: from the item-user matrix, compute norms, i.e. the sum of squared user preferences per item; the output is item → norms.
Job 2, Map: from the user-item matrix, compute Pi(u)*Pj(u), i.e. the product of the same user's preferences for a pair of items; the output is item → Pi(u)*Pj(u) terms for each partner item.
Job 2, Reduce: combine the item → Pi(u)*Pj(u) terms with item → norms to obtain the item-similarity matrix (at this point every dot contribution involving the item can be aggregated).
Job 3: fill in (symmetrize) the rest of the similarity matrix.
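As a sanity check on the three formulas, here is a plain-Java sketch of the math for a single item pair (the class name and sample ratings are made up; Mahout computes these quantities across the MapReduce jobs above rather than in one function):

```java
// Sketch of the Euclidean-distance similarity described above:
// dot(i,j) = sum_u Pi(u)*Pj(u), norms(i) = sum_u Pi(u)^2,
// sim(i,j) = 1 / (1 + sqrt(norms(i) - 2*dot(i,j) + norms(j)))
public class EuclideanSimilaritySketch {

  static double dot(double[] pi, double[] pj) {
    double s = 0;
    for (int u = 0; u < pi.length; u++) {
      s += pi[u] * pj[u];
    }
    return s;
  }

  static double norms(double[] pi) {
    return dot(pi, pi);
  }

  static double similarity(double[] pi, double[] pj) {
    // norms(i) - 2*dot(i,j) + norms(j) is exactly sum_u (Pi(u) - Pj(u))^2,
    // so this is 1 / (1 + euclidean distance between the two rating vectors)
    return 1.0 / (1.0 + Math.sqrt(norms(pi) - 2 * dot(pi, pj) + norms(pj)));
  }

  public static void main(String[] args) {
    double[] itemI = {5, 3, 0}; // ratings of item i by users u0..u2
    double[] itemJ = {4, 3, 0}; // ratings of item j by the same users
    System.out.println(similarity(itemI, itemJ)); // distance 1 -> similarity 0.5
  }
}
```

The decomposition into dot and norms is what makes the distributed version possible: norms depends on one item at a time, and dot is a per-user product that a mapper can emit independently.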
3. Combine user preferences with the item-similarity matrix
The input is the user-item matrix together with the item-item similarity matrix.
The map side emits key = item, value = a VectorOrPrefWritable, which holds either one user's preference for the item or the item's similarity vector.
The reduce side emits key = item, value = a VectorAndPrefsWritable, which gathers, for that single item, all users' preferences for it and its similarities to all other items.
4. Build the user recommendation matrix
The input is the VectorAndPrefsWritable records.
The map side emits key = user, value = item + partial score (for each single item, the map side generates that item's contribution to the score: the user's preference for item A times item A's similarity to each other item).
The reduce side emits key = user, value = recommended item + score (the reduce side applies the aggregation formula to sum up all the per-item contributions, scores the user against each candidate item, and keeps the top-N as that user's recommendations).
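The map-side contributions and reduce-side aggregation can be illustrated for one user and one candidate item with a toy sketch (the class and data are made up, and normalizing by the sum of similarities is one common variant of the aggregation formula, not necessarily the exact one Mahout applies in every mode):

```java
import java.util.*;

// Toy sketch of step 4: score a candidate item for one user as the
// similarity-weighted average of the user's known preferences:
// score(u, j) = sum_i sim(i, j) * pref(u, i) / sum_i sim(i, j)
public class WeightedSumSketch {

  static double score(Map<String, Double> userPrefs, Map<String, Double> simToCandidate) {
    double numerator = 0;
    double denominator = 0;
    for (Map.Entry<String, Double> e : userPrefs.entrySet()) {
      Double sim = simToCandidate.get(e.getKey());
      if (sim != null) {
        numerator += sim * e.getValue(); // map-side style contribution: pref(u,i) * sim(i,j)
        denominator += sim;              // reduce-side normalization term
      }
    }
    return denominator == 0 ? 0 : numerator / denominator;
  }

  public static void main(String[] args) {
    Map<String, Double> prefs = new HashMap<>(); // the user's known ratings
    prefs.put("iA", 5.0);
    prefs.put("iB", 3.0);
    Map<String, Double> sims = new HashMap<>();  // similarity of iA/iB to candidate item iC
    sims.put("iA", 0.5);
    sims.put("iB", 0.5);
    System.out.println(score(prefs, sims)); // (0.5*5 + 0.5*3) / 1.0 = 4.0
  }
}
```

In the real job each loop iteration is one map-side record, and the reducer only has to sum the numerator and denominator before ranking candidates and taking the top-N.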
A few more posts analyzing Mahout's recommendation algorithms:
http://eric-gcm.iteye.com/blog/1817822
http://eric-gcm.iteye.com/blog/1818033
http://eric-gcm.iteye.com/blog/1820060
The Mahout code follows.
RecommenderJob is the main class implementing Mahout's Hadoop-based recommendation engine; let's walk through it.
run() is the entry point:
Java code:
public final class RecommenderJob extends AbstractJob {

  public static final String BOOLEAN_DATA = "booleanData";

  private static final int DEFAULT_MAX_SIMILARITIES_PER_ITEM = 100;
  private static final int DEFAULT_MAX_PREFS_PER_USER = 1000;
  private static final int DEFAULT_MIN_PREFS_PER_USER = 1;

  @Override
  public int run(String[] args) throws Exception {
    // A large block of code sat here; it only parses and loads configuration options, so it is omitted.
    // Step 1: prepare the matrix -- convert the raw data into a preference matrix, done in PreparePreferenceMatrixJob
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]{
          "--input", getInputPath().toString(),
          "--output", prepPath.toString(),
          "--maxPrefsPerUser", String.valueOf(maxPrefsPerUserInItemSimilarity),
          "--minPrefsPerUser", String.valueOf(minPrefsPerUser),
          "--booleanData", String.valueOf(booleanData),
          "--tempDir", getTempPath().toString()});
      numberOfUsers = HadoopUtil.readInt(new Path(prepPath, PreparePreferenceMatrixJob.NUM_USERS), getConf());
    }

    // Step 2: compute the co-occurrence (item-similarity) matrix
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      /* special behavior if phase 1 is skipped */
      if (numberOfUsers == -1) {
        numberOfUsers = (int) HadoopUtil.countRecords(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
            PathType.LIST, null, getConf());
      }

      /* Once DistributedRowMatrix uses the hadoop 0.20 API, we should refactor this call to something like
       * new DistributedRowMatrix(...).rowSimilarity(...) */
      //calculate the co-occurrence matrix
      ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{
          "--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
          "--output", similarityMatrixPath.toString(),
          "--numberOfColumns", String.valueOf(numberOfUsers),
          "--similarityClassname", similarityClassname,
          "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),
          "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
          "--threshold", String.valueOf(threshold),
          "--tempDir", getTempPath().toString()});
    }

    //start the multiplication of the co-occurrence matrix by the user vectors
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      Job prePartialMultiply1 = prepareJob(
          similarityMatrixPath, prePartialMultiplyPath1, SequenceFileInputFormat.class,
          SimilarityMatrixRowWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
          Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
          SequenceFileOutputFormat.class);
      boolean succeeded = prePartialMultiply1.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }

      //continue the multiplication
      Job prePartialMultiply2 = prepareJob(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
          prePartialMultiplyPath2, SequenceFileInputFormat.class, UserVectorSplitterMapper.class, VarIntWritable.class,
          VectorOrPrefWritable.class, Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
          SequenceFileOutputFormat.class);
      if (usersFile != null) {
        prePartialMultiply2.getConfiguration().set(UserVectorSplitterMapper.USERS_FILE, usersFile);
      }
      prePartialMultiply2.getConfiguration().setInt(UserVectorSplitterMapper.MAX_PREFS_PER_USER_CONSIDERED,
          maxPrefsPerUser);
      succeeded = prePartialMultiply2.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }

      //finish the job
      Job partialMultiply = prepareJob(
          new Path(prePartialMultiplyPath1 + "," + prePartialMultiplyPath2), partialMultiplyPath,
          SequenceFileInputFormat.class, Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,
          ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
          SequenceFileOutputFormat.class);
      setS3SafeCombinedInputPath(partialMultiply, getTempPath(), prePartialMultiplyPath1, prePartialMultiplyPath2);
      succeeded = partialMultiply.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
    }

    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      //filter out any users we don't care about
      /* convert the user/item pairs to filter if a filterfile has been specified */
      if (filterFile != null) {
        Job itemFiltering = prepareJob(new Path(filterFile), explicitFilterPath, TextInputFormat.class,
            ItemFilterMapper.class, VarLongWritable.class, VarLongWritable.class,
            ItemFilterAsVectorAndPrefsReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
            SequenceFileOutputFormat.class);
        boolean succeeded = itemFiltering.waitForCompletion(true);
        if (!succeeded) {
          return -1;
        }
      }

      String aggregateAndRecommendInput = partialMultiplyPath.toString();
      if (filterFile != null) {
        aggregateAndRecommendInput += "," + explicitFilterPath;
      }

      //extract out the recommendations
      Job aggregateAndRecommend = prepareJob(
          new Path(aggregateAndRecommendInput), outputPath, SequenceFileInputFormat.class,
          PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,
          AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
          TextOutputFormat.class);
      Configuration aggregateAndRecommendConf = aggregateAndRecommend.getConfiguration();
      if (itemsFile != null) {
        aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMS_FILE, itemsFile);
      }
      if (filterFile != null) {
        setS3SafeCombinedInputPath(aggregateAndRecommend, getTempPath(), partialMultiplyPath, explicitFilterPath);
      }
      setIOSort(aggregateAndRecommend);
      aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH,
          new Path(prepPath, PreparePreferenceMatrixJob.ITEMID_INDEX).toString());
      aggregateAndRecommendConf.setInt(AggregateAndRecommendReducer.NUM_RECOMMENDATIONS, numRecommendations);
      aggregateAndRecommendConf.setBoolean(BOOLEAN_DATA, booleanData);
      boolean succeeded = aggregateAndRecommend.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
    }

    return 0;
  }
}
Step 2, computing the co-occurrence matrix, is done mainly in the RowSimilarityJob class:
Java code:
ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{
    "--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
    "--output", similarityMatrixPath.toString(),
    "--numberOfColumns", String.valueOf(numberOfUsers),
    "--similarityClassname", similarityClassname,
    "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),
    "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
    "--threshold", String.valueOf(threshold),
    "--tempDir", getTempPath().toString()});
Note that this job's input path is exactly the output path of the last reducer of PreparePreferenceMatrixJob, covered in the previous post.
Now a closer look at the RowSimilarityJob implementation:
Java code:
public class RowSimilarityJob extends AbstractJob {

  @Override
  public int run(String[] args) throws Exception {
    // a pile of argument-loading code, omitted
    // first MapReduce
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      Job normsAndTranspose = prepareJob(getInputPath(), weightsPath, VectorNormMapper.class, IntWritable.class,
          VectorWritable.class, MergeVectorsReducer.class, IntWritable.class, VectorWritable.class);
      normsAndTranspose.setCombinerClass(MergeVectorsCombiner.class);
      Configuration normsAndTransposeConf = normsAndTranspose.getConfiguration();
      normsAndTransposeConf.set(THRESHOLD, String.valueOf(threshold));
      normsAndTransposeConf.set(NORMS_PATH, normsPath.toString());
      normsAndTransposeConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
      normsAndTransposeConf.set(MAXVALUES_PATH, maxValuesPath.toString());
      normsAndTransposeConf.set(SIMILARITY_CLASSNAME, similarityClassname);
      boolean succeeded = normsAndTranspose.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
    }
    // second MapReduce
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      Job pairwiseSimilarity = prepareJob(weightsPath, pairwiseSimilarityPath, CooccurrencesMapper.class,
          IntWritable.class, VectorWritable.class, SimilarityReducer.class, IntWritable.class, VectorWritable.class);
      pairwiseSimilarity.setCombinerClass(VectorSumReducer.class);
      Configuration pairwiseConf = pairwiseSimilarity.getConfiguration();
      pairwiseConf.set(THRESHOLD, String.valueOf(threshold));
      pairwiseConf.set(NORMS_PATH, normsPath.toString());
      pairwiseConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
      pairwiseConf.set(MAXVALUES_PATH, maxValuesPath.toString());
      pairwiseConf.set(SIMILARITY_CLASSNAME, similarityClassname);
      pairwiseConf.setInt(NUMBER_OF_COLUMNS, numberOfColumns);
      pairwiseConf.setBoolean(EXCLUDE_SELF_SIMILARITY, excludeSelfSimilarity);
      boolean succeeded = pairwiseSimilarity.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
    }
    // third MapReduce
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      Job asMatrix = prepareJob(pairwiseSimilarityPath, getOutputPath(), UnsymmetrifyMapper.class,
          IntWritable.class, VectorWritable.class, MergeToTopKSimilaritiesReducer.class, IntWritable.class,
          VectorWritable.class);
      asMatrix.setCombinerClass(MergeToTopKSimilaritiesReducer.class);
      asMatrix.getConfiguration().setInt(MAX_SIMILARITIES_PER_ROW, maxSimilaritiesPerRow);
      boolean succeeded = asMatrix.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
    }
    return 0;
  }
}
As you can see, RowSimilarityJob also breaks down into three MapReduce passes:
1. Mapper: VectorNormMapper, which emits ( userid_index, <itemid_index, pref> ) pairs