1. 程式人生 > >Hadoop之WordCount實戰詳解

Hadoop之WordCount實戰詳解

WorldCount可以說是MapReduce中的helloworld了,單詞計數主要完成的功能是:統計一系列文字檔案中每個單詞出現的次數,通過完成這個簡單程式讓讀者摸清 MapReduce 程式的基本結構。 特別是對於每一個階段的函式執行所產生的鍵值對。這裡對MapReduce過程原理不過多說明。

環境說明

  • CentOS 7
  • Hadoop 2.7.5
  • JDK 1.8
  • IDE是IDEA+Gradle,直接建立一個Gradle管理的Java專案即可,然後在build.gradle檔案中新增如下依賴:
compile 'org.apache.hadoop:hadoop-common:2.7.5'
compile 'ch.cern.hadoop:hadoop-mapreduce-client-core:2.7.5.0' compile 'commons-cli:commons-cli:1.2'
  • hadoop安裝路徑/usr/local/hadoop,在hadoop目錄下建一個input目錄,用來存放輸入檔案,一下是我自己建立的兩個檔案:
word1.txt

hello hadoop welcome
hello spark world

word2.txt

Spark is good!
Scala is similar with kotlin

程式設計實戰

1.編寫Map處理邏輯

首先自定義一個類繼承MapReduce框架的Mapper類。Map的輸入型別是鍵值對,鍵是行號,值就是單詞,輸出也是鍵值對,鍵是一個個單詞,值就是其出現的次數,這裡的次數值都是1,經過Reduce處理才得到最終次數。

public static class WordMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected
void map(Object key, Text value, Context context) throws IOException, InterruptedException { //key就是行號,value就是一行字串 StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }

2.編寫Reduce處理邏輯

Map處理完後,資料經過shuffle節點輸入給Reuduce任務,Reduce的輸入形式是(key, value-list)形式。

public static class WordReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values)
            {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

3.編寫main函式

main函式要設定程式執行時引數(Configuration)以及環境引數(Job),Hadoop都是以作業(job)形式執行使用者程式,以下是整個程式程式碼:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Created by charming on 2018/4/4.
 */
public class WordCount {

    public static class WordMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens())
            {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class WordReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values)
            {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration(); //程式執行時引數
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2)
        {
            System.err.println("Usage wordcount <in> <out>");
            System.exit(2);
        }

        // 刪除輸出目錄
        Path outputPath = new Path(otherArgs[1]);
        if (outputPath.getFileSystem(conf).exists(outputPath));
        {
            outputPath.getFileSystem(conf).delete(outputPath, true);
        }

        Job job = Job.getInstance(conf, "word count"); //設定環境引數

        job.setJarByClass(WordCount.class); // 設定整個程式類名

        job.setMapperClass(WordMapper.class); //map函式
//        job.setCombinerClass(WordReduce.class); //combine的實現和reduce一樣
        job.setReducerClass(WordReduce.class); //reudce函式

        job.setOutputKeyClass(Text.class); //設定輸出key型別
        job.setOutputValueClass(IntWritable.class); //設定輸出value型別

        FileInputFormat.addInputPath(job, new Path(otherArgs[0])); // 設定輸入檔案
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // 設定輸出檔案

        System.exit(job.waitForCompletion(true)?0:1); // 提交作業
    }
}

注意,WordCount.java是直接在main目錄下建立,沒有包名。不然後面執行時會出異常。

4.編譯打包程式碼

有兩種方式

  • 在IDEA中使用gradle配置jar包
jar {
    baseName 'WordCount'
    String appMainClass = 'WordCount'
//    from {
//        //新增依懶到打包檔案
//        //configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
//        configurations.runtime.collect{zipTree(it)}
//    }
    manifest {
        attributes 'Main-Class': appMainClass
    }
}
  • 通過命令列形式打包
jar -cvf WordCount.jar ./WordCount*.class

在/usr/local/hadoop下面新建一個job目錄,將打包好的jar包放置在此目錄下。

5.在MapReduce上執行

hadoop的輸入輸出都是在hdfs檔案系統上的。

首先需要在 HDFS 中建立使用者目錄,在hadoop目錄下執行命令:

./bin/hdfs dfs -mkdir -p /user/hadoop

在 HDFS 中建立輸入目錄,這裡使用的是 hadoop 使用者,並且已建立相應的使用者目錄 /user/hadoop ,因此在命令中就可以使用相對路徑如 input,其對應的絕對路徑就是 /user/hadoop/input:

./bin/hdfs dfs -mkdir input

接著將 ./input 中的 txt 檔案作為輸入檔案複製到分散式檔案系統中,即將 /usr/local/hadoop/input 複製到分散式檔案系統中的 /user/hadoop/input 中。

./bin/hdfs dfs -put ./input/*.txt input

複製完成後,可以通過如下命令檢視檔案列表:

./bin/hdfs dfs -ls input

執行 MapReduce 作業

./bin/hadoop jar ./job/WordCount.jar WordCount input output

檢視位於 HDFS 中的輸出結果:

./bin/hdfs dfs -cat output/*

我們也可以將執行結果取回到本地:

rm -r ./output    # 先刪除本地的 output 資料夾(如果存在)
./bin/hdfs dfs -get output ./output     # 將 HDFS 上的 output 資料夾拷貝到本機
cat ./output/*

Hadoop 執行程式時,輸出目錄不能存在,否則會提示錯誤 org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://localhost:9000/user/hadoop/output already exists,因此若要再次執行,需要刪除 output 資料夾。前面程式碼中已經給出程式碼刪除output檔案的方式。也可以手動使用下面命令進行刪除。

./bin/hdfs dfs -rm -r output    # 刪除 output 資料夾

下面是前面兩個檔案word1.txt、word2.txt執行的結果:
這裡寫圖片描述