1. 程式人生 > >MapReduce的自定義排序、分割槽和分組

MapReduce的自定義排序、分割槽和分組

1.自定義排序(WritableComparable)

  1. 我們寫mr程式來處理文字時,經常會將處理後的資訊封裝到我們自定義的bean中,並將bean作為map輸出的key來傳輸。上一文我用圖解分析了mr程式的基本流程。而mr程式會在處理資料的過程中(傳輸到reduce之前)對資料排序(如:map端生成的檔案中的內容分割槽且區內有序)。

  2. 我們自定義bean來封裝處理後的資訊的話,我們可以自定義排序規則來挑選bean中的某幾個屬性來作為排序的依據,這樣就很靈活了。

import org.apache.hadoop.io.WritableComparable;

public class Person implements WritableComparable<Person> {
    private
String name; //姓名 private int age; //年齡 private int charm; //魅力值 // 如果空建構函式被覆蓋,一定要顯示的定義一下,否則反序列化時會拋異常。 public Person() { } public Person(String name, int age, int charm) { super(); this.name = name; this.age = age; this.charm = charm; } public
String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public int getCharm() { return charm; } public
void setCharm(int charm) { this.charm = charm; } @Override //hadoop的反序列化 public void readFields(DataInput in) throws IOException { name=in.readUTF(); age=in.readInt(); charm=in.readInt(); } @Override //hadoop的序列化 public void write(DataOutput out) throws IOException { out.writeUTF(name); out.writeInt(age); out.writeInt(charm); } @Override //先按照年齡排序,在按照魅力值排序(年齡小,魅力大的在前) public int compareTo(Person o) { if(o.age==this.age){ if(o.charm==this.charm){ return 0; }else{ return o.charm-this.charm; } }else{ return this.age-o.age; } } }
  • 上要實現自定義排序,需要實現WritableComparable這個介面,然後實現三個方法readFields(反序列化)、write(序列化)、和最關鍵的compareTo(排序)。在mr過程中發生排序的地方就會按照我自定義的排序規則來排序。前提,map的輸出的key為封裝的Person。

  • 注意1:java的序列化過於重量級(Serializable),所以hadoop開發了一套自己的序列化和反序列化策略(Writable,精簡高效),因為map端的檔案要下載到reduce端的話如果不在同一臺節點上是會走網路進行傳輸(hadoop-rpc),所以物件需要序列化

  • 注意2:如果空建構函式被覆蓋,一定要顯示的定義一下,否則反序列化時會拋異常。

2、自定義分割槽(Partitioner)

  1. Mapreduce中會將maptask輸出的kv對,預設(HashPartitioner)根據key的hashcode%reducetask數來分割槽。
  2. (1)如果要按照我們自己的需求進行分組,則需要改寫資料分發元件Partitioner繼承抽象類:Partitioner。
    (2)在job物件中,設job.setPartitionerClass(自定義分割槽類.class)
import java.util.HashMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * @author zzw
 * map端的輸出型別為(Text,Text),這裡自定的分割槽策略為key的首位如果為1,則進入0號分割槽;如果為2,則進入1號分割槽;如果是3則進入2號分割槽
 * 假設資料為:
 * 1367788000   hahaah
 * 2342344234   xiaomei
 * 3324234234   zzzz
 * 6666668888   wwww
 * 7777777777   ssss
 */
public class CustomPartitioner extends Partitioner<Text,Text>{
    static HashMap<String, Integer> numMap = new HashMap<String, Integer>();
    static {
        numMap.put("1", 0);
        numMap.put("2", 1);
        numMap.put("3", 2);
    }
    /*
     * 1)numPartitions其實我們可以設定,在job.setNumReduceTasks(n)設定。
     * 2)如果我們job.setNumReduceTasks(5),那麼這裡的numPartitions=5,那麼預設的HashPartitioner的機制就是用key的hashcode%numPartitions來決定分割槽屬於哪個分割槽,所以分割槽數量就等於我們設定的reduce數量5個。
     */
    @Override    
    public int getPartition(Text key, Text value, int numPartitions) {
        Integer hash = numMap.get(key.toString().substring(0, 1));
        //將沒有匹配到的資料放入3號分割槽
        return hash==null?3:hash;
    }
}

3、自定義分組(GroupingComparator)

  1. 假設我們將上面自定義的Person(bean)作為key傳送給reduce,而在reduce端我們希望將年齡相同的kv聚合成組,那麼就可以如下方式實現。
  2. 自定義分組要繼承WritableComparator,然後重寫compare方法。
  3. 定義完成後要設定job.setGroupingComparatorClass(CustomGroupingComparator.class);
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CustomGroupingComparator extends WritableComparator{
    protected CustomGroupingComparator() {
        super(Person.class, true);
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Person abean = (Person) a;
        Person bbean = (Person) b;
        //將item_id相同的bean都視為相同,從而聚合為一組
        return abean.getAge()-bbean.getAge();
    }
}