1. 程式人生 > >Hadoop例項WordCount程式修改--詞頻降序

Hadoop例項WordCount程式修改--詞頻降序

修改wordcount例項,改為:
1、 對詞頻按降序排列
2、 輸出排序為前三,和後三的資料

首先是第一項:
對詞頻排序,主要針對的是最後輸出的部分。

**

分析程式內容:

**
WordCount.java

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory
; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache
.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static final Log LOG = LogFactory.getLog(FileInputFormat.class); //定義log變數 //map過程需要繼承org.apache
.hadoop.mapreduce包中的Mapper類,並重寫map方法。 //這裡繼承Mapper介面,設定map的輸入型別為<Object,Text>,輸出型別為<Text,IntWritable> public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); //one表示單詞只出現一次。為IntWritable型別。 private Text word = new Text(); //word用來儲存切下的單詞。為Text型別。 //這裡map方法的前面兩個引數代表輸入型別為<Object,Text>,對應<key,value>。 //後面的一個引數context應該是固定要這麼寫的,輸出<key,value>對到中間檔案 public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { //對輸入的行進行切詞。這裡輸入的Text為一行文字。 //若輸入檔案中有多行,會由map自動對其進行處理,切分成單行,每行的文字內容為value,key為這一行到檔案開頭的偏移量。 StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); //將切分後的單詞存到word中。 LOG.info("map過程 : " + word + "~~~~" + key + "@@@@" + one ); context.write(word, one); //將處理結果輸出到中間檔案,對每個單詞定詞頻為1,中間檔案中可能會存在重複行,即一個單詞出現不止一次。這裡暫不處理。 } } } //reduce過程需要繼承org.apache.hadoop.mapreduce包中的Reducer類,並重寫其reduce方法。 //這裡繼承Reducer介面,設定Reduce的輸入型別為<Text,IntWritable>,輸出型別為<Text,IntWritable> public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); //使用result記錄詞頻。為IntWritable型別。 //reduce方法,前面兩個引數是輸入型別<Text,Iterable<IntWritable>>,對應<key,value>。 //後面的一個引數context應該是固定要這麼寫的,用來輸出<key,value>對到中間檔案 public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); //對獲取的<key,value-list>計算value的和。這裡不太明白是本來輸入的就是value-list,還是經過處理,合成的?? LOG.info("val:get()~~~ : " + val.get() + "~~~~" + key ); } result.set(sum); //這裡的sum就是每個word的輸出詞頻 LOG.info("word!!!! : " + result + "~~~~" + key ); context.write(key, result); //輸出到中間檔案,key為單詞,value為詞頻。 } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //檢查執行命令 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } //配置作業名 Job job = new Job(conf, "word count"); //配置作業中使用的各種類。 job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //設定輸入輸出路徑 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); //以上部分都是自己配置,也可以使用預設。 System.exit(job.waitForCompletion(true) ? 0 : 1); } }

每個輸入檔案都是按照這幾步進行處理,然後處理完一個輸入檔案後再處理下一個。

第一步:
Wordcount類中的一個成員函式:map函式

public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      //將輸入檔案的值轉為字串型別後,定義為一個可以分詞的物件。
StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        //將可分詞物件切分成單個的word。
        word.set(itr.nextToken());
        LOG.info("map過程 : " + word + "~~~~" + key + "@@@@" + one ); 
        每個word的value值定為1,將word作為key。
        context.write(word, one);
      }
}

第二步:
IntSumReducer類中一個成員函式:reduce函式

public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
        //針對每個key都會呼叫一次reduce函式,對一個word出現的次數進行計數,出現兩次的話會迴圈兩次。
        LOG.info("val:get()~~~ : " + val.get() + "~~~~" + key );
      }
      //這裡的sum就是每個word的輸出詞頻
      result.set(sum);
      //輸出,key為word,value為詞頻。
      LOG.info("word!!!! : " + result + "~~~~" + key ); 
      context.write(key, result);
}

一個檔案按照上述步驟完成後將結果寫入到臨時檔案中,接著處理第二個檔案。到三個檔案都已經處理完以後。

第三步:
前面三個輸入檔案產生的臨時資料集中進行處理,執行IntSumReducer類中的reduce函式。與上面第二步相同。執行完畢後輸入到另一個臨時檔案中。
此時,map過程和reduce過程都已經完成了。

中間臨時檔案以及最後輸出都是是按照首字母來排序的,a→z,這裡是在map端排序。然後在合併處理三個臨時輸出檔案的過程中,又進行了一次排序,這裡是在reduce端排序。

**

排序原理說明

**

map task和reduce task否會對資料按照key值進行排序。

