1. 程式人生 > >元資料與資料治理|MapReduce統計詞語出現次數(第五篇)

元資料與資料治理|MapReduce統計詞語出現次數(第五篇)

晨曦同學(Dota界號稱利神)前段時間分享了這樣一個問題:如何在一個很大的檔案中(該檔案包含了中英文)找出出現頻率比較高的幾個詞呢?我們來分析一下。找出現頻率比較高的詞語,首先要有一個支援中文的分詞器(IK,庖丁解牛等等),這個問題不大;分詞之後呢就要統計詞語出現次數,類似於MapReduce程式中WordCount,這可是學習MapReduce的hello world程式呀,當然很容易搞定;最後還要來個排序,統計完了我們期望出現次數高的詞語出現在前面,MapReduce預設就支援排序,也沒問題。

解決這個問題需要兩個Job,一個是統計Job,一個是排序Job。

統計Job的Mapper需要做的事情就是分詞,這裡我們選用IKanalyzer分詞器,可能IK在官網上不好下載,我給大家準備好了,

點此下載。分詞之後,將每個單詞個數置為1(跟WordCount程式一樣)。

public static class AnalyzerMapper extends Mapper<Object, Text, Text, IntWritable> {
	private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    
	@Override
	protected void map(Object key, Text value,
			Mapper<Object, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		breakupSentence(value.toString(), context);
	}
    
	/**
	 * 用分詞器將一段話拆分成多個詞。
	 * 分出一個詞就將數量置為1。
	 * 
	 * @param sentence
	 * @param context
	 * @throws IOException 
	 * @throws InterruptedException 
	 */
	private void breakupSentence(String sentence, Mapper<Object, Text, Text,
			IntWritable>.Context context) throws IOException, InterruptedException {
		Analyzer analyzer = new IKAnalyzer(true);
		TokenStream tokenStream = analyzer.tokenStream("content",
				new StringReader(sentence));
		tokenStream.addAttribute(CharTermAttribute.class);
		while (tokenStream.incrementToken()) {
			CharTermAttribute charTermAttribute = tokenStream
					.getAttribute(CharTermAttribute.class);
			word.set(charTermAttribute.toString());

			context.write(word, one);
		}
	}
    
}

 

別忘了給IK設定停止詞字典,過濾掉那些"了",”呢“,”啊“,”的“,"is", "and", "a" 之類的語氣詞、助詞、連詞、量詞等。

IKAnalyzer.cfg.xml

<properties>  
	<comment>IK Analyzer 擴充套件配置</comment>
	<!--使用者可以在這裡配置自己的擴充套件字典 
	<entry key="ext_dict">ext.dic;</entry> 
	-->
	<!--使用者可以在這裡配置自己的擴充套件停止詞字典-->
	<entry key="ext_stopwords">stopword.dic;chinese_stopword.dic</entry> 
	
</properties>

 

chinese_stopword.dic

的
呢
吧
和
......

 

統計Job的Reducer就是統計各個詞語的出現次數,跟WordCount程式中的完全一致,不再煩述。我們可以將該Reducer設定為Job的CombinerClass,這樣每次Mapper Task向Reducer Task傳遞資料時候,先執行Combiner,將結果先做個統計,減少了Mapper向Reducer的資料傳輸。

public static class CountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

	private IntWritable result = new IntWritable();
	
	 @Override
	 protected void reduce(Text key, Iterable<IntWritable> values,
			Reducer<Text, IntWritable, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		
		  int sum = 0;
	      for (IntWritable val : values) {
	        sum += val.get();
	      }
	      result.set(sum);
	      context.write(key, result);
    }
	
}

 

 

接下來再看排序Job,MapReduce任務是通過key來排序的,我們需要將詞語出現的次數排序,所以需要先將統計Job的結果Key-Value互換,排序完成後,再換回來即可。

排序Job的Mapper將統計Job的結果Key-Value互換,程式碼如下:

public static class SortMapper extends Mapper<Object, Text, IntWritable, Text> {
	private final static IntWritable wordCount = new IntWritable(1);
	private Text word = new Text();
	@Override
	protected void map(Object key, Text value,
			Mapper<Object, Text, IntWritable, Text>.Context context)
			throws IOException, InterruptedException {
		StringTokenizer tokenizer = new StringTokenizer(value.toString());
	    while (tokenizer.hasMoreTokens()) {
	    	String a = tokenizer.nextToken().trim();
	        word.set(a);
	        String b = tokenizer.nextToken().trim();
	        wordCount.set(Integer.valueOf(b));
	        context.write(wordCount, word);
	    }
	}
	
}

 

