1. 程式人生 > >Hadoop完全分散式下實現自定義排序、分割槽和分組

Hadoop完全分散式下實現自定義排序、分割槽和分組

    經過前面一段時間的學習,簡單的單詞統計已經不能實現更多的需求,就連自帶的一些函式方法等也是跟不上節奏了;加上前面一篇MapReduce的底層執行步驟的瞭解,今天學習自定義的排序、分組、分割槽相對也特別容易。

自定義排序

自定義的排序有許多許多,根據不同的業務需求,重寫父類的方法即可。這裡介紹兩種常用的自定義排序:

一、自定義普通的正、倒排序

Mapper檔案不需要太多的修改,首先建立一個自定義的排序類,繼承一個Comparator(IntWritable.Comparator是子類),重寫裡面的compare方法即可。

eg:

Mymaper

package sort_2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

public class MyMapper extends Mapper<LongWritable, Text, LongWritable,Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String values = value.toString();
        StringTokenizer st = new StringTokenizer(values);
        while (st.hasMoreTokens()) {
            /*
            *MapReduce對自動對map階段的輸出資料進行分組、排序、歸併等操作;
            *所以我們這裡需要把key與value值反過來傳給reducer;
            *然後在reducer階段的時候再把位置調換回來即可。
            *注意:這裡的st.nextToken()的位置,第一次呼叫就能獲取到第一個值,以此類推。
            */
           key= new Text(st.nextToken());
           value = new LongWritable(Long.parseLong(st.nextToken()));
           context.write(value,key);
        }
    }
}

Reducer類

package sort_2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MySortReduce extends Reducer<LongWritable,Text, Text, LongWritable> {
   /*讓reduce預設分組排序*/

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(new Text(value),key);
        }
    }
}

MySort類

package sort_2;

import org.apache.hadoop.io.IntWritable;

public class MySort extends IntWritable.Comparator {

    @Override
    public int compare(Object a, Object b) {
        return super.compare(a, b);//結果正序
        // return -super.compare(a, b);//結果倒序
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return super.compare(b1, s1, l1, b2, s2, l2);//結果正序
        // return -super.compare(b1, s1, l1, b2, s2, l2);//結果倒序
    }
}

Runner類

這個就不粘出來了,就是正常的寫,多加一句 job.setSortComparatorClass(MySort.class);

注意:重寫後需要正序的話不需要動任何的引數,倒序的話把返回值改成倒數即可。最後需要在Runner中加上一句

job.setSortComparatorClass(MySort.class);//把自定義排序類的地址給job(很重要,不加等於沒有排序)

二、自定義二次排序的正、倒排序

這個的話憑空想象就有些難理解,我們來用一道題講解。

二次排序的需求說明:

按第一列進行正序排序,若有相同的資料按照第二列資料的大小正序排序;我們可以把這些資料看做一個一個的鍵值對或組,前後兩個數是一體的,一個變位置前後一行一同換位置。

先來看程式碼演示:

 Mapper類

package sort_2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;
/*
*讓我們的自定義排序MySort作為Map階段的最終輸出
*/
public class MyMapper extends Mapper<LongWritable, Text, MySort, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String values = value.toString();
        StringTokenizer st = new StringTokenizer(values);
        while (st.hasMoreTokens()) {
            MySort mySort = new MySort(Long.parseLong(st.nextToken()), Long.parseLong(st.nextToken()));//把需要排序的資料給我們的自定義排序
            context.write(mySort, new LongWritable(Long.parseLong(mySort.secondNum.toString())));//輸出到reducer
        }
    }
}

Reducer類

package sort_2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

import javax.xml.soap.Text;
import java.io.IOException;


public class MyReducer extends Reducer<MySort, LongWritable, LongWritable, LongWritable> {
    /**
     * 接收到Map階段傳輸的MySort類的key後,遍歷values,輸出最終結果
     * 這裡需要注意的是:輸出的key值是一個longWritable型資料,不是一個MySort物件,需要取出物件中的屬性
     */
    @Override
    protected void reduce(MySort key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        for (LongWritable value : values) {
            context.write(new LongWritable(key.firstNum),value);
        }
    }
}