針對map task,會將資料先存放到一個緩衝區,然後當緩衝區資料達到一定量時,對緩衝區資料進行排序,這裡採用的排序演算法是快速排序演算法,並以IFile的檔案形式寫入到磁碟,當磁碟上檔案數達到一定量時,會對這些檔案進行合併,並再次排序,這裡採用的排序演算法是基於堆實現的優先佇列。

針對reduce task,會從每個map task上遠端拷貝資料,如果資料大小超過一定量則放到磁碟上,沒有超過就放在記憶體中。如果磁碟上檔案數目超過一定量則合併為一個大檔案。如果記憶體中檔案大小或數目超過一定大小,則會經過一次合併後將資料寫入磁碟。當所有資料拷貝完畢後,再由reduce task統一對記憶體和磁碟上的所有資料進行一次合併。

問題1:這裡reduce過程中的排序是隻在最後合併所有資料時進行,還是也會像map過程中對每一次合併檔案都會進行??
問題2:reduce過程的排序演算法使用方式也與map相同?在磁碟上的合併使用基於堆的優先佇列,在記憶體中合併使用快速排序?那麼最後的合併是是用什麼排序演算法?

面臨一個很大的問題
巨大的問題。。。沒有搞清楚到底哪些函式需要自己編寫,哪些是本來就有的,還有背後執行的機理,也沒有太清楚。

解決:
《實戰hadoo——開啟通向雲端計算的捷徑》 劉鵬主編

看書!!我們自己程式設計寫mapreduce任務需要編寫哪些東西(就是哪些部分是可以重寫,應該重寫的),這個可以參考劉鵬的《實戰Hadoop》,裡面細緻的剖析了wordcount程式的執行過程,並說明了哪些函式是我們程式設計需要實現的。看懂了這些(最好要執行一些這些程式),你基本就可以寫一些mapreduce程式,處理一些簡單的任務了。

1、 map過程需要繼承org.apache.hadoop.mapreduce包中的Mapper類,並重寫map方法。
2、 reduce過程需要繼承org.apache.hadoop.mapreduce包中的Reducer類,並重寫其reduce方法。
問題1。Map方法和reduce方法後面接的引數是怎麼定義的,個數,型別?

部分類及引數介紹,其中有些需要重寫,有些可以借用預設類:

1、InputFormat類。作用是將輸入的資料分割成一個個split,並將split進一步拆分成《key,value》作為map函式的輸入。可以通過job.setInputFormatClass()方法進行設定,預設為使用TextInputFormatClass()方法進行設定(該類只處理文字檔案)。TextInputFormat將文字檔案的多行分割為solits,並通過LineRecorderReader將其中的每一行解析為《key,value》對,key值為對應行在檔案中的偏移量,value為行的內容。

2、Mapper類,實現map函式,根據輸入的《key,value》對生成中間結果。可以通過job.setMapperClass()方法來進行設定。預設使用Mapper類,Mapper將輸入的《key,value》對原封不動地作為中間結果輸出。

3、Combiner類。實現combine函式,合併中間結果中具有相同key值的鍵值對。可以通過job.setCombinerClass()方法進行設定,預設為null,即不合並中間結果。

4、Partitioner類。實現getPartition函式,用於在Shuffle過程中按照key值將中間資料分成R份,每份由一個Reducer負責。可以通過job.setPartitionerClass()方法進行設定,預設使用HashPartitioner類,HashPartitioner使用雜湊函式完成Shuffle過程。

5、Reducer類,實現reduce函式,將中間結果合併,得到最終結果。可以使用job.setReducerClass()方法來進行設定,預設使用Reducer類,Reducer將中間結果直接輸出作為最終結果。

6、OutputFormat類,負責輸出最終結果,可以使用job.setOutputFormatClass()方法進行設定,預設使用TextOutputFormat類,TextOutputFormat將最終結果寫成純文字檔案,每行一個《key,value》對,key和value之間用製表符分隔開。

7、除了上述幾個類以外,job.setOutputKeyClass()方法和job.setOutputValueClass()方法可以用來設定最終結果(即Reducer的輸出結果)的key和value的型別,預設情況下分別為LongWritable和Text。Job.setMapOutputKeyClass()方法和job.setMapOutputValueClass()方法可以用來設定中間結果(即Mapper的輸出結果)的key和value的型別,預設情況下與最終結果的型別保持一致。

程式CopyOfWordCount.java

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.Random;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.WordCount2.IntWritableDecreasingComparator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


