基於內容的協同過濾推薦演算法:給使用者推薦和他們之前喜歡的物品在內容上相似的其他物品

物品特徵建模(item profile)

以電影為例

1表示電影具有某特徵,0表示電影不具有某特徵

                          科幻    言情    喜劇    動作    紀實    國產    歐美    日韓    斯嘉麗的約翰    成龍    范冰冰

復仇者聯盟:        1         0         0         1        0        0          1        0            1                  0         0          

綠巨人:              1         0         0         1        0        0          1         0            0                 0         0

寶貝計劃:           0         0         1         1        0        1          0         0            0                 1         1

十二生肖:           0         0         0         1        0        1          0         0            0                 1         0

演算法步驟

1.構建item profile矩陣

物品ID——標籤

        tag:     (1)        (2)        (3)        (4)        (5)        (6)        (7)        (8)        (9)       

I1:              1          0          0          1          1          0          1          0          0  

I2:              0          1          0          1          0          0          1          0          1       

I3:              0          1          1          0          0          1          0          1          1      

I4:              1         0           1          1          1          0          0          0          0       

I5:              0         1           0          1          0          0          1          1          0       

2.構建item user評分矩陣

使用者ID——物品ID

            I1        I2        I3        I4        I5

U1        1         0         0         0         5

U2        0         4         0         1         0

U3        0         5         3         0         1

3.item user    X    item profile    = user profile

使用者ID——標籤

tag:     (1)        (2)        (3)        (4)        (5)        (6)        (7)        (8)        (9)                   

U1       1          5           0         6           1          0         6           5          0

U2       1          4           1         5           1          0         4           0          4

U3       0          9           3         6           0          3         6           4          8

值的含義:使用者對所有標籤感興趣的程度

比如: U1-(1)表示使用者U1對特徵(1)的偏好權重為1,可以看出使用者U1對特徵(4)(7)最感興趣,其權重為6

4.對user profile 和 item profile求餘弦相似度

左側矩陣的每一行與右側矩陣的每一行計算餘弦相似度

cos<U1,I1>表示使用者U1對物品I1的喜好程度,最後需要將已有評分的物品置零,不推薦該物品

專案目錄:


輸入檔案如下



MapReduce步驟

1.將item profile轉置

輸入:物品ID(行)——標籤ID(列)——0或1        物品特徵建模

輸出:標籤ID(行)——物品ID(列)——0或1

程式碼:

mapper1

package step1;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author liyijie
 * @date 2018年5月13日下午10:36:18
 * @email [email protected]
 * @remark
 * @version 
 * 
 * 將item profile轉置
 */
public class Mapper1  extends Mapper<LongWritable, Text, Text, Text>  {
	private Text outKey = new Text();
	private Text outValue = new Text();
	/**
	 * key:1
	 * value:1	1_0,2_3,3_-1,4_2,5_-3
	 * */
    @Override  
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
       String[] rowAndLine = value.toString().split("\t");
       
       //矩陣行號	物品ID
       String itemID = rowAndLine[0];
       //列值	使用者ID_分值
       String[] lines = rowAndLine[1].split(",");
       
       
       for(int i = 0 ; i<lines.length; i++){
    	   String userID = lines[i].split("_")[0];
    	   String score = lines[i].split("_")[1];
    	   
    	   //key:列號	 使用者ID	value:行號_值	 物品ID_分值
    	   outKey.set(userID);
    	   outValue.set(itemID+"_"+score);
    	   
    	   context.write(outKey, outValue);
       }
    } 
}

reducer1

package step1;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author liyijie
 * @date 2018年5月13日下午10:56:28
 * @email [email protected]
 * @remark
 * @version 
 * 
 * 
 * 
 * 將item profile轉置
 */
public class Reducer1 extends Reducer<Text, Text, Text, Text> {
	private Text outKey = new Text();
	private Text outValue = new Text();
	
	 //key:列號	 使用者ID		value:行號_值,行號_值,行號_值,行號_值...	物品ID_分值
	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		StringBuilder sb = new StringBuilder();
		
		//text:行號_值		物品ID_分值
		for(Text text:values){  
            sb.append(text).append(",");
        }  
		String line = null;
		if(sb.toString().endsWith(",")){
			line = sb.substring(0, sb.length()-1);
		}
	
		
		outKey.set(key);
		outValue.set(line);
		