MySort類

package sort_2;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class MySort implements WritableComparable<MySort> {
    public Long firstNum;
    public Long secondNum;

    public MySort() {//無參構造必須提供,不然報錯
    }

    public MySort(Long firstNum, Long secondNum) {
        this.firstNum = firstNum;
        this.secondNum = secondNum;
    }

    /**
     * 比較兩個數的前後大小,有三種情況:
     * 1:-1--第一列的當前數小於當前列的上一個數
     * 2:1--第一列的當前數大於當前列的上一個數
     * 3:0--相等,兩個數相減等於零,這時就會比較第二列的資料大小,這時也會有三種情況,同上;
     * 接下來就不屬於我們的工作了,WritableComparable預設繼承了Writable, Comparable<T>兩個類,剩下的工作就交給他們了。
     * @param o
     * @return
     */
    @Override
    public int compareTo(MySort o) {
        int result = 0;
        int num = (int) (this.firstNum-o.firstNum);
        if (num != 0){
            result = num;
        }else{
            result = (int) (this.secondNum-o.secondNum);
        }
        return result;//正序
       //return -result;//倒序
    }

    /**
     * 序列化
     * @param out
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(firstNum);
        out.writeLong(secondNum);
    }

    /**
     * 反序列化
     * @param in
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.firstNum = in.readLong();
        this.secondNum = in.readLong();
    }
}

 WritableComparable的原始碼:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.hadoop.io;

import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;

@Public
@Stable
public interface WritableComparable<T> extends Writable, Comparable<T> {
//預設的繼承了Writable,Comparable類
}

Runner類不演示了,需要注意的是把Mapper與Reducer的輸入輸出型別改成自定義的MySort排序型別,不需要新增job.setSortComparatorClass();我們使用預設的MapReduce的key排序分組加上自定義排序完成就足夠了。

結果:

70	70
70	80
70	90
80	70
95	10
95	34
95	90

接下來就是對以上詳細的解釋了:

在mapreduce操作時,shuffle階段會多次根據key值排序。但是在shuffle分組後,相同key值的values序列的順序是不確定的(如下圖)。如果想要此時value值也是排序好的,這種需求就是二次排序。

測試的檔案資料:

a 1 a 5 a 7 a 9 b 3 b 8 b 10

未經過二次排序的輸出結果:

a   9
a   7
a   5
a   1
b   10
b   8
b   3

實現思路:

將map端輸出的<key,value>中的key和value先傳入自定義的排序類中做比較處理,處理之後在重新拉取出來。這裡就變成<第一列,第二列>,在針對newKey(第一列)排序的時候,如果newKey相同,就再對value(第二列)進行排序。

  • 需要自定義的地方
  1. 自定義資料型別實現組合key 實現方式:繼承WritableComparable

注意:(容易被“坑”) 在reduce端對values進行迭代的時候,不要直接儲存value值或者key值,因為reduce方法會反覆執行多次,但key和value相關的物件只有兩個,reduce會反覆重用這兩個物件。需要用相應的資料型別.get()取出後再儲存。

自定義分割槽:

就使用簡單的詞頻統計來設定一個需求:

現在有三個檔案{a.txt,b.txt,c.txt}(代表三個分割槽),需要利用MapReduce的自定義分割槽計算出每一個分割槽中的詞頻統計結果。並將帶有“Hello”欄位的統計結果放入編號為‘1’的分割槽中,將帶有“World”欄位的統計結果放入編號為‘2’的分割槽中,其餘的放入編號為‘0’的分割槽中。

三個檔案中的內容為:

這個的話直接上演示程式碼,再解釋:

Mapper類:

package go_over.Map;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * @Author H.rosy
 * @Create 2018-09-16  21:46
 */
public class MyMap extends Mapper<LongWritable, Text, Text, Text> {

    void check(String text, String FName, Context context) throws IOException, InterruptedException {//檢查資料所屬檔案的方法
        Text k = new Text(text);
        Text v = new Text(FName);
        context.write(k, v);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String values = value.toString();
        StringTokenizer st = new StringTokenizer(values);
        FileSplit split = (FileSplit) context.getInputSplit();//建立檔案切割物件
        while (st.hasMoreElements()) {
            String name = split.getPath().getName();//利用檔案切割物件獲取檔案的名字
            if ("a.txt".equals(name)) {
                check(st.nextToken(), "a", context);//呼叫傳參的方法
            } else if ("b.txt".equals(name)) {
                check(st.nextToken(), "b", context);
            } else{
                check(st.nextToken(), "c", context);
            }
        }
    }
}

Reducer類:

package go_over.Reduce;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MyReduce extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        /**
         *   定義三個累加器分別代表三個檔案的詞頻統計結果
         */
        int countA = 0;
        int countB = 0;
        int countC = 0;
        /**
         * 遍歷資料集開始統計
         */
        for (Text value : values) {
            if ("a".equals(value.toString())) {
                countA++;
            } else if ("b".equals(value.toString())) {
                countB++;
            } else {
                countC++;
            }
        }
        //手動拼接一下統計的結果
        String result = "   a.txt-->" + countA + "   b.txt-->" + countB + "   c.txt-->" + countC;
        context.write(key, new Text(result));//輸出到檔案
    }
}

