Hadoop完全分散式下實現自定義排序、分割槽和分組
經過前面一段時間的學習,簡單的單詞統計已經不能實現更多的需求,就連自帶的一些函式方法等也是跟不上節奏了;加上前面一篇MapReduce的底層執行步驟的瞭解,今天學習自定義的排序、分組、分割槽相對也特別容易。
自定義排序
自定義的排序有許多許多,根據不同的業務需求,重寫父類的方法即可。這裡介紹兩種常用的自定義排序:
一、自定義普通的正、倒排序
Mapper檔案不需要太多的修改,首先建立一個自定義的排序類,繼承一個Comparator(IntWritable.Comparator是子類),重寫裡面的compare方法即可。
eg:
Mymaper
package sort_2; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; import java.util.StringTokenizer; public class MyMapper extends Mapper<LongWritable, Text, LongWritable,Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String values = value.toString(); StringTokenizer st = new StringTokenizer(values); while (st.hasMoreTokens()) { /* *MapReduce對自動對map階段的輸出資料進行分組、排序、歸併等操作; *所以我們這裡需要把key與value值反過來傳給reducer; *然後在reducer階段的時候再把位置調換回來即可。 *注意:這裡的st.nextToken()的位置,第一次呼叫就能獲取到第一個值,以此類推。 */ key= new Text(st.nextToken()); value = new LongWritable(Long.parseLong(st.nextToken())); context.write(value,key); } } }
Reducer類
package sort_2; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class MySortReduce extends Reducer<LongWritable,Text, Text, LongWritable> { /*讓reduce預設分組排序*/ @Override protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value : values) { context.write(new Text(value),key); } } }
MySort類
package sort_2; import org.apache.hadoop.io.IntWritable; public class MySort extends IntWritable.Comparator { @Override public int compare(Object a, Object b) { return super.compare(a, b);//結果正序 // return -super.compare(a, b);//結果倒序 } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return super.compare(b1, s1, l1, b2, s2, l2);//結果正序 // return -super.compare(b1, s1, l1, b2, s2, l2);//結果倒序 } }
Runner類
這個就不粘出來了,就是正常的寫,多加一句 job.setSortComparatorClass(MySort.class);
注意:重寫後需要正序的話不需要動任何的引數,倒序的話把返回值改成倒數即可。最後需要在Runner中加上一句
job.setSortComparatorClass(MySort.class);//把自定義排序類的地址給job(很重要,不加等於沒有排序)
二、自定義二次排序的正、倒排序
這個的話憑空想象就有些難理解,我們來用一道題講解。
二次排序的需求說明:
按第一列進行正序排序,若有相同的資料按照第二列資料的大小正序排序;我們可以把這些資料看做一個一個的鍵值對或組,前後兩個數是一體的,一個變位置前後一行一同換位置。
先來看程式碼演示:
Mapper類
package sort_2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.StringTokenizer;
/*
*讓我們的自定義排序MySort作為Map階段的最終輸出
*/
public class MyMapper extends Mapper<LongWritable, Text, MySort, LongWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String values = value.toString();
StringTokenizer st = new StringTokenizer(values);
while (st.hasMoreTokens()) {
MySort mySort = new MySort(Long.parseLong(st.nextToken()), Long.parseLong(st.nextToken()));//把需要排序的資料給我們的自定義排序
context.write(mySort, new LongWritable(Long.parseLong(mySort.secondNum.toString())));//輸出到reducer
}
}
}
Reducer類
package sort_2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;
import javax.xml.soap.Text;
import java.io.IOException;
public class MyReducer extends Reducer<MySort, LongWritable, LongWritable, LongWritable> {
/**
* 接收到Map階段傳輸的MySort類的key後,遍歷values,輸出最終結果
* 這裡需要注意的是:輸出的key值是一個longWritable型資料,不是一個MySort物件,需要取出物件中的屬性
*/
@Override
protected void reduce(MySort key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
for (LongWritable value : values) {
context.write(new LongWritable(key.firstNum),value);
}
}
}
MySort類
package sort_2;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class MySort implements WritableComparable<MySort> {
public Long firstNum;
public Long secondNum;
public MySort() {//無參構造必須提供,不然報錯
}
public MySort(Long firstNum, Long secondNum) {
this.firstNum = firstNum;
this.secondNum = secondNum;
}
/**
* 比較兩個數的前後大小,有三種情況:
* 1:-1--第一列的當前數小於當前列的上一個數
* 2:1--第一列的當前數大於當前列的上一個數
* 3:0--相等,兩個數相減等於零,這時就會比較第二列的資料大小,這時也會有三種情況,同上;
* 接下來就不屬於我們的工作了,WritableComparable預設繼承了Writable, Comparable<T>兩個類,剩下的工作就交給他們了。
* @param o
* @return
*/
@Override
public int compareTo(MySort o) {
int result = 0;
int num = (int) (this.firstNum-o.firstNum);
if (num != 0){
result = num;
}else{
result = (int) (this.secondNum-o.secondNum);
}
return result;//正序
//return -result;//倒序
}
/**
* 序列化
* @param out
* @throws IOException
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(firstNum);
out.writeLong(secondNum);
}
/**
* 反序列化
* @param in
* @throws IOException
*/
@Override
public void readFields(DataInput in) throws IOException {
this.firstNum = in.readLong();
this.secondNum = in.readLong();
}
}
WritableComparable的原始碼:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package org.apache.hadoop.io;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
@Public
@Stable
public interface WritableComparable<T> extends Writable, Comparable<T> {
//預設的繼承了Writable,Comparable類
}
Runner類不演示了,需要注意的是把Mapper與Reducer的輸入輸出型別改成自定義的MySort排序型別,不需要新增job.setSortComparatorClass();我們使用預設的MapReduce的key排序分組加上自定義排序完成就足夠了。
結果:
70 70
70 80
70 90
80 70
95 10
95 34
95 90
接下來就是對以上詳細的解釋了:
在mapreduce操作時,shuffle階段會多次根據key值排序。但是在shuffle分組後,相同key值的values序列的順序是不確定的(如下圖)。如果想要此時value值也是排序好的,這種需求就是二次排序。
測試的檔案資料:
a 1 a 5 a 7 a 9 b 3 b 8 b 10
未經過二次排序的輸出結果:
a 9
a 7
a 5
a 1
b 10
b 8
b 3
實現思路:
將map端輸出的<key,value>中的key和value先傳入自定義的排序類中做比較處理,處理之後在重新拉取出來。這裡就變成<第一列,第二列>,在針對newKey(第一列)排序的時候,如果newKey相同,就再對value(第二列)進行排序。
- 需要自定義的地方
- 自定義資料型別實現組合key 實現方式:繼承WritableComparable
注意:(容易被“坑”) 在reduce端對values進行迭代的時候,不要直接儲存value值或者key值,因為reduce方法會反覆執行多次,但key和value相關的物件只有兩個,reduce會反覆重用這兩個物件。需要用相應的資料型別.get()取出後再儲存。
自定義分割槽:
就使用簡單的詞頻統計來設定一個需求:
現在有三個檔案{a.txt,b.txt,c.txt}(代表三個分割槽),需要利用MapReduce的自定義分割槽計算出每一個分割槽中的詞頻統計結果。並將帶有“Hello”欄位的統計結果放入編號為‘1’的分割槽中,將帶有“World”欄位的統計結果放入編號為‘2’的分割槽中,其餘的放入編號為‘0’的分割槽中。
三個檔案中的內容為:
這個的話直接上演示程式碼,再解釋:
Mapper類:
package go_over.Map;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
import java.util.StringTokenizer;
/**
* @Author H.rosy
* @Create 2018-09-16 21:46
*/
public class MyMap extends Mapper<LongWritable, Text, Text, Text> {
void check(String text, String FName, Context context) throws IOException, InterruptedException {//檢查資料所屬檔案的方法
Text k = new Text(text);
Text v = new Text(FName);
context.write(k, v);
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String values = value.toString();
StringTokenizer st = new StringTokenizer(values);
FileSplit split = (FileSplit) context.getInputSplit();//建立檔案切割物件
while (st.hasMoreElements()) {
String name = split.getPath().getName();//利用檔案切割物件獲取檔案的名字
if ("a.txt".equals(name)) {
check(st.nextToken(), "a", context);//呼叫傳參的方法
} else if ("b.txt".equals(name)) {
check(st.nextToken(), "b", context);
} else{
check(st.nextToken(), "c", context);
}
}
}
}
Reducer類:
package go_over.Reduce;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MyReduce extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
/**
* 定義三個累加器分別代表三個檔案的詞頻統計結果
*/
int countA = 0;
int countB = 0;
int countC = 0;
/**
* 遍歷資料集開始統計
*/
for (Text value : values) {
if ("a".equals(value.toString())) {
countA++;
} else if ("b".equals(value.toString())) {
countB++;
} else {
countC++;
}
}
//手動拼接一下統計的結果
String result = " a.txt-->" + countA + " b.txt-->" + countB + " c.txt-->" + countC;
context.write(key, new Text(result));//輸出到檔案
}
}
MyPartition類
package go_over.Partition;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class MyPartition extends Partitioner<Text, Text> {
/**
* 繼承一個Partitioner的抽象類
* 重寫getPartition方法
* @return
*/
@Override
public int getPartition(Text key, Text value, int i) {
int falg = 0;//分割槽編號(標誌)
if(key.find("Hello")==0){
falg = 1;
}else if(key.find("World")==0){
falg = 2;
}
return falg;//返回的int數值代表著分割槽的編號
}
}
Runner類:
package go_over.demo;
import com.bw.map.countMap;
import com.bw.map.sortMap;
import com.bw.reduce.countReduce;
import com.bw.reduce.sortReduce;
import com.bw.sort.MySort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
/**
* This is my test
*
* @Author
* @Create 2018-09-09 19:43
*/
public class MyDemo {
/**
* 首先建立一個靜態變數區
*
* @param args
*/
static Configuration conf = new Configuration();
static Job job = null;
static FileSystem fs = null;
static String uri = "hdfs://192.168.132.130:9000";
static {//靜態程式碼塊
try {
conf.setBoolean("dfs.support.append", true);
fs = FileSystem.get(URI.create(uri), conf);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//公共變數區
String input = uri.concat("/test");//本人需要做處理的檔案都在這個目錄下
String output = uri.concat("/output");//統計好結果後的檔案存放目錄
{//初始化job任務區
job = Job.getInstance(conf);//定義一個job任務
job.setJobName("wordCount");//新增工作名字
job.setMapperClass(countMap.class);//新增map類對映
job.setReducerClass(countReduce.class);//新增reduce對映
job.setMapOutputKeyClass(Text.class);//設定map階段的輸出key型別
job.setMapOutputValueClass(IntWritable.class);//設定map階段的輸出value型別
job.setOutputKeyClass(Text.class);//設定最終階段的輸出key型別
job.setOutputValueClass(IntWritable.class);//設定最終階段的輸出value型別
job.setPartitionerClass(MyPartition.class);//設定分割槽的自定義類地址
job.setNumReduceTasks(3);//設定分割槽數量
checkFileExists(new Path[]{new Path(output)});//檢測檔案是否存在
FileInputFormat.setInputPaths(job, new Path(input));//指定操作路徑
FileOutputFormat.setOutputPath(job, new Path(output));//指定操作路徑
job.waitForCompletion(true);//提交任務
}
{//結果展示模組
{//統計結果展示塊
String alert = "-------------------------------下面是統計結果--------------------------";
getResult(new Path(sortInput), alert);
}
}
{
fs.close();//關閉資源
}
}
synchronized static boolean getResult(Path path, String alert) throws IOException {
FSDataInputStream open = fs.open(path);//開啟目標路徑的檔案
BufferedReader reader = new BufferedReader(new InputStreamReader(open, "utf-8"));//設定緩衝區
String res = "";
System.err.println(alert);//輸出提示資訊
while ((res = reader.readLine()) != null) {//迴圈按行讀取文字內容並賦值給res
System.out.println(res);//輸出統計後的結果
}
reader.close();//關閉資源
return true;
}
static void checkFileExists(Path... paths) throws IOException {//檢視檔案是否存在,避免出現檔案重複存在的錯誤
for (Path path : paths) {
boolean exists = fs.exists(path);
if (exists) {
fs.delete(path, true);
}
}
}
最後的輸出結果為:
在HDFS的分割槽檔案中的效果為:
Found 4 items
-rw-r--r-- 3 supergroup 0 2018-10-23 20:48 /output/_SUCCESS
-rw-r--r-- 3 supergroup 128 2018-10-23 20:48 /output/part-r-00000
-rw-r--r-- 3 supergroup 43 2018-10-23 20:48 /output/part-r-00001
-rw-r--r-- 3 supergroup 43 2018-10-23 20:48 /output/part-r-00002
//最後一位代表的就是我們自定義的那個分割槽編號
自定義分組:
這是實現效果圖:
需求分析:根據第一列進行歸併分組後正序排序,並找出對應第二列每組中的最大值
技術實現:
(1).自定義分組比較器繼承RawComparator,實現compare()方法。
(2).在設定作業是設定job.setGroupingComparatorClass()。
Mapper、Reducer與Runner類
public class MyGroupTest {
// 定義輸入路徑
private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/data";
// 定義輸出路徑
private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";
public static void main(String[] args) {
try {
// 建立配置資訊
Configuration conf = new Configuration();
// 建立檔案系統
FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
// 如果輸出目錄存在,我們就刪除
if (fileSystem.exists(new Path(OUT_PATH))) {
fileSystem.delete(new Path(OUT_PATH), true);
}
// 建立任務
Job job = new Job(conf, MyGroupTest.class.getName());
// 設定輸入目錄和設定輸入資料格式化的類
FileInputFormat.setInputPaths(job, INPUT_PATH);
job.setInputFormatClass(TextInputFormat.class);
// 設定自定義Mapper類和設定map函式輸出資料的key和value的型別
job.setMapperClass(MyGroupMapper.class);
job.setMapOutputKeyClass(CombineKey.class);
job.setMapOutputValueClass(LongWritable.class);
//一定不要忘記設定自定義分組比較器的類(這一步是關鍵)
job.setGroupingComparatorClass(MyGroupComparator.class);
// 設定分割槽和reduce數量(reduce的數量,和分割槽的數量對應,因為分割槽為一個,所以reduce的數量也是一個)
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
// 排序、分組
// 歸約
// Shuffle把資料從Map端拷貝到Reduce端。
// 指定Reducer類和輸出key和value的型別
job.setReducerClass(MyGroupReducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(LongWritable.class);
// 指定輸出的路徑和設定輸出的格式化類
FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
job.setOutputFormatClass(TextOutputFormat.class);
// 提交作業 退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (Exception e) {
e.printStackTrace();
}
}
public static class MyGroupMapper extends Mapper<LongWritable, Text, CombineKey, LongWritable> {
// 建立聯合的key
private CombineKey combineKey = new CombineKey();
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CombineKey, LongWritable>.Context context) throws IOException,
InterruptedException {
// 對輸入value進行分割
String[] splits = value.toString().split("\t");
// 設定聯合的Key
combineKey.setComKey(Long.parseLong(splits[0]));
combineKey.setComVal(Long.parseLong(splits[1]));
// 傳給reducer計算
context.write(combineKey, new LongWritable(Long.parseLong(splits[1])));
}
}
public static class MyGroupReducer extends Reducer<CombineKey, LongWritable, LongWritable, LongWritable> {
@Override
protected void reduce(CombineKey combineKey, Iterable<LongWritable> values,
Reducer<CombineKey, LongWritable, LongWritable, LongWritable>.Context context) throws IOException, InterruptedException {
long max = Long.MIN_VALUE;
// 遍歷比較求出每個組中的最大值
for (LongWritable val : values) {
if (val.get() > max) {
max= val.get();
}
}
// 把原始資料中的第一列中的元素分組後的組號作為key,所求的最小值為value將結果寫出去
context.write(new LongWritable(combineKey.getComKey()), new LongWritable(max));
}
}
}
二次排序及類
/**
* 二次排序構造一個新的Key
* @version
*/
class CombineKey implements WritableComparable<CombineKey> {
private Long comKey;
private Long comVal;
// 無參建構函式必須提供,否則Hadoop的反射機制會報錯
public CombineKey() {
}
// 有參建構函式
public CombineKey(Long comKey, Long comVal) {
this.comKey = comKey;
this.comVal = comVal;
}
public Long getComKey() {
return comKey;
}
public void setComKey(Long comKey) {
this.comKey = comKey;
}
public Long getComVal() {
return comVal;
}
public void setComVal(Long comVal) {
this.comVal = comVal;
}
public void write(DataOutput out) throws IOException {
out.writeLong(this.comKey);
out.writeLong(this.comVal);
}
public void readFields(DataInput in) throws IOException {
this.comKey = in.readLong();
this.comVal = in.readLong();
}
/**
* 第一列按升序排列,第一列相同時,第二列也按升序排列
*/
public int compareTo(CombineKey o) {
long minus = this.comKey - o.comVal;
if (minus != 0) {
return (int) minus;
}
return (int) (this.comVal - o.comVal);
}
}
分組比較器類:
/**
* 自定義分組比較器
* @version
*/
class MyGroupComparator implements RawComparator<CombineKey> {
// 分組策略中,這個方法不是重點
public int compare(CombineKey o1, CombineKey o2) {
// TODO Auto-generated method stub
return 0;
}
/**
* b1 表示第一個參與比較的位元組陣列
* s1 表示第一個位元組陣列中開始比較的位置
* l1 表示第一個位元組陣列中參與比較的位元組長度
* b2 表示第二個參與比較的位元組陣列
* s2 表示第二個位元組陣列中開始比較的位置
* l2 表示第二個位元組陣列參與比較的位元組長度
*/
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
// 這裡是按第CombineKey中的第一個元素進行分組,因為是long型別,所以是8個位元組
return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
}
}
最終結果:
1 1
2 2
3 3