		context.write(outKey,outValue);  
	}
	
	
}

mr1

package step1;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * @author liyijie
 * @date 2018年5月13日下午11:07:13
 * @email [email protected]
 * @remark
 * @version 
 * 
 * 將item profile轉置
 */
public class MR1 {
	private static String inputPath = "/content/step1_input";
	private static String outputPath = "/content/step1_output";
	private static String hdfs = "hdfs://node1:9000";
	
	public int run(){
		try {
		Configuration conf=new Configuration();  
		conf.set("fs.defaultFS", hdfs);		
		Job	job = Job.getInstance(conf,"step1");
		
		
		//配置任務map和reduce類  
		job.setJarByClass(MR1.class);  
		job.setJar("F:\\eclipseworkspace\\content\\content.jar");  
	      job.setMapperClass(Mapper1.class);  
	      job.setReducerClass(Reducer1.class);  

	      job.setMapOutputKeyClass(Text.class);  
	      job.setMapOutputValueClass(Text.class);  

	      job.setOutputKeyClass(Text.class);  
	      job.setOutputValueClass(Text.class);  

	      FileSystem fs = FileSystem.get(conf);
	      Path inpath = new Path(inputPath);
	      if(fs.exists(inpath)){
	          FileInputFormat.addInputPath(job,inpath);  
	      }else{
	    	  System.out.println(inpath);
	    	  System.out.println("不存在");
	      }
	      
	      Path outpath = new Path(outputPath);
	      fs.delete(outpath,true);
	      FileOutputFormat.setOutputPath(job, outpath); 
	      
			return job.waitForCompletion(true)?1:-1;
		} catch (ClassNotFoundException | InterruptedException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
		return -1;
	}
	 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException  { 
		int result = -1;
		result = new MR1().run();
		if(result==1){
			System.out.println("step1執行成功");
		}else if(result==-1){
			System.out.println("step1執行失敗");
		}
	  }
}

結果



2.item user (評分矩陣)    X    item profile(已轉置)

輸入:根據使用者的行為列表計算的評分矩陣

快取:步驟1輸出

輸出:使用者ID(行)——標籤ID(列)——分值(使用者對所有標籤感興趣的程度)

程式碼:

mapper2

package step2;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author liyijie
 * @date 2018年5月13日下午11:43:51
 * @email [email protected]
 * @remark
 * @version 
 * 
 * 
 * item user (評分矩陣)    X    item profile(已轉置)
 */
public class Mapper2 extends Mapper<LongWritable, Text, Text, Text> {
	private Text outKey = new Text();
	private Text outValue = new Text();
	private List<String> cacheList = new ArrayList<String>();
	
	private DecimalFormat df = new DecimalFormat("0.00");
		
	/**在map執行之前會執行這個方法,只會執行一次
	 * 
	 * 通過輸入流將全域性快取中的矩陣讀入一個java容器中
	 */
	@Override
	protected void setup(Context context)throws IOException, InterruptedException {
		super.setup(context);
		FileReader fr = new FileReader("itemUserScore1");
		BufferedReader br  = new BufferedReader(fr);
		
		//右矩陣	
		//key:行號 物品ID		value:列號_值,列號_值,列號_值,列號_值,列號_值...    使用者ID_分值
		String line = null;
		while((line=br.readLine())!=null){
			cacheList.add(line);
		}
		
		fr.close();
		br.close();
	}


	/**
	 * key: 行號	物品ID
	 * value:行	列_值,列_值,列_值,列_值	使用者ID_分值
	 * */
    @Override  
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
       
       String[] rowAndLine_matrix1 = value.toString().split("\t");
       
       //矩陣行號
       String row_matrix1 = rowAndLine_matrix1[0];
       //列_值
       String[] cloumn_value_array_matrix1 = rowAndLine_matrix1[1].split(",");
       