MyPartition類

package go_over.Partition;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartition extends Partitioner<Text, Text> {
    /**
     * 繼承一個Partitioner的抽象類
     * 重寫getPartition方法
     * @return
     */
    @Override
    public int getPartition(Text key, Text value, int i) {
        int falg = 0;//分割槽編號(標誌)
        if(key.find("Hello")==0){
            falg = 1;
        }else if(key.find("World")==0){
            falg = 2;
        }
        return falg;//返回的int數值代表著分割槽的編號
    }
}

Runner類:

package go_over.demo;
 
 
import com.bw.map.countMap;
import com.bw.map.sortMap;
import com.bw.reduce.countReduce;
import com.bw.reduce.sortReduce;
import com.bw.sort.MySort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
 
/**
 * This is my test 
 *
 * @Author 
 * @Create 2018-09-09  19:43
 */
public class MyDemo {
    /**
     * 首先建立一個靜態變數區
     *
     * @param args
     */
    static Configuration conf = new Configuration();
    static Job job = null;
    static FileSystem fs = null;
    static String uri = "hdfs://192.168.132.130:9000";
 
    static {//靜態程式碼塊
        try {
            conf.setBoolean("dfs.support.append", true);
            fs = FileSystem.get(URI.create(uri), conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //公共變數區
        String input = uri.concat("/test");//本人需要做處理的檔案都在這個目錄下        
        String output = uri.concat("/output");//統計好結果後的檔案存放目錄
       
 
        {//初始化job任務區
            job = Job.getInstance(conf);//定義一個job任務
            job.setJobName("wordCount");//新增工作名字
 
            job.setMapperClass(countMap.class);//新增map類對映
            job.setReducerClass(countReduce.class);//新增reduce對映
 
            job.setMapOutputKeyClass(Text.class);//設定map階段的輸出key型別
            job.setMapOutputValueClass(IntWritable.class);//設定map階段的輸出value型別
 
            job.setOutputKeyClass(Text.class);//設定最終階段的輸出key型別
            job.setOutputValueClass(IntWritable.class);//設定最終階段的輸出value型別

            job.setPartitionerClass(MyPartition.class);//設定分割槽的自定義類地址
            job.setNumReduceTasks(3);//設定分割槽數量

            checkFileExists(new Path[]{new Path(output)});//檢測檔案是否存在
            FileInputFormat.setInputPaths(job, new Path(input));//指定操作路徑
            FileOutputFormat.setOutputPath(job, new Path(output));//指定操作路徑
            job.waitForCompletion(true);//提交任務
        }
 
 
        {//結果展示模組
           
            {//統計結果展示塊
                String alert = "-------------------------------下面是統計結果--------------------------";
                getResult(new Path(sortInput), alert);
            }
            
        }
        {
            
            fs.close();//關閉資源
        }
    }
 
 
    synchronized static boolean getResult(Path path, String alert) throws IOException {
        FSDataInputStream open = fs.open(path);//開啟目標路徑的檔案
        BufferedReader reader = new BufferedReader(new InputStreamReader(open, "utf-8"));//設定緩衝區
        String res = "";
        System.err.println(alert);//輸出提示資訊
        while ((res = reader.readLine()) != null) {//迴圈按行讀取文字內容並賦值給res
            System.out.println(res);//輸出統計後的結果
        }
        reader.close();//關閉資源
        return true;
    }
 
    static void checkFileExists(Path... paths) throws IOException {//檢視檔案是否存在,避免出現檔案重複存在的錯誤
        for (Path path : paths) {
            boolean exists = fs.exists(path);
            if (exists) {
                fs.delete(path, true);
            }
        }
    }
 

最後的輸出結果為:

在HDFS的分割槽檔案中的效果為:

Found 4 items
-rw-r--r--   3 supergroup          0 2018-10-23 20:48 /output/_SUCCESS
-rw-r--r--   3 supergroup        128 2018-10-23 20:48 /output/part-r-00000
-rw-r--r--   3 supergroup         43 2018-10-23 20:48 /output/part-r-00001
-rw-r--r--   3 supergroup         43 2018-10-23 20:48 /output/part-r-00002
//最後一位代表的就是我們自定義的那個分割槽編號

自定義分組:

這是實現效果圖:

需求分析:根據第一列進行歸併分組後正序排序,並找出對應第二列每組中的最大值

技術實現:

(1).自定義分組比較器繼承RawComparator,實現compare()方法。

(2).在設定作業是設定job.setGroupingComparatorClass()。

Mapper、Reducer與Runner類

public class MyGroupTest {
	// 定義輸入路徑
	private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/data";
	// 定義輸出路徑
	private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";
 
	public static void main(String[] args) {
 
		try {
			// 建立配置資訊
			Configuration conf = new Configuration();
 
 
			// 建立檔案系統
			FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
			// 如果輸出目錄存在,我們就刪除
			if (fileSystem.exists(new Path(OUT_PATH))) {
				fileSystem.delete(new Path(OUT_PATH), true);
			}
 
			// 建立任務
			Job job = new Job(conf, MyGroupTest.class.getName());
 
			// 設定輸入目錄和設定輸入資料格式化的類
			FileInputFormat.setInputPaths(job, INPUT_PATH);
			job.setInputFormatClass(TextInputFormat.class);
 
			// 設定自定義Mapper類和設定map函式輸出資料的key和value的型別
			job.setMapperClass(MyGroupMapper.class);
			job.setMapOutputKeyClass(CombineKey.class);
			job.setMapOutputValueClass(LongWritable.class);
			
			//一定不要忘記設定自定義分組比較器的類(這一步是關鍵)
			job.setGroupingComparatorClass(MyGroupComparator.class);
			
			// 設定分割槽和reduce數量(reduce的數量,和分割槽的數量對應,因為分割槽為一個,所以reduce的數量也是一個)
			job.setPartitionerClass(HashPartitioner.class);
			job.setNumReduceTasks(1);
 
			// 排序、分組
			// 歸約
			// Shuffle把資料從Map端拷貝到Reduce端。
			// 指定Reducer類和輸出key和value的型別
			job.setReducerClass(MyGroupReducer.class);
			job.setOutputKeyClass(LongWritable.class);
			job.setOutputValueClass(LongWritable.class);
 
			// 指定輸出的路徑和設定輸出的格式化類
			FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
			job.setOutputFormatClass(TextOutputFormat.class);
 
			// 提交作業 退出
			System.exit(job.waitForCompletion(true) ? 0 : 1);
 
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
 
	public static class MyGroupMapper extends Mapper<LongWritable, Text, CombineKey, LongWritable> {
		// 建立聯合的key
		private CombineKey combineKey = new CombineKey();
 
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CombineKey, LongWritable>.Context context) throws IOException,
				InterruptedException {
			// 對輸入value進行分割
			String[] splits = value.toString().split("\t");
			// 設定聯合的Key
			combineKey.setComKey(Long.parseLong(splits[0]));
			combineKey.setComVal(Long.parseLong(splits[1]));
 
			// 傳給reducer計算
			context.write(combineKey, new LongWritable(Long.parseLong(splits[1])));
		}
	}
 
	public static class MyGroupReducer extends Reducer<CombineKey, LongWritable, LongWritable, LongWritable> {
		@Override
		protected void reduce(CombineKey combineKey, Iterable<LongWritable> values,
				Reducer<CombineKey, LongWritable, LongWritable, LongWritable>.Context context) throws IOException, InterruptedException {
 
			long max = Long.MIN_VALUE;
			// 遍歷比較求出每個組中的最大值
			for (LongWritable val : values) {
 
				if (val.get() > max) {
					max= val.get();
				}
			}
 
			// 把原始資料中的第一列中的元素分組後的組號作為key,所求的最小值為value將結果寫出去
			context.write(new LongWritable(combineKey.getComKey()), new LongWritable(max));
		}
	}
}

二次排序及類

/**
 * 二次排序構造一個新的Key
 * @version
 */
class CombineKey implements WritableComparable<CombineKey> {
 
	private Long comKey;
	private Long comVal;
 
	// 無參建構函式必須提供,否則Hadoop的反射機制會報錯
	public CombineKey() {
	}
 
	// 有參建構函式
	public CombineKey(Long comKey, Long comVal) {
		this.comKey = comKey;
		this.comVal = comVal;
	}
 
	public Long getComKey() {
		return comKey;
	}
 
	public void setComKey(Long comKey) {
		this.comKey = comKey;
	}
 
	public Long getComVal() {
		return comVal;
	}
 
	public void setComVal(Long comVal) {
		this.comVal = comVal;
	}
 
	public void write(DataOutput out) throws IOException {
		out.writeLong(this.comKey);
		out.writeLong(this.comVal);
	}
 
	public void readFields(DataInput in) throws IOException {
		this.comKey = in.readLong();
		this.comVal = in.readLong();
	}
 
	/**
	 * 第一列按升序排列,第一列相同時,第二列也按升序排列
	 */
	public int compareTo(CombineKey o) {
		long minus = this.comKey - o.comVal;
		if (minus != 0) {
			return (int) minus;
		}
 
		return (int) (this.comVal - o.comVal);
	}
 
}

分組比較器類:

/**
 * 自定義分組比較器
 * @version
 */
class MyGroupComparator implements RawComparator<CombineKey> {
 
	// 分組策略中,這個方法不是重點
	public int compare(CombineKey o1, CombineKey o2) {
		// TODO Auto-generated method stub
		return 0;
	}
 
	/**
	 * b1 表示第一個參與比較的位元組陣列
	 * s1 表示第一個位元組陣列中開始比較的位置 
	 * l1 表示第一個位元組陣列中參與比較的位元組長度 
	 * b2 表示第二個參與比較的位元組陣列 
	 * s2 表示第二個位元組陣列中開始比較的位置 
	 * l2 表示第二個位元組陣列參與比較的位元組長度
	 */
	public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
 
		// 這裡是按第CombineKey中的第一個元素進行分組,因為是long型別,所以是8個位元組
		return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
	}
 
}

最終結果:

1 1
2 2
3 3