1. 程式人生 > >MapReduce的Shuffle機制

MapReduce的Shuffle機制

1、MapReduce的shuffle機制

1.1、概述

 MapReduce中,mapper階段處理的資料如何傳遞給reduce階段,是MapReduce框架中最關鍵的一個流程,這個流程就叫shuffle.

Shuffle:資料混洗---------(核心機制:資料分割槽,排序,區域性聚合,快取,拉取,再合併排序)

具體來說,就是將MapTask輸出的處理資料結果,按照Partitioner元件制定的規則分發ReduceTask,並在分發的過程中,對資料按key進行分割槽和排序

1.2、主要流程

Shuffle是MapReduce處理流程中的一個核心,它的每一個處理步驟是分散在各個Maptask和reducetask節點上完成的,整體來看,分為3個操作:

1、分割槽partition(如果reduceTask只有一個或者沒有,那麼partition將不起作用。設定沒設定相當於沒有)

2、Sort根據key排序(MapReduce程式設計中sort是一定會做的,並且只能按照key排序,當然如果沒有reduce階段,那麼就不會對key排序)

3、Combiner進行區域性value的合併(Combiner是可選的元件,作用是為了提高任務的執行效率)

 

1.3、詳細流程

1、mapTask收集我們map()方法輸出的kv對,放在記憶體緩衝區kvbuffer(環形緩衝區:記憶體中的一種首尾相連的資料結構,kvbuffer包含資料區和索引區)中,在存資料的時候,會呼叫partitioner進行分割槽編號的計算,並存入元資料中

2、當記憶體緩衝區的資料達到100*0.8時,就會開始溢寫到本地磁碟檔案file.out,可能會溢位多次,則會有多個檔案,相應的緩衝區中的索引區資料溢位為磁碟索引檔案file.out.index

3、在溢寫前,會先根據分割槽編號排序,相同的分割槽的資料,排在一起,再根據map的key排序(快排)

4、多個溢寫檔案會被合併成大的溢位檔案(歸併排序)

5、在資料量大的時.候,可以對maptask結果啟用壓縮,將mapreduce.map.ouput.compress設為true,並使用

mapreduce.map.output.compress.codec設定使用的壓縮演算法,可以提高資料傳輸到reduce端的效率

6、reduceTask根據自己的分割槽號,去各個mapTask機器上取相應的結果分割槽資料

7、reduceTask會取到同一個分割槽的來自不同mapTask的結果檔案,reduceTask會將這些檔案再進行合併(歸併排序)

8、合併成r大檔案後,shuffle的過程也就結束了,後面進入reduceTask的邏輯運算過程(從檔案中取出一個一個的鍵值對group,呼叫使用者自定義的reduce()方法)

 

2、自定義Shuffle過程中的元件

1、自定義輸入

    預設輸入類:TextInputFormat

自定義:   

模仿   org.apache.hadoop.mapreduce.lib.input.LineRecordReader  和org.apache.hadoop.mapreduce.lib.input.TextInputFormat

