MapReduce的自定義排序、分割槽和分組
阿新 • • 發佈:2019-01-25
1.自定義排序(WritableComparable)
我們寫mr程式來處理文字時,經常會將處理後的資訊封裝到我們自定義的bean中,並將bean作為map輸出的key來傳輸。上一文我用圖解分析了mr程式的基本流程。而mr程式會在處理資料的過程中(傳輸到reduce之前)對資料排序(如:map端生成的檔案中的內容分割槽且區內有序)。
我們自定義bean來封裝處理後的資訊的話,我們可以自定義排序規則來挑選bean中的某幾個屬性來作為排序的依據,這樣就很靈活了。
import org.apache.hadoop.io.WritableComparable;
public class Person implements WritableComparable<Person> {
private String name; //姓名
private int age; //年齡
private int charm; //魅力值
// 如果空建構函式被覆蓋,一定要顯示的定義一下,否則反序列化時會拋異常。
public Person() {
}
public Person(String name, int age, int charm) {
super();
this.name = name;
this.age = age;
this.charm = charm;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public int getCharm() {
return charm;
}
public void setCharm(int charm) {
this.charm = charm;
}
@Override //hadoop的反序列化
public void readFields(DataInput in) throws IOException {
name=in.readUTF();
age=in.readInt();
charm=in.readInt();
}
@Override //hadoop的序列化
public void write(DataOutput out) throws IOException {
out.writeUTF(name);
out.writeInt(age);
out.writeInt(charm);
}
@Override //先按照年齡排序,在按照魅力值排序(年齡小,魅力大的在前)
public int compareTo(Person o) {
if(o.age==this.age){
if(o.charm==this.charm){
return 0;
}else{
return o.charm-this.charm;
}
}else{
return this.age-o.age;
}
}
}
上要實現自定義排序,需要實現WritableComparable這個介面,然後實現三個方法readFields(反序列化)、write(序列化)、和最關鍵的compareTo(排序)。在mr過程中發生排序的地方就會按照我自定義的排序規則來排序。前提,map的輸出的key為封裝的Person。
注意1:java的序列化過於重量級(Serializable),所以hadoop開發了一套自己的序列化和反序列化策略(Writable,精簡高效),因為map端的檔案要下載到reduce端的話如果不在同一臺節點上是會走網路進行傳輸(hadoop-rpc),所以物件需要序列化。
- 注意2:如果空建構函式被覆蓋,一定要顯示的定義一下,否則反序列化時會拋異常。
2、自定義分割槽(Partitioner)
- Mapreduce中會將maptask輸出的kv對,預設(HashPartitioner)根據key的hashcode%reducetask數來分割槽。
- (1)如果要按照我們自己的需求進行分組,則需要改寫資料分發元件Partitioner繼承抽象類:Partitioner。
(2)在job物件中,設job.setPartitionerClass(自定義分割槽類.class)
import java.util.HashMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* @author zzw
* map端的輸出型別為(Text,Text),這裡自定的分割槽策略為key的首位如果為1,則進入0號分割槽;如果為2,則進入1號分割槽;如果是3則進入2號分割槽
* 假設資料為:
* 1367788000 hahaah
* 2342344234 xiaomei
* 3324234234 zzzz
* 6666668888 wwww
* 7777777777 ssss
*/
public class CustomPartitioner extends Partitioner<Text,Text>{
static HashMap<String, Integer> numMap = new HashMap<String, Integer>();
static {
numMap.put("1", 0);
numMap.put("2", 1);
numMap.put("3", 2);
}
/*
* 1)numPartitions其實我們可以設定,在job.setNumReduceTasks(n)設定。
* 2)如果我們job.setNumReduceTasks(5),那麼這裡的numPartitions=5,那麼預設的HashPartitioner的機制就是用key的hashcode%numPartitions來決定分割槽屬於哪個分割槽,所以分割槽數量就等於我們設定的reduce數量5個。
*/
@Override
public int getPartition(Text key, Text value, int numPartitions) {
Integer hash = numMap.get(key.toString().substring(0, 1));
//將沒有匹配到的資料放入3號分割槽
return hash==null?3:hash;
}
}
3、自定義分組(GroupingComparator)
- 假設我們將上面自定義的Person(bean)作為key傳送給reduce,而在reduce端我們希望將年齡相同的kv聚合成組,那麼就可以如下方式實現。
- 自定義分組要繼承WritableComparator,然後重寫compare方法。
- 定義完成後要設定job.setGroupingComparatorClass(CustomGroupingComparator.class);
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class CustomGroupingComparator extends WritableComparator{
protected CustomGroupingComparator() {
super(Person.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
Person abean = (Person) a;
Person bbean = (Person) b;
//將item_id相同的bean都視為相同,從而聚合為一組
return abean.getAge()-bbean.getAge();
}
}