1. 程式人生 > >MR之partition自定義分割槽器

MR之partition自定義分割槽器

maptask執行的結果都會放到一個分割槽檔案中,這個分割槽檔案有自己的編號,這個編號是通過一個hash演算法來生成的,通過對context.write(k,v)中的k進行hash會產生一個值,相同的key產生的值是一樣的,所以這種辦法能將相同的key值放到一個分割槽中。分割槽中的值會發送給reducetask進行相應的處理。
mapreduce框架中有預設的分割槽器,這個分割槽器叫做HashPartitioner,程式碼程式碼如下:
這是預設分割槽的原始碼

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.lib;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.JobConf;

/** 
 * Partition keys by their {@link Object#hashCode()}. 
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {

  public void configure(JobConf job) {}

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K2 key, V2 value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

} 
所以需要自己定義一個分割槽器,這個分割槽器通過判斷key的值返回相應的值。程式碼如下:
public class ProvincePartition extends Partitioner<Text,ProvinceBean> {

    //載入資料字典
    public static Map<String,Integer> provinceDict = new HashMap<String, Integer>();
    static {
        provinceDict.put("135",0);
        provinceDict.put("181",1);
        provinceDict.put("177",2);
        provinceDict.put("170",3);
    }

    public int getPartition(Text key, ProvinceBean flowBean, int numPartitions) {
        Integer id = provinceDict.get(key.toString().substring(0,3));
        System.err.println(key.toString().substring(0,3));
        System.err.println(id);
        return id==null?4:id;

    }
}

程式碼解釋:
1.首先需要將Text型別的值轉換成String型別,呼叫toString方法
2.切割手機號碼的前三位,通過get方法獲得key對應的value值,這個值也可以到資料庫中載入。
3.做一個判斷,判斷是否能得到值,能得值就直接返回map中的value值,得不到值就直接放在另外一個分割槽中。

注意:
如果 reduceTask 的數量> getPartition 的結果數,則會多產生幾個空的輸出檔案part-r-000xx;
如果 1<reduceTask 的數量<getPartition 的結果數,則有一部分分割槽資料無處安放,會Exception;
如果 reduceTask 的數量=1,則不管 mapTask 端輸出多少個分割槽檔案,最終結果都交給這一個 reduceTask,最終也就只會產生一個結果檔案 part-r-00000;
Driver中需要加入partition類的二進位制檔案

//設定自定義的分割槽類
job.setPartitionerClass(ProvincePartition.class);
//同時還需要設定reduce的個數,這個個數跟分割槽的個數相對應
job.setNumReduceTasks(5);