       for(String line:cacheList){
    	   
    	   String[] rowAndLine_matrix2 = line.toString().split("\t");
    	   //右側矩陣line
    	   //格式: 列 tab 行_值,行_值,行_值,行_值
    	   String cloumn_matrix2 = rowAndLine_matrix2[0];
    	   String[] row_value_array_matrix2 = rowAndLine_matrix2[1].split(",");
    	   
    	   
	       //矩陣兩位相乘得到的結果	
		   double result = 0;
    	   
    	   
    	   //遍歷左側矩陣一行的每一列
    	  for(String cloumn_value_matrix1:cloumn_value_array_matrix1){
    		  String cloumn_matrix1 = cloumn_value_matrix1.split("_")[0];
    		  String value_matrix1 = cloumn_value_matrix1.split("_")[1];
    		  
    		  //遍歷右側矩陣一行的每一列
    		  for(String cloumn_value_matrix2:row_value_array_matrix2){
    			  if(cloumn_value_matrix2.startsWith(cloumn_matrix1+"_")){
    				  String value_matrix2 = cloumn_value_matrix2.split("_")[1];
    				  //將兩列的值相乘並累加
    				  result+= Double.valueOf(value_matrix1)*Double.valueOf(value_matrix2);
    				  
    			  }
    		  }
    	  }
    	  
    	  if(result==0){
    		  continue;
    	  }
    	  //result就是結果矩陣中的某個元素,座標	行:row_matrix1 	列:row_matrix2(右側矩陣已經被轉置)
    	  outKey.set(row_matrix1);
    	  outValue.set(cloumn_matrix2+"_"+df.format(result));
    	  //輸出格式為	key:行	value:列_值
    	  context.write(outKey, outValue);
       }
    } 
}

reducer2

package step2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author liyijie
 * @date 2018年5月13日下午11:43:59
 * @email [email protected]
 * @remark
 * @version 
 * 
 * item user (評分矩陣)    X    item profile(已轉置)
 */
public class Reducer2 extends Reducer<Text, Text, Text, Text>{
	private Text outKey = new Text();
	private Text outValue = new Text();
	
	 //	key:行 物品ID	value:列_值	使用者ID_分值
	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		StringBuilder sb = new StringBuilder();

		for(Text text:values){  
			sb.append(text+",");
        }
		
		String line = null;
		if(sb.toString().endsWith(",")){
			line = sb.substring(0, sb.length()-1);
		}
	

		outKey.set(key);
		outValue.set(line);

		context.write(outKey,outValue);  
	}
	
}

mr2

package step2;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * @author liyijie
 * @date 2018年5月13日下午11:44:07
 * @email [email protected]
 * @remark
 * @version 
 * 
 * item user (評分矩陣)    X    item profile(已轉置)
 */
public class MR2 {
	private static String inputPath = "/content/step2_input";
	private static String outputPath = "/content/step2_output";
	//將step1中輸出的轉置矩陣作為全域性快取
	private static String cache="/content/step1_output/part-r-00000";
	
	private static String hdfs = "hdfs://node1:9000";
	
	public int run(){
		try {
		Configuration conf=new Configuration();  
		conf.set("fs.defaultFS", hdfs);		
		Job	job = Job.getInstance(conf,"step2");
		//如果未開啟,使用 FileSystem.enableSymlinks()方法來開啟符號連線。
		FileSystem.enableSymlinks();
		//要使用符號連線,需要檢查是否啟用了符號連線
		 boolean areSymlinksEnabled = FileSystem.areSymlinksEnabled();
		 System.out.println(areSymlinksEnabled);
		//新增分散式快取檔案
		job.addCacheArchive(new URI(cache+"#itemUserScore1"));
		
	
		//配置任務map和reduce類  
		job.setJarByClass(MR2.class);  
		job.setJar("F:\\eclipseworkspace\\content\\content.jar");  
	      job.setMapperClass(Mapper2.class);  
	      job.setReducerClass(Reducer2.class);  

	      job.setMapOutputKeyClass(Text.class);  
	      job.setMapOutputValueClass(Text.class);  

	      job.setOutputKeyClass(Text.class);  
	      job.setOutputValueClass(Text.class);  

	      FileSystem fs = FileSystem.get(conf);
	      Path inpath = new Path(inputPath);
	      if(fs.exists(inpath)){
	          FileInputFormat.addInputPath(job,inpath);  
	      }else{
	    	  System.out.println(inpath);
	    	  System.out.println("不存在");
	      }
	      
	      Path outpath = new Path(outputPath);
	      fs.delete(outpath,true);
	      FileOutputFormat.setOutputPath(job, outpath); 
	      
			return job.waitForCompletion(true)?1:-1;
		} catch (ClassNotFoundException | InterruptedException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} catch (URISyntaxException e) {
			e.printStackTrace();
		}
		return -1;
	}
	
	 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException  { 
		int result = -1;
		result = new MR2().run();
		if(result==1){
			System.out.println("step2執行成功");
		}else if(result==-1){
			System.out.println("step2執行失敗");
		}
	  }
}