1、自定義類繼承FileInputFormat
public class MyFileInputFormat extends FileInputFormat<Text, LongWritable>{
    @Override                                            
    public RecordReader<Text, LongWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        //例項化一個
        MyAllFileRecodReader reader = new MyAllFileRecodReader();
        //split引數和context都是框架自動傳入的,把這兩個引數傳給reader進行處理,以便獲取相關資訊
        reader.initialize(split, context);
        return reader;
    }
    /**
     * 給定的檔名可拆分嗎?返回false確保單個輸入檔案不會被分割。以便Mapper處理整個檔案。
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }
}

2、自定義類實現RecordReader
public class MyFileRecodReader extends RecordReader<Text, LongWritable>{
    //用於儲存檔案系統輸入流
    private FSDataInputStream open = null;
    //儲存檔案長度
    private int fileSplitLength = 0;
    /**
     * 當前的MyAllFileRecodReader讀取到的一個key-value
     */
    private Text key = new Text();
    private LongWritable value = new LongWritable();

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        //通過InputSplit物件獲取檔案路徑
        FileSplit fileSplit = (FileSplit)split;
        Path path = fileSplit.getPath();
        //獲取檔案長度
        fileSplitLength = (int)fileSplit.getLength();

        //通過context物件獲取到配置檔案資訊,通過配置檔案獲取到一個當前檔案系統
        Configuration configuration = context.getConfiguration();
        FileSystem fs  = FileSystem.get(configuration);
        //獲取檔案系統的一個輸入流
        open = fs.open(path);
    }

    /**
     * 已讀標記
     * 如果為false,表示還沒有進行讀取
     * 在需求中一個mapTask只處理一個小檔案,一個mapTask最終只需要讀取一次就完畢
     * 如果一個檔案讀取完畢了,那麼就把isRead這個變數標記為true
     */
    private boolean isRead = false;

    /**
     *  實現讀取規則:逐檔案讀取
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {

        //如果沒有讀取過檔案就進入
        if(!isRead){
            //準備一個位元組陣列長度為檔案的長度
            byte[] buffer = new byte[fileSplitLength];
            //一次性把真個檔案讀入位元組陣列中
            IOUtils.readFully(open, buffer);
            //把讀取到的檔案傳給key
            key.set(buffer, 0, fileSplitLength);
            //設定已讀標記為true
            isRead = true;
            //返回讀取一個檔案成功標記
            return true;
        }else{
            return false;
        }
    }

    //獲取key的方法
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    //獲取當前value值
    @Override
    public LongWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    /**
     * 獲取資料的處理進度的
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        //已讀為真返回1.0,沒有讀返回0
        return  isRead ? 1.0F : 0F;
    }

    @Override
    public void close() throws IOException {
        //關閉輸入流
        IOUtils.closeQuietly(open);
    }

2、自定義分割槽

需要:  1、繼承 partitioner

             2、重寫getpartition()方法

             3、在main方法中指定分割槽類  job.setPartitionclass()

package homework;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class Mypartition extends Partitioner<Student, Text> {
	@Override
	public int getPartition(Student key, Text arg1, int arg2) {
		if(key.getType().equals("math")){
			return 0;
		}
		if(key.getType().equals("english")){
			return 1;
		}
		if(key.getType().equals("computer")){
			return 2;
		}else{
			return 3;
		}
	}
}

3、自定義排序

  需要  :    1、實現writableComparable

                   2、重新write()、readFields()、compareTo()方法

package homework;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class Student implements WritableComparable<Student> {
    private String type;
    private String name;
    private Double avg;
    
	public Student() {
		super();
	}
	public Student(String type, String name, Double avg) {
		super();
		this.type = type;
		this.name = name;
		this.avg = avg;
	}
	public String getType() {
		return type;
	}
	public void setType(String type) {
		this.type = type;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
    public Double getAvg() {
		return avg;
	}
	public void setAvg(Double avg) {
		this.avg = avg;
	}
	@Override
	public String toString() {
		return   type + "\t" + name + "\t" + avg ;
	}
	@Override
	public void readFields(DataInput in) throws IOException {
		this.type=in.readUTF();
		this.name=in.readUTF();
		this.avg=in.readDouble();	
	}
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(type);
		out.writeUTF(name);
		out.writeDouble(avg);	
	}
	@Override
	public int compareTo(Student o) {
		int temp=o.getType().compareTo(this.getType());
		if(temp==0){
			if(o.getAvg()>this.getAvg()){
				return 1;
			}else if(o.getAvg()<this.getAvg()){
				return -1;
			}else{
				return 0;
			}
		}
		return temp;
	}
}

4、自定義分組

需要  :     1、繼承writableComparable

                  2、重寫compare()方法

                  3、指定分組類  job.setGroupingComparatorClass(MyGroup.class);

                  4、既有分割槽又有排序的時候,分組欄位一定在排序欄位中

package homework;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyGroup extends WritableComparator {

	public MyGroup() {
		super(Student.class,true);
	}

	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		Student aa=(Student)a;
		Student bb=(Student)b;
		return aa.getType().compareTo(bb.getType());
	} 
}

5、自定義輸出

1)模仿 org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

public class MyMultipePathOutputFormat extends FileOutputFormat<Text, NullWritable>{
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        //獲得當前的檔案系統傳給自定義的RecordWriter元件
        Configuration configuration = job.getConfiguration();
        FileSystem fs = FileSystem.get(configuration);
        try {
            //返回一個RecordWriter正在處理輸出資料的元件
            return new MyMutiplePathRecordWriter(fs);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

2)繼承RecordWriter 並實現write()方法

public class MyMutiplePathRecordWriter extends RecordWriter<Text, NullWritable>{
    //宣告要輸出的兩個路徑
    private DataOutputStream out_jige;
    private DataOutputStream out_bujige;

    public MyMutiplePathRecordWriter(FileSystem fs) throws Exception {
        //建立系統輸出流
        out_jige = fs.create(new Path("E:\\bigdata\\cs\\jige\\my_output_jige.txt"));
        out_bujige = fs.create(new Path("E:\\bigdata\\cs\\bujige\\my_output_bujige.txt"));
    }

    /**
     * 實現寫出方法,根據需要寫出的格式自定義
     */
    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        //接受到的key格式為:course + "\t" + name + "\t" + avgScore
        String keyStr = key.toString();
        String[] split = keyStr.split("\t");
        //獲取到平均分欄位
        double score = Double.parseDouble(split[2]);
        //沒一行資料加入個換行符
        byte[] bytes = (keyStr + "\n").getBytes();

        //如果平均分大於60就用DataOutputStream寫出到jige目錄
        if(score >= 60){
            out_jige.write(bytes, 0, bytes.length);
        }else{//小於60分的寫道bujige目錄
            out_bujige.write(bytes, 0, bytes.length);
        }
    }

    /**
     * 在close方法中關閉輸出流。
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeQuietly(out_jige);
        IOUtils.closeQuietly(out_bujige);
    }
}