1. 程式人生 > >案例5-挖掘微博廣告高權重詞條

案例5-挖掘微博廣告高權重詞條

微博內容(如圖):ID  content

公式:

TF:詞條在某個微博中出現的詞頻(出現次數).

N:微博總數

DF:詞條在多少個微博中出現過

案例用到四個reduceTask,下標計數從0開始,三個統計詞頻TF,一個統計微博總數N。

 

FirstMapper.java

對輸入檔案的每行記錄微博內容進行分詞,統計微博詞頻TF及微博總數,每個詞條輸出詞頻數1;每個微博輸出一個count=1

package com.jeff.mr.tf;

import java.io.IOException;
import java.io.StringReader;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

/**
 *  TF:詞條在某個微博中出現的詞頻(出現次數).
	N:微博總數
	DF:詞條在多少個微博中出現過
	--------------------------------
 *   第一個MR,計算TF和計算N(微博總數)
 * @author root
 *
 */
public class FirstMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

	protected void map(LongWritable key, Text value,Context context)
			throws IOException, InterruptedException {
		//value是微博檔案每一行以製表符\t隔開
		String[]  v =value.toString().trim().split("\t");
		if(v.length>=2){
			String id=v[0].trim();
			String content =v[1].trim();
			//對微博內容進行中文分詞處理
			StringReader sr =new StringReader(content);
			IKSegmenter ikSegmenter =new IKSegmenter(sr, true);
			Lexeme word=null;
			while( (word=ikSegmenter.next()) !=null ){
				String w= word.getLexemeText();//w就是微博內容的每一個詞彙
				//輸出格式為:key為:詞彙_微博ID    value是1,出現次數
				context.write(new Text(w+"_"+id), new IntWritable(1));
			}
			//每執行一次這個方法,就表示統計了一條微博數,將來在第四個reduce分割槽執行,參見FirstPartition,自定義分割槽規則
			context.write(new Text("count"), new IntWritable(1));
		}else{
			System.out.println(value.toString()+"-------------");
		}
	}
	
	
	
}

FirstPartition.java

自定義分割槽,使得key為count的分割槽到最後一個分割槽(編號3),其他的分別分割槽編號為0/1/2三個reduceTask

package com.jeff.mr.tf;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * 第一個MR自定義分割槽,把key為count的,即用來計算微博總數的資料分割槽到第四個reduce分割槽,
 * 前三個reduce分割槽用來計算TF,就是單個微博中詞彙出現次數
 * @author root
 *
 */
public class FirstPartition extends HashPartitioner<Text, IntWritable>{

	
	public int getPartition(Text key, IntWritable value, int reduceCount) {
		if(key.equals(new Text("count")))
			return 3;
		else
			return super.getPartition(key, value, reduceCount-1);
	}

}

FirstReduce.java

計算單個詞條的詞頻TF,輸入資料為FirstMapper.java的輸出,key為詞條_id.或者count,值為詞頻個數或者count個數,當key為count時不參與計算只輸出檢視。

輸出格式:詞條_ID 詞頻

package com.jeff.mr.tf;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * c1_001,2
 * c2_001,1
 * count,10000
 * @author root
 *
 */
public class FirstReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
	
	protected void reduce(Text arg0, Iterable<IntWritable> arg1,
			Context arg2)
			throws IOException, InterruptedException {
		
		int sum =0;
		for( IntWritable i :arg1 ){
			sum= sum+i.get();
		}
		if(arg0.equals(new Text("count"))){
			System.out.println(arg0.toString() +"___________"+sum);
		}
		arg2.write(arg0, new IntWritable(sum));
	}

}

在dfs-location上新建路徑:/usr/input/tf-idf並上傳檔案微博內容:

 

接下來就可以執行FirstJob.java來執行第一個MR:

package com.jeff.mr.tf;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class FirstJob {

	public static void main(String[] args) {
		Configuration config =new Configuration();
		config.set("fs.defaultFS", "hdfs://node4:8020");
		config.set("yarn.resourcemanager.hostname", "node4");
		try {
			FileSystem fs =FileSystem.get(config);
//			JobConf job =new JobConf(config);
			Job job =Job.getInstance(config);
			job.setJarByClass(FirstJob.class);
			job.setJobName("weibo1");
			
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(IntWritable.class);
//			job.setMapperClass();
			job.setNumReduceTasks(4);
			job.setPartitionerClass(FirstPartition.class);
			job.setMapperClass(FirstMapper.class);
			job.setCombinerClass(FirstReduce.class);
			job.setReducerClass(FirstReduce.class);
			
			
			FileInputFormat.addInputPath(job, new Path("/usr/input/tf-idf"));
			
			Path path =new Path("/usr/output/weibo1");
			if(fs.exists(path)){
				fs.delete(path, true);
			}
			FileOutputFormat.setOutputPath(job,path);
			
			boolean f= job.waitForCompletion(true);
			if(f){
				
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

執行成功:

重新整理DFS-Location,看到在/usr/output/weibo1的目錄下生成了四個分割槽檔案,每一個分割槽檔案都是四個reduceTask的輸出檔案

其中第四個分割槽檔案就是用來計算Count微博總數N的,其他三個都是微博中詞彙即出現次數。

比如:0.03元_3824213951437432       1

這個就表示0.03元這個詞在ID為3824213951437432微博中出現了1次

 

 

TwoMapper.java

統計DF,詞條在多少個微博中出現過

輸出格式:詞條 出現的微博個數

package com.jeff.mr.tf;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
//統計df:詞在多少個微博中出現過。
public class TwoMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {

		/**
		 * 1  獲取當前	mapper Task的資料片段(split)
		 * 2 當前mapper Task的資料來源於第一個MR輸出的四個檔案
		 */
		FileSplit fs = (FileSplit) context.getInputSplit();
		//可以從fs獲取第一個MR的檔名,除了最後一個檔案是用來計算微博總數的,其他都是TF
		if (!fs.getPath().getName().contains("part-r-00003")) {
			String[] v = value.toString().trim().split("\t");
			if (v.length >= 2) {
				//獲取{0.03元_3824213951437432	1},這種第一個MR的輸出資料,即每一行
				String[] ss = v[0].split("_");
				if (ss.length >= 2) {
					String w = ss[0];//得到每一個詞彙,輸出次數1,此處所有微博的詞彙都會輸出1次
					context.write(new Text(w), new IntWritable(1));
				}
			} else {
				System.out.println(value.toString() + "-------------");
			}
		}

	}
}

TwoReduce.java

package com.jeff.mr.tf;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * 計算詞彙在所有微博中出現的次數
 * @author jeffSheng
 * 2018年10月17日
 */
public class TwoReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
	/**
	 * 輸入資料:
	 *    key:0.03元	  value:1(次)
	 * Iterable<IntWritable> arg1,即key相等的一組資料
	 */
	protected void reduce(Text key, Iterable<IntWritable> arg1,Context context)
												throws IOException, InterruptedException {
		int sum =0;
		for( IntWritable i :arg1 ){
			sum= sum + i.get();
		}
		context.write(key, new IntWritable(sum));
	}

}

執行TwoJob.java第二個MR,計算每個詞彙在所有微博出現次數即DF

package com.jeff.mr.tf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class TwoJob {

	public static void main(String[] args) {
		Configuration config =new Configuration();
		config.set("fs.defaultFS", "hdfs://node4:8020");
		config.set("yarn.resourcemanager.hostname", "node4");
		try {
//			JobConf job =new JobConf(config);
			Job job =Job.getInstance(config);
			job.setJarByClass(TwoJob.class);
			job.setJobName("weibo2");
			//設定map任務的輸出key型別、value型別
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(IntWritable.class);
//			job.setMapperClass();
			job.setMapperClass(TwoMapper.class);
			job.setCombinerClass(TwoReduce.class);
			job.setReducerClass(TwoReduce.class);
			
			//mr執行時的輸入資料從hdfs的哪個目錄中獲取
			FileInputFormat.addInputPath(job, new Path("/usr/output/weibo1"));
			FileOutputFormat.setOutputPath(job, new Path("/usr/output/weibo2"));
			
			boolean f= job.waitForCompletion(true);
			if(f){
				System.out.println("執行job成功");
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

重新整理DFS-Location看到/usr/output/weibo2下的DF輸出檔案:

比如0.03元 在所有微博中出現了1次

 

根據公式計算微博詞彙權重:

LastMapper.java

輸入資料為所有詞的TF,所有詞的DF,微博總數N,根據這三個變數計算詞條最終權重。

輸出格式:微博ID 詞條:權重

package com.jeff.mr.tf;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * 最後計算
 * @author root
 *
 */
public class LastMapper extends Mapper<LongWritable, Text, Text, Text> {
	//存放微博總數
	public static Map<String, Integer> cmap = null;
	//存放df
	public static Map<String, Integer> df = null;

	// 在map方法執行之前,即mapperTask初始化的時候執行
	/**
	 * mapReduce的執行過程回顧:
	 * 比如一個檔案被分割成1024個碎片段,則一定有與之對應的1024個mapTask去執行每個碎片段。
	 * mapTask在有碎片段的節點上執行,即 dataNode上有碎片段,在dataNode上執行。所以每個DataNode上就
	 * 有一個NodeManager來執行mapReduce程式,NodeManager裡面有一個與之對應的ApplicationMatser
	 * 負責從resourceManager中請求資源即Contianer中文是容器,其實是資源。申請資源後,ApplicationMatser
	 * 則可以通過一個Executor物件執行mapperTask,並監控和記錄執行狀態、進度等資料彙報給NodeManager,NodeManager
	 * 再彙報給resourceManager。
	 * Executor物件執行mapperTask的時候先初始化對應的MapTask,其實就是我們的LastMapper.
	 * java自定義的xxxMapper,只要初始化成功就呼叫LastMapper的setUp方法,這個時候map方法還沒執行,
	 * map方法是迴圈呼叫的,即每一行都呼叫一次,但是setUp方法只會呼叫一次。不過1024個碎片段對應1024個mapTask,
	 * 就會執行setup方法1024次,還是狠多次,所以我們可以考慮從共享記憶體中取得一部分資料,比如微博總數N和DF記錄。
	 * 我們使用cmap和df兩個Map來存放,判斷是否為空,即保證存過就不用再存了。
	 * 
	 * 
	 */
	protected void setup(Context context) throws IOException,
			InterruptedException {
		System.out.println("******************");
		if (cmap == null || cmap.size() == 0 || df == null || df.size() == 0) {
			URI[] ss = context.getCacheFiles();
			if (ss != null) {
				for (int i = 0; i < ss.length; i++) {
					URI uri = ss[i];
					if (uri.getPath().endsWith("part-r-00003")) {//微博總數
						Path path =new Path(uri.getPath());
//						FileSystem fs =FileSystem.get(context.getConfiguration());
//						fs.open(path);
						BufferedReader br = new BufferedReader(new FileReader(path.getName()));
						String line = br.readLine();
						if (line.startsWith("count")) {
							String[] ls = line.split("\t");
							cmap = new HashMap<String, Integer>();
							cmap.put(ls[0], Integer.parseInt(ls[1].trim()));
						}
						br.close();
					} else if (uri.getPath().endsWith("part-r-00000")) {//詞條的DF
						df = new HashMap<String, Integer>();
						Path path =new Path(uri.getPath());
						BufferedReader br = new BufferedReader(new FileReader(path.getName()));
						String line;
						while ((line = br.readLine()) != null) {
							String[] ls = line.split("\t");
							df.put(ls[0], Integer.parseInt(ls[1].trim()));
						}
						br.close();
					}
				}
			}
		}
	}

	
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		FileSplit fs = (FileSplit) context.getInputSplit();
//		System.out.println("--------------------");
		if (!fs.getPath().getName().contains("part-r-00003")) {
			String[] v = value.toString().trim().split("\t");
			if (v.length >= 2) {
				int tf =Integer.parseInt(v[1].trim());//tf值
				String[] ss = v[0].split("_");
				if (ss.length >= 2) {
					String w = ss[0];
					String id=ss[1];
					//根據公式計算權重,輸出:微博Id  詞彙1:權重1 詞彙2:權重2  
					double s=tf * Math.log(cmap.get("count")/df.get(w));
					NumberFormat nf =NumberFormat.getInstance();
					nf.setMaximumFractionDigits(5);
					context.write(new Text(id), new Text(w+":"+nf.format(s)));
				}
			} else {
				System.out.println(value.toString() + "-------------");
			}
		}
	}
}

LastReduce.java

計算所有詞條的最終權重,相同微博在後邊顯示其所有的詞條:權重,並使用製表符\t隔開。

輸出格式:微博ID  詞條:權重  詞條:權重

package com.jeff.mr.tf;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class LastReduce extends Reducer<Text, Text, Text, Text>{
	
	protected void reduce(Text key, Iterable<Text> arg1,
			Context context)
			throws IOException, InterruptedException {
		
		StringBuffer sb =new StringBuffer();
		
		for( Text i :arg1 ){
			sb.append(i.toString()+"\t");
		}
		
		context.write(key, new Text(sb.toString()));
	}

}

 

執行LastJob計算最終輸出結果:

我們這裡採用的是在本地提交到Linux環境下進行執行測試的

package com.jeff.mr.tf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class LastJob {

	public static void main(String[] args) {
		Configuration config =new Configuration();
//		config.set("fs.defaultFS", "hdfs://node1:8020");
//		config.set("yarn.resourcemanager.hostname", "node1");
		config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\weibo3.jar");
		try {
			FileSystem fs =FileSystem.get(config);
//			JobConf job =new JobConf(config);
			Job job =Job.getInstance(config);
			job.setJarByClass(LastJob.class);
			job.setJobName("weibo3");
			
//			DistributedCache.addCacheFile(uri, conf);
			//2.5
			/**
			 * 之所以以下兩行可以載入到記憶體因為微博總數的檔案和df檔案其實都不大,所有可以在任務啟動之初先載入到記憶體
			 */
			//把微博總數N載入到記憶體
			job.addCacheFile(new Path("/usr/output/weibo1/part-r-00003").toUri());
			//把df載入到記憶體
			job.addCacheFile(new Path("/usr/output/weibo2/part-r-00000").toUri());
			
			//設定map任務的輸出key型別、value型別
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);
//			job.setMapperClass();
			job.setMapperClass(LastMapper.class);
			job.setReducerClass(LastReduce.class);
			
			//mr執行時的輸入資料從hdfs的哪個目錄中獲取
			FileInputFormat.addInputPath(job, new Path("/usr/output/weibo1"));
			Path outpath =new Path("/usr/output/weibo3");
			if(fs.exists(outpath)){
				fs.delete(outpath, true);
			}
			FileOutputFormat.setOutputPath(job,outpath );
			
			boolean f= job.waitForCompletion(true);
			if(f){
				System.out.println("執行job成功");
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

需要做的是將工程打包放在桌面weibo3.jar,然後在LastJob中新增:

config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\weibo3.jar");

配置檔案放在src下:

開始執行:

開啟:http://node1:18088/cluster

觀察剛開始執行

觀察執行完成;

重新整理DFS-Location

比如:3823890239358658     繼續:4.89035  支援:3.04452 

表示在微博ID為3823890239358658微博中,[繼續]的全部微博中權重為4.89035,[支援]的全部微博中權重為3.04452

有了這些結果,我們就可以做出一些商業或者其他領域的重要選擇!

 

當然也可以在本地進行測試,就是在LastMapper的setUp中註釋掉的程式碼:

FileSystem fs =FileSystem.get(context.getConfiguration());

FSDataInputStream fsdInputStream = fs.open(path);

將輸入流封裝進BufferedReader即可。