結果



3.cos<步驟1輸入,步驟2輸出>

輸入:步驟1輸入        物品特徵建模

快取:步驟2輸出

輸出:使用者ID(行)——物品ID(列)——相似度

程式碼:

mapper3

package step3;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author liyijie
 * @date 2018年5月13日下午11:43:51
 * @email [email protected]
 * @remark
 * @version 
 * 
 * 
 * cos<步驟1輸入,步驟2輸出>
 */
public class Mapper3 extends Mapper<LongWritable, Text, Text, Text> {
	private Text outKey = new Text();
	private Text outValue = new Text();
	private List<String> cacheList = new ArrayList<String>();
	//			右矩陣列值    下標右行       右值
	//private Map<String,String[]> cacheMap = new HashMap<>();
	
	private DecimalFormat df = new DecimalFormat("0.00");
	
	/**在map執行之前會執行這個方法,只會執行一次
	 * 
	 * 通過輸入流將全域性快取中的矩陣讀入一個java容器中
	 */
	@Override
	protected void setup(Context context)throws IOException, InterruptedException {
		super.setup(context);
		FileReader fr = new FileReader("itemUserScore2");
		BufferedReader br  = new BufferedReader(fr);
		
		//右矩陣	
		//key:行號 物品ID		value:列號_值,列號_值,列號_值,列號_值,列號_值...    使用者ID_分值
		String line = null;
		while((line=br.readLine())!=null){
			cacheList.add(line);
		}
		
		fr.close();
		br.close();
	}


	/**
	 * key: 行號	物品ID
	 * value:行	列_值,列_值,列_值,列_值	使用者ID_分值
	 * */
    @Override  
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
       
       String[] rowAndLine_matrix1 = value.toString().split("\t");
       
       //矩陣行號
       String row_matrix1 = rowAndLine_matrix1[0];
       //列_值
       String[] cloumn_value_array_matrix1 = rowAndLine_matrix1[1].split(",");
       
       //計算左側矩陣行的空間距離
       double denominator1 = 0;
       for(String column_value:cloumn_value_array_matrix1){
    	   String score = column_value.split("_")[1];
    	   denominator1 += Double.valueOf(score)*Double.valueOf(score);
       }
       denominator1 = Math.sqrt(denominator1);

       for(String line:cacheList){
    	   
    	   String[] rowAndLine_matrix2 = line.toString().split("\t");
    	   //右側矩陣line
    	   //格式: 列 tab 行_值,行_值,行_值,行_值
    	   String cloumn_matrix2 = rowAndLine_matrix2[0];
    	   String[] row_value_array_matrix2 = rowAndLine_matrix2[1].split(",");
    	   
    	 //計算右側矩陣行的空間距離
	       double denominator2 = 0;
	       for(String column_value:row_value_array_matrix2){
	    	   String score = column_value.split("_")[1];
	    	   denominator2 += Double.valueOf(score)*Double.valueOf(score);
	       }
	       denominator2 = Math.sqrt(denominator2);
    	   
	       //矩陣兩位相乘得到的結果	分子
		   double numerator = 0;
    	   
    	   
    	   //遍歷左側矩陣一行的每一列
    	   
    	  for(String cloumn_value_matrix1:cloumn_value_array_matrix1){
    		  String cloumn_matrix1 = cloumn_value_matrix1.split("_")[0];
    		  String value_matrix1 = cloumn_value_matrix1.split("_")[1];
    		  
    		  //遍歷右側矩陣一行的每一列
    		  for(String cloumn_value_matrix2:row_value_array_matrix2){
    			  if(cloumn_value_matrix2.startsWith(cloumn_matrix1+"_")){
    				  String value_matrix2 = cloumn_value_matrix2.split("_")[1];
    				  //將兩列的值相乘並累加
    				  numerator+= Double.valueOf(value_matrix1)*Double.valueOf(value_matrix2);
    				  
    			  }
    		  }
    	  }
    	  
		   double cos = numerator/(denominator1*denominator2);
		   if(cos == 0){
			   continue;
		   }
    	  
    	  //cos就是結果矩陣中的某個元素,座標	行:row_matrix1 	列:row_matrix2(右側矩陣已經被轉置)
    	  outKey.set(cloumn_matrix2);
    	  outValue.set(row_matrix1+"_"+df.format(cos));
    	  //輸出格式為	key:行	value:列_值
    	  context.write(outKey, outValue);
       }
    } 
}