public class CopyOfWordCount {
//定義log變數
  public static final Log LOG =
        LogFactory.getLog(FileInputFormat.class);

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        LOG.info("map過程 : " + word + "~~~~" + key + "@@@@" + one ); 
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
        LOG.info("val:get()~~~ : " + val.get() + "~~~~" + key ); 
      }
      //這裡的sum就是每個word的輸出詞頻
      result.set(sum);
      LOG.info("word!!!! : " + result + "~~~~" + key ); 
      context.write(key, result);
    }
  }

  private static class IntWritableDecreasingComparator extends IntWritable.Comparator {  
      public int compare(WritableComparable a, WritableComparable b) {  
        return -super.compare(a, b);  
      }  

      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {  
          return -super.compare(b1, s1, l1, b2, s2, l2);  
      }  
  }  

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }

    Path tempDir = new Path("wordcount-temp-" + Integer.toString(  
            new Random().nextInt(Integer.MAX_VALUE))); //定義一個臨時目錄  

    Job job = new Job(conf, "word count");
    job.setJarByClass(CopyOfWordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

    FileOutputFormat.setOutputPath(job, tempDir);//先將詞頻統計任務的輸出結果寫到臨時目  
    //錄中, 下一個排序任務以臨時目錄為輸入目錄。  
    job.setOutputFormatClass(SequenceFileOutputFormat.class);  
    if(job.waitForCompletion(true))  
    {  
        Job sortJob = new Job(conf, "sort");  
        sortJob.setJarByClass(WordCount2.class);  

        FileInputFormat.addInputPath(sortJob, tempDir);  
        sortJob.setInputFormatClass(SequenceFileInputFormat.class);  

        /*InverseMapper由hadoop庫提供,作用是實現map()之後的資料對的key和value交換*/  
        sortJob.setMapperClass(InverseMapper.class);  
        /*將 Reducer 的個數限定為1, 最終輸出的結果檔案就是一個。*/  
        sortJob.setNumReduceTasks(1);   
        FileOutputFormat.setOutputPath(sortJob, new Path(otherArgs[1]));  

        sortJob.setOutputKeyClass(IntWritable.class);  
        sortJob.setOutputValueClass(Text.class);  
        /*Hadoop 預設對 IntWritable 按升序排序,而我們需要的是按降序排列。 
        * 因此我們實現了一個 IntWritableDecreasingComparator 類,  
        * 並指定使用這個自定義的 Comparator 類對輸出結果中的 key (詞頻)進行排序*/  
        sortJob.setSortComparatorClass(IntWritableDecreasingComparator.class);  

        System.exit(sortJob.waitForCompletion(true) ? 0 : 1); 
    }  

    FileSystem.get(conf).deleteOnExit(tempDir);  

  }
}

輸出結果:

這裡寫圖片描述

ps:快速排序演算法
分析分析分析!

初始值:

12   3
4    4
1    6 
4    6
4    2
6    6

處理過程:

這裡寫圖片描述

最終值:

12   3
6    6
4    2
4    4
4    6
1    6

相關推薦

Hadoop例項WordCount程式修改--詞頻

修改wordcount例項,改為: 1、 對詞頻按降序排列 2、 輸出排序為前三,和後三的資料 首先是第一項: 對詞頻排序,主要針對的是最後輸出的部分。 ** 分析程式內容: ** WordCount.java package org.

命令列執行hadoop例項wordcount程式

需要說明的有以下幾點。 1.如果wordcount程式不含層次,即沒有package 那麼使用如下命令: hadoop jar wordcount.jar WordCount2 /home/hadoop/input/20418.txt /home/hadoop/outp

spark小應用一:wordcount,按詞頻(SCALA)

val rdd = sc.textFile("hdfs://mycluster/user/bpf/sparkApp/wordcount/input") val wordcount = rdd.flatM

WordCount基礎上改進,實現以詞頻為鍵值,並按詞頻排列

思路: 1、任務一:與WordCount.v1.0相同,但將處理結果以二進位制形式儲存到臨時目錄中,作為第二次MapReduce任務的輸入目錄 2、任務二:利用Hadoop提供的InverseMapper實現key與value位置互換,自定義一個IntWri

Linux 環境下執行hadoopwordcount 程式

我相信每個人部署完hadoop 環境之後,都想執行一下程式,就比如學習java的時候,配置好環境變數,就想執行一下程式輸出一個hellop word。 wordcount程式,統計文字中的字元出現次數。本次,主要目的是在liunx執行一下wordcount,看看輸出的效果。下一篇會介紹在

在eclipse中編寫HadoopWordCount程式,並在eclipse中執行

基於Windows7 + jdk1.8.0_162 + eclipse4.7.2 + Hadoop2.7.7 一、安裝eclipse(自行百度) 二、安裝jdk(自行百度) 三、下載maven倉庫並在eclipse中配置maven環境(後面的文章講) 四、在eclipse中新建一個maven