排序Job的Reducer任務就是再將Key-Value倒置過來。

public static class SortReducer extends Reducer<IntWritable, Text, Text, IntWritable> {

	private Text result = new Text();
	@Override
	protected void reduce(IntWritable key, Iterable<Text> values,
			Reducer<IntWritable, Text, Text, IntWritable>.Context context)
			throws IOException, InterruptedException {
		for (Text val : values) {
	        result.set(val.toString());
	        context.write(result, key);
	    }
	}
	
}

 

Reducer預設排序是從小到大(數字),而我們期望出現次數多的詞語排在前面,所以需要重寫排序類WritableComparator。

public class DescWritableComparator extends WritableComparator {

	protected DescWritableComparator() {
		super(IntWritable.class, true);
	}

	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		return -super.compare(a, b);
	}
	
}

 

如果有多個Reducer任務,Reducer的預設排序只是對傳送到該Reducer下的資料區域性排序。如果想達到全域性排序,需要我們手動去寫partitioner。Partitioner的作用是根據不同的key,制定相應的規則分發到不同的Reducer中。

public static class SortPartitioner<K, V> extends Partitioner<K, V> {

	@Override
	public int getPartition(K key, V value, int numReduceTasks) {
		int maxValue = 50;
	    int keySection = 0;
	    // 只有傳過來的key值大於maxValue 並且numReduceTasks比如大於1個才需要分割槽,否則直接返回0
	    if (numReduceTasks > 1 && key.hashCode() < maxValue) {
	        int sectionValue = maxValue / (numReduceTasks - 1);
	        int count = 0;
	        while ((key.hashCode() - sectionValue * count) > sectionValue) {
	            count++;
	        }
	        keySection = numReduceTasks - 1 - count;
	    }
	    return keySection;
	}
	
}

 

 

最後就是連結MapReduce Job流,這裡有兩個Job,需要先執行統計Job,再執行排序Job。我們需要將統計Job的輸出作為排序Job的輸入。(友情提示:別忘了給統計Job設定Combiner哦,也別忘了給排序Job設定Comparator和Partitioner哦。)

Job job1 = new Job(configuration, "key word analyzer");
job1.setJarByClass(JobDefiner.class);
job1.setMapperClass(AnalyzerMapper.class);
job1.setCombinerClass(CountReducer.class);
job1.setReducerClass(CountReducer.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job1, new Path(otherArgs[0]));
Path outPath1 = new Path(otherArgs[1]);
FileOutputFormat.setOutputPath(job1, outPath1);
job1.waitForCompletion(true);

Job job2 = new Job(configuration, "result sort");
job2.setJarByClass(JobDefiner.class);
job2.setOutputKeyClass(IntWritable.class);
job2.setOutputValueClass(Text.class);

job2.setMapperClass(SortKeyWordHandler.SortMapper.class);
job2.setReducerClass(SortKeyWordHandler.SortReducer.class);
// key按照降序排列
job2.setSortComparatorClass(DescWritableComparator.class);
job2.setPartitionerClass(SortKeyWordHandler.SortPartitioner.class);
FileInputFormat.addInputPath(job2, outPath1);
FileOutputFormat.setOutputPath(job2, new Path(otherArgs[2]));
job2.waitForCompletion(true);

 

 

大功告成?且慢!!在我的部落格Eclipse遠端除錯Hadoop叢集中,我們只講瞭如何配置本地Eclipse如何遠端除錯Hadoop叢集,在這裡我們就演示一下如何去跑。

我們先上傳兩篇關於習大大的報道到hdfs上

bin/hadoop dfs -mkdir input
bin/hadoop dfs -put mupeng/files/test_chinese* input

 

刷一下Eclipse裡面DFS Location就能看到

找到定義Job的main方法類,右鍵Run As=>Run Configurations ...

確認Project、Main class準確後,設定main方法的引數:統計Job的輸入路徑、統計Job的輸出路徑(同時也是排序Job的輸入路徑)、排序Job的輸出路徑。

hdfs://192.168.248.149:9000/user/mupeng/input
hdfs://192.168.248.149:9000/user/mupeng/output1
hdfs://192.168.248.149:9000/user/mupeng/output2

 

設定好後,點選Run,在第二個輸出路徑中,我們看到結果(我這隻有一個Reducer)

引用	20
強調	16
習近平	14
斐濟	13
我們	13
斐	12
中國	11
對	10
中	10
等	9
中方	9
為	9
方	9
......

 

 

最後提示大家:

本文相關原始碼下載地址(GitHub):點選檢視

相關部落格地址:Eclipse遠端除錯Hadoop叢集