reducer3

package step3;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author liyijie
 * @date 2018年5月13日下午11:43:59
 * @email [email protected]
 * @remark
 * @version 
 * 
 * cos<步驟1輸入,步驟2輸出>
 */
public class Reducer3 extends Reducer<Text, Text, Text, Text>{
	private Text outKey = new Text();
	private Text outValue = new Text();
	
	 //	key:行 物品ID	value:列_值	使用者ID_分值
	@Override
	protected void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		StringBuilder sb = new StringBuilder();

		for(Text text:values){  
			sb.append(text+",");
        }
		
		String line = null;
		if(sb.toString().endsWith(",")){
			line = sb.substring(0, sb.length()-1);
		}
	

		outKey.set(key);
		outValue.set(line);

		context.write(outKey,outValue);  
	}
	
}

mr3

package step3;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * @author liyijie
 * @date 2018年5月13日下午11:44:07
 * @email [email protected]
 * @remark
 * @version 
 * 
 * cos<步驟1輸入,步驟2輸出>
 */
public class MR3 {
	private static String inputPath = "/content/step1_input";
	private static String outputPath = "/content/step3_output";
	//將step1中輸出的轉置矩陣作為全域性快取
	private static String cache="/content/step2_output/part-r-00000";
	
	private static String hdfs = "hdfs://node1:9000";
	
	public int run(){
		try {
		Configuration conf=new Configuration();  
		conf.set("fs.defaultFS", hdfs);		
		Job	job = Job.getInstance(conf,"step3");
		//如果未開啟,使用 FileSystem.enableSymlinks()方法來開啟符號連線。
		FileSystem.enableSymlinks();
		//要使用符號連線,需要檢查是否啟用了符號連線
		 boolean areSymlinksEnabled = FileSystem.areSymlinksEnabled();
		 System.out.println(areSymlinksEnabled);
		//新增分散式快取檔案
		job.addCacheArchive(new URI(cache+"#itemUserScore2"));
		
	
		//配置任務map和reduce類  
		job.setJarByClass(MR3.class);  
		job.setJar("F:\\eclipseworkspace\\content\\content.jar");  
	      job.setMapperClass(Mapper3.class);  
	      job.setReducerClass(Reducer3.class);  

	      job.setMapOutputKeyClass(Text.class);  
	      job.setMapOutputValueClass(Text.class);  

	      job.setOutputKeyClass(Text.class);  
	      job.setOutputValueClass(Text.class);  

	      FileSystem fs = FileSystem.get(conf);
	      Path inpath = new Path(inputPath);
	      if(fs.exists(inpath)){
	          FileInputFormat.addInputPath(job,inpath);  
	      }else{
	    	  System.out.println(inpath);
	    	  System.out.println("不存在");
	      }
	      
	      Path outpath = new Path(outputPath);
	      fs.delete(outpath,true);
	      FileOutputFormat.setOutputPath(job, outpath); 
	      
			return job.waitForCompletion(true)?1:-1;
		} catch (ClassNotFoundException | InterruptedException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} catch (URISyntaxException e) {
			e.printStackTrace();
		}
		return -1;
	}
	
	 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException  { 
		int result = -1;
		result = new MR3().run();
		if(result==1){
			System.out.println("step3執行成功");
		}else if(result==-1){
			System.out.println("step3執行失敗");
		}
	  }
}

結果


.