CentOS虛擬機器Java環境中MapReduce HadoopWordCount(詞頻運算)程式連線資料入門

目錄 1. Hadoop 簡介 2. Hadoop 的架構 3. MapReduce 簡介 4. Hadoop HDFS 簡介 5. HDFS架構 6. MapReduce開發流程概念(重點) 7. maperuce 運算開發示例(重點) 8. hdfs 的資料型

windows下idea編寫WordCount程式,並打jar包上傳到hadoop叢集執行(傻瓜版)

通常會在IDE中編制程式,然後打成jar包,然後提交到叢集,最常用的是建立一個Maven專案,利用Maven來管理jar包的依賴。 一、生成WordCount的jar包 1. 開啟IDEA,File→New→Project→Maven→Next→填寫Groupld和Artifactld→Ne

mapreduce程式的按照key值從大到小排列

  在近期的Hadoop的學習中,在學習mapreduce時遇到問題:讓求所給資料的top10,們我們指導mapreduce中是有預設的排列機制的,是按照key的升序從大到小排列的 然而top10問題的求解需要按照降序排列。在網上找了很長時間才得以解決,解決方法如下:   自定義一

HadoopWordcount流量統計入門例項

一:何為MapReduce HDFS和MapReduce是Hadoop的兩個重要核心,其中MR是Hadoop的分散式計算模型。MapReduce主要分為兩步Map步和Reduce步,引用網上流傳很廣的一個故事來解釋,現在你要統計一個圖書館裡面有多少本書,為了完成這個任務,你可以指派小明去統計書架

偽分散式執行Hadoop例項之HDFS執行MapReduce程式

一、前期準備 準備一臺客戶機 安裝jdk 配置環境變數 安裝Hadoop 配置環境變數 二、配置叢集 配置hadoop-env.sh檔案 cd /opt/module/hadoop-2.7.2/etc/hadoop vim hadoo

Hadoop第一個測試例項WordCount的執行

首先確保hadoop已經正確安裝、配置以及執行。 拷貝WordCount.java到我們的資料夾,下載的hadoop裡帶有WordCount.java,路徑為: hadoop-0.20.203.0/src/examples/org/apache/hadoop/example

Hadoop平臺中執行MapReduce WordCount程式

一、實驗名稱 在Hadoop平臺執行MapReduce程式 二、實驗過程 1.設定環境變數 (1)編輯~/.bashrc檔案,新增下列語句 export HADOOP_HOME=/usr/local/hadoop export CLASSPATH=.:$JAVA_HOME/

hadoop入門教程-程式例項

無論是在微信還是QQ,我們經常看到好友推薦這樣的功能,其實這個功能是在大資料的基礎上實現的,下面來看具體的程式碼實現: 在src下新增三個類:JobRun.java: package com.lftgb.mr; import java.io.IOException; i

(python)編寫程式,生成包涵20個隨機數的列表,然後將前10升序排列,後10排列

如果本題目幫助到了你,請點選關注我一下,嘿嘿!! 方法一 import random def RandomNumbers(number,start,end): data=[]

編寫程式,生成一個包含20個隨機整數的列表,然後對其中偶數下標的元素進行排列,基數下標的元素不變

import random x=[random.randint(0,100) for i in range(20)] print(x) y=x[::2] y.sort() y.reverse() x[::2]=y print(x)

一步一步跟我學習hadoop(2)----hadoop eclipse外掛安裝和執行wordcount程式

本部落格hadoop版本是hadoop  0.20.2。 安裝hadoop-0.20.2-eclipse-plugin.jar 下載hadoop-0.20.2-eclipse-plugin.jar檔案,並新增到eclipse外掛庫,新增方法很簡單:找到eclipse安裝目錄

Hadoop HDFS 配置、格式化、啟動、基本使用Hadoop MapReduce配置、wordcount程式提交

Hadoop的安裝方式     單機:所有的服務執行在一個程序裡面,開發階段才會使用     分散式:將多個服務(JVM),分別執行在多臺機器上。     偽分散式:將多個服務(JVM)執行在一臺機器上 Hadoop偽分散式安裝     文件:http://hadoop.a

hadoop簡單例項-WordCount

本例項先貼原始碼,再講解步驟。 程式碼如下: package test; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.

hadoop入門(六)JavaAPI+Mapreduce例項wordCount單詞計數詳解

剛剛研究了一下haoop官網單詞計數的例子,把詳細步驟解析貼在下面: 準備工作: 1、haoop叢集環境搭建完成 2、新建一個檔案hello,並寫入2行單詞,如下: [[email protected] hadoop-2.6.0]# vi hello hello