
Distributed Processing Framework: MapReduce

1 Advantages of MapReduce

  • Offline (batch) processing of massive data sets;
  • Easy to develop and easy to run;

2 MapReduce Programming Model

  • A job is split into a Map phase and a Reduce phase
  • Map phase: Map Tasks
  • Reduce phase: Reduce Tasks

2.1 wordcount example

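As a rough illustration of the data flow, the plain-Java sketch below (no Hadoop involved; the input lines and the class name WordCountFlow are made up for this example) mimics what the framework does: the map phase emits (word, 1) pairs, the shuffle groups them by key, and the reduce phase sums each group. The real Hadoop job is shown in section 4.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the wordcount data flow, for illustration only.
public class WordCountFlow {
    public static void main(String[] args) {
        String[] lines = {"hello world", "hello tom"};   // stands in for one input split

        // Map phase: each line becomes (word, 1) pairs.
        // Shuffle: the framework groups all pairs by key before reduce runs.
        Map<String, List<Long>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1L);
            }
        }

        // Reduce phase: sum the grouped values of each key.
        for (Map.Entry<String, List<Long>> e : grouped.entrySet()) {
            long sum = 0;
            for (long one : e.getValue()) {
                sum += one;
            }
            System.out.println(e.getKey() + "\t" + sum);   // e.g. hello  2
        }
    }
}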

2.2 Core Concepts

  • Split: the chunk of data handed to the MapReduce job for processing, and the smallest unit of computation in MapReduce. In HDFS, the block (blocksize, 128 MB by default) is the smallest unit of storage. By default a Split corresponds one-to-one to an HDFS block; the mapping can also be configured manually (not recommended).
  • InputFormat
  • OutputFormat
  • Combiner
  • Partitioner (the driver sketch after this list shows where each of these pluggable components attaches to a job)
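The following driver sketch shows where these pluggable pieces attach to a Job. JobWiringSketch is a made-up name for illustration; only the pieces a job actually needs have to be set, the rest fall back to Hadoop's defaults (TextInputFormat, HashPartitioner, TextOutputFormat, no Combiner).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Sketch of where the pluggable components attach to a Job.
public class JobWiringSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "wiring-sketch");

        job.setInputFormatClass(TextInputFormat.class);    // InputFormat: how splits are read as (key, value) records
        // job.setMapperClass(...);                        // Map phase logic
        // job.setCombinerClass(...);                      // Combiner: map-side local reduce (optional)
        // job.setPartitionerClass(...);                   // Partitioner: routes each key to a ReduceTask (optional)
        // job.setReducerClass(...);                       // Reduce phase logic
        job.setOutputFormatClass(TextOutputFormat.class);  // OutputFormat: how the final (key, value) pairs are written
    }
}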

3 MapReduce Architecture

3.1 Version 1.x

  • JobTracker (JT): the job manager. Splits a job into a set of tasks (MapTask, ReduceTask); assigns the tasks to TaskTrackers for execution; monitors the job and handles failures (if a task dies, there is a mechanism to restart it); if the JT receives no heartbeat from a TT within a certain interval, the TT is presumed dead and the tasks assigned to it may be rescheduled onto other TTs.
  • TaskTracker (TT): the task executor. Runs the Tasks (MapTask, ReduceTask) on its node; interacts with the JT to run, start and stop tasks; sends heartbeat messages to the JT.
  • MapTask: runs the user-written map logic and writes the map output to local disk.
  • ReduceTask: reads the output of the Map Tasks, groups the data by key and passes each group to the user-written reduce method.

3.2 MapReduce 2.x

In MapReduce 2.x the JobTracker/TaskTracker pair is replaced by YARN: a global ResourceManager and per-node NodeManagers handle resource management, and a per-job ApplicationMaster drives the MapReduce tasks.

4 wordcount example

4.1 Source code

package com.bzt.cn.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/*
 * MapReduce version of wordcount
 * */
public class WordCountApp {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        LongWritable one = new LongWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Split each input line into words and emit (word, 1) for every word
            String line = value.toString();
            String[] words = line.split(" ");
            for (String word : words) {
                context.write(new Text(word), one);
            }
        }
    }

    /*
     * Reducer: merges the grouped values
     * */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            // The sum is local to each key, so it is reset for every reduce() call
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        // Create the job
        Job job = Job.getInstance(conf, "WC");
        // Set the class that carries the job's jar
        job.setJarByClass(WordCountApp.class);

        // Input path of the job
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Map-side parameters
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Reduce-side parameters
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Output path of the job
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

4.2 Package a jar with Maven and upload it to the cluster

Run:

[hadoop@node1 ~]$ hadoop jar wordcount.jar com.bzt.cn.mapreduce.WordCountApp hdfs://node1:8020/hello.txt hdfs://node1:8020/wcout

[hadoop@node1 ~]$ hadoop fs -ls /wcout
18/10/30 09:38:45 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   1 hadoop supergroup          0 2018-10-30 09:38 /wcout/_SUCCESS
-rw-r--r--   1 hadoop supergroup         30 2018-10-30 09:38 /wcout/part-r-00000
[hadoop@node1 ~]$ hadoop fs -text /wcout/part-r-00000
18/10/30 09:39:02 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
hello	4
jerry	5
tom	7
world	8
[hadoop@node1 ~]$ 

4.3 Enhanced version

 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        // Delete the output directory if it already exists, so the job can be re-run
        // (requires an additional import: org.apache.hadoop.fs.FileSystem)
        Path outputPath = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
            System.out.println("output file deleted!");
        }

        // Create the job
        Job job = Job.getInstance(conf, "WC");
        // Set the class that carries the job's jar
        job.setJarByClass(WordCountApp.class);
        // Input path of the job
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Map-side parameters
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Output path of the job
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}

5 Combiner

  • A local ("map-side") reducer
  • Reduces the amount of data each Map Task writes out and the amount of data transferred over the network
  • Suitable scenarios: summing and counting (see the snippet below)
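For summing jobs like wordcount the reducer can usually double as the combiner, because addition is associative and commutative. A minimal sketch of what could be added to the WordCountApp driver from section 4.1:

        // In main(), before submitting the job: run a local reduce on each MapTask's
        // output so that less data has to cross the network during the shuffle.
        job.setCombinerClass(MyReducer.class);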

6 Partitioner

  • The Partitioner decides which ReduceTask handles each record that a MapTask outputs
  • Default implementation: the key's hash value modulo the number of Reduce Tasks (a sketch follows this list)
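As a sketch, the default behaviour corresponds roughly to Hadoop's HashPartitioner. DefaultLikePartitioner below is a made-up name for illustration:

import org.apache.hadoop.mapreduce.Partitioner;

// Roughly what Hadoop's default HashPartitioner does: the key's hash code,
// masked to keep it non-negative, modulo the number of reduce tasks.
public class DefaultLikePartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}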

6.1 Test data

(figure: sample input file animal.txt; each line has the form "<animal> <count>", which the mapper below parses as key and value)

6.2 Source code

package com.bzt.cn.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class PartitionerApp {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        LongWritable one = new LongWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Each input line has the form "<animal> <count>"; emit (animal, count)
            String line = value.toString();
            String[] words = line.split(" ");

            context.write(new Text(words[0]), new LongWritable(Long.parseLong(words[1])));
        }

    }

    /*
     * Reducer: merges the grouped values
     * */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            // The sum is local to each key, so it is reset for every reduce() call
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }

            context.write(key, new LongWritable(sum));
        }
    }

    public static class MyPartitioner extends Partitioner<Text, LongWritable> {

        @Override
        public int getPartition(Text key, LongWritable value, int numPartitions) {
            // Route dog, cat and duck to partitions 0-2; every other key falls through to partition 3
            if (key.toString().equals("dog")) {
                return 0;
            }
            if (key.toString().equals("cat")) {
                return 1;
            }
            if (key.toString().equals("duck")) {
                return 2;
            }

            return 3;
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        // Delete the output directory if it already exists, so the job can be re-run
        Path outputPath = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
            System.out.println("output file deleted!");
        }

        // Create the job
        Job job = Job.getInstance(conf, "WC");
        // Set the class that carries the job's jar
        job.setJarByClass(PartitionerApp.class);
        // Input path of the job
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Map-side parameters
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);


        // Set the job's Partitioner
        job.setPartitionerClass(MyPartitioner.class);
        // Four reducers, one per partition
        job.setNumReduceTasks(4);

        // Output path of the job
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}


6.3 Package with Maven and run on the cluster

[hadoop@node1 ~]$ hadoop jar part.jar com.bzt.cn.mapreduce.PartitionerApp hdfs://node1:8020/animal.txt hdfs://node1:8020/partionerout
[hadoop@node1 ~]$ hadoop fs -ls /partionerout
18/10/30 10:39:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 5 items
-rw-r--r--   1 hadoop supergroup          0 2018-10-30 10:38 /partionerout/_SUCCESS
-rw-r--r--   1 hadoop supergroup          6 2018-10-30 10:38 /partionerout/part-r-00000
-rw-r--r--   1 hadoop supergroup          6 2018-10-30 10:38 /partionerout/part-r-00001
-rw-r--r--   1 hadoop supergroup          7 2018-10-30 10:38 /partionerout/part-r-00002
-rw-r--r--   1 hadoop supergroup          8 2018-10-30 10:38 /partionerout/part-r-00003
[hadoop@node1 ~]$ hadoop fs -cat /partionerout/part-r-00000
18/10/30 10:40:04 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
dog	7
[hadoop@node1 ~]$ hadoop fs -cat /partionerout/part-r-00001
18/10/30 10:40:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
cat	6
[hadoop@node1 ~]$ hadoop fs -cat /partionerout/part-r-00002
18/10/30 10:40:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
duck	7
[hadoop@node1 ~]$ hadoop fs -cat /partionerout/part-r-00003
18/10/30 10:40:22 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
lion	13
[hadoop@node1 ~]$ 
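dog, cat and duck land in partitions 0, 1 and 2 as hard-coded in MyPartitioner, and every other key (here lion) falls through to partition 3, which is why the job sets exactly four reduce tasks.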

7 jobhistory

  • Records information about completed MapReduce jobs into a specified HDFS directory
  • Disabled by default

7.1 Configure jobhistory

Edit mapred-site.xml under /home/hadoop/apps/hadoop-2.6.0-cdh5.7.0/etc/hadoop:

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>

    <property>
        <name>mapreduce.jobhistory.address</name>
        <value>node1:10020</value>
    </property>

    <property>
        <name>mapreduce.jobhistory.webapp.address</name>
        <value>node1:19888</value>
    </property>

    <property>
        <name>mapreduce.jobhistory.done-dir</name>
        <value>/history/done</value>
    </property>

    <property>
        <name>mapreduce.jobhistory.intermediate-done-dir</name>
        <value>/history/done_intermediate</value>
    </property>
</configuration>

7.2 Start the history server

Restart YARN first, then start the history server:

[hadoop@node1 ~]$ mr-jobhistory-daemon.sh start historyserver
starting historyserver, logging to /home/hadoop/apps/hadoop-2.6.0-cdh5.7.0/logs/mapred-hadoop-historyserver-node1.out
[hadoop@node1 ~]$ jps
6704 JobHistoryServer
6738 Jps
1395 DataNode
6245 ResourceManager
1271 NameNode
1559 SecondaryNameNode
6346 NodeManager
[hadoop@node1 ~]$ 

7.3 Test

[hadoop@node1 ~]$ cd /home/hadoop/apps/hadoop-2.6.0-cdh5.7.0/share/hadoop/mapreduce2
[hadoop@node1 mapreduce2]$ clear
[hadoop@node1 mapreduce2]$ hadoop jar hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar pi 2 3

Visit http://node1:19888/jobhistory and click into the job to view its logs.
Log aggregation is not enabled yet, so configure yarn-site.xml:

<configuration>

<!-- Site specific YARN configuration properties -->
	<property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    
    <property>
        <name>yarn.log-aggregation-enable</name>
        <value>true</value>
    </property>
    
</configuration>

Restart YARN and run the pi example again.