
Partitioner: partitioning data in MapReduce

When running a MapReduce job, the final output sometimes has to be split across different files. Partitioning by province, for example, means all records for the same province should end up in one file; partitioning by gender means all records for the same gender should end up in one file. Since the final output is produced by the reduce tasks, getting several output files means running the same number of reduce tasks.
The data a reduce task consumes comes from the map tasks, so the map side has to divide its output and hand different records to different reduce tasks.
This process of dividing the map output is called partitioning, and the class that implements it is called a Partitioner.
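
When no partitioner is configured, Hadoop falls back to its default HashPartitioner, which spreads keys across the reduce tasks by hashing. Its core logic is essentially the following (a simplified sketch; the class name here is made up for illustration):

package com.thp.bigdata.provinceflow;

import org.apache.hadoop.mapreduce.Partitioner;

// Simplified sketch of the default hash-based partitioning: the key's hash code
// (masked to stay non-negative) modulo the number of reduce tasks decides which
// reduce task, and therefore which output file, a record goes to.
public class DefaultStylePartitioner<K, V> extends Partitioner<K, V> {

	@Override
	public int getPartition(K key, V value, int numReduceTasks) {
		return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
	}
}

To control which file each record lands in, we plug in our own Partitioner instead. The example below sums the upstream and downstream traffic per phone number and routes the totals into separate output files by phone-number prefix. It consists of three classes: a custom Writable (FlowBean), a custom Partitioner (ProvincePartitioner), and the driver (FlowCount).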

package com.thp.bigdata.provinceflow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {

	
	private long upFlow;    // upstream traffic
	private long downFlow;  // downstream traffic
	private long sumFlow;   // total traffic
	
	// Deserialization instantiates the bean via reflection through the no-arg constructor, so one must be defined explicitly
	public FlowBean() {}

	public FlowBean(long upFlow, long downFlow) {
		this.upFlow = upFlow;
		this.downFlow = downFlow;
		this.sumFlow = upFlow + downFlow;
	}
	public long getUpFlow() {
		return upFlow;
	}

	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
	}

	public long getDownFlow() {
		return downFlow;
	}

	public void setDownFlow(long downFlow) {
		this.downFlow = downFlow;
	}

	public long getSumFlow() {
		return sumFlow;
	}

	public void setSumFlow(long sumFlow) {
		this.sumFlow = sumFlow;
	}

	/**
	 * Serialization method
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(upFlow);
		out.writeLong(downFlow);
		out.writeLong(sumFlow);
	}
	
	/**
	 * Deserialization method.
	 * Note: fields must be read back in exactly the same order they were written.
	 */
	@Override
	public void readFields(DataInput in) throws IOException {
		upFlow = in.readLong();
		downFlow = in.readLong();
		sumFlow = in.readLong();
	}

	// toString() determines what gets written to the output files
	@Override
	public String toString() {
		return upFlow + "\t" + downFlow + "\t" + sumFlow;
	}
}
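
As a quick local check (a hypothetical snippet, not part of the original job), the bean can be round-tripped through plain Java data streams; it shows concretely why readFields() has to read the three longs in exactly the order write() wrote them:

package com.thp.bigdata.provinceflow;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical round-trip test for FlowBean: serialize with write(), then
// deserialize with readFields() and print the result via toString().
public class FlowBeanRoundTrip {

	public static void main(String[] args) throws IOException {
		FlowBean original = new FlowBean(1116, 954);

		// write() emits upFlow, downFlow, sumFlow, in that order
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		original.write(new DataOutputStream(bytes));

		// readFields() must consume the three longs in the same order
		FlowBean copy = new FlowBean();
		copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

		System.out.println(copy);   // expected: 1116	954	2070
	}
}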

package com.thp.bigdata.provinceflow;

import java.util.HashMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean>{

	
	// Lookup table: phone-number prefix -> partition number
	public static HashMap<String, Integer> provinceDict = new HashMap<String, Integer>();
	
	static {
		provinceDict.put("136", 0);
		provinceDict.put("137", 1);
		provinceDict.put("138", 2);
		provinceDict.put("139", 3);
	}
	
	@Override
	public int getPartition(Text key, FlowBean value, int numPartitions) {
		// The key is the phone number; its first three digits select the partition
		String prefix = key.toString().substring(0, 3);
		Integer provinceId = provinceDict.get(prefix);
		// Any prefix not in the dictionary goes to partition 4
		return provinceId == null ? 4 : provinceId;
	}

}
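
ProvincePartitioner routes records by the first three digits of the phone number: prefixes 136 through 139 map to partitions 0 through 3, and everything else falls through to partition 4. Below is a small, hypothetical sanity check (not from the original post) that feeds a few sample numbers through getPartition() outside of a job; the FlowBean value is not used there, so null is passed:

package com.thp.bigdata.provinceflow;

import org.apache.hadoop.io.Text;

// Hypothetical local check: print which partition a few sample phone numbers map to.
public class ProvincePartitionerCheck {

	public static void main(String[] args) {
		ProvincePartitioner partitioner = new ProvincePartitioner();
		// Sample numbers: one each for the 136/137/138/139 prefixes, plus an unknown prefix (150)
		String[] samples = {"13602846565", "13719199419", "13826544101", "13922314466", "15013685858"};
		for (String phone : samples) {
			// 5 matches job.setNumReduceTasks(5) in the driver below
			int partition = partitioner.getPartition(new Text(phone), null, 5);
			System.out.println(phone + " -> partition " + partition);
		}
	}
}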

package com.thp.bigdata.provinceflow;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCount {

	
	static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// Convert the line to a String
			String line = value.toString();
			// Split into fields
			String[] fields = line.split("\t");
			// Extract the phone number
			String phoneNumber = fields[1];
			// Extract the upstream and downstream traffic
			long upFlow = Long.parseLong(fields[fields.length - 3]);
			long downFlow = Long.parseLong(fields[fields.length - 2]);
			context.write(new Text(phoneNumber), new FlowBean(upFlow, downFlow));
		}
	}
 	
	
	static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
		@Override
		protected void reduce(Text key, Iterable<FlowBean> values, Context context)
				throws IOException, InterruptedException {
			long sum_upFlow = 0;
			long sum_downFlow = 0;
			// Iterate over all beans for this phone number, accumulating upstream and downstream traffic separately
			for(FlowBean bean : values) {
				sum_upFlow += bean.getUpFlow();
				sum_downFlow += bean.getDownFlow();
			}
			FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);
			context.write(key, resultBean);
		}
	}
	
	
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		// Specify the jar that contains this program's classes
		job.setJarByClass(FlowCount.class);
		
		// Specify the Mapper and Reducer classes this job will use
		job.setMapperClass(FlowCountMapper.class);
		job.setReducerClass(FlowCountReducer.class);
		
		// Register our custom partitioner
		job.setPartitionerClass(ProvincePartitioner.class);
		// Set the number of reduce tasks to match the number of partitions
		job.setNumReduceTasks(5);
		
		// Specify the key/value types of the mapper output
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowBean.class);
		
		// Specify the key/value types of the final output
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);
		
		// Specify the directory containing the job's input files
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		// Specify the directory for the job's output
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		// Submit the job configuration and the jar containing the job's classes to YARN, and wait for it to finish
		boolean res = job.waitForCompletion(true);
		System.exit(res ? 0 : 1);
		
	}
	
}
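
A note on the reduce-task count: setNumReduceTasks(5) matches the five possible return values of getPartition() (0 through 3 for the known prefixes, plus 4 for everything else), so a successful run should leave five files in the output directory, part-r-00000 through part-r-00004, one per partition. If fewer reduce tasks than partitions are configured, map output assigned to a partition with no matching task makes the job fail with an illegal-partition error; if more are configured, the extra tasks simply produce empty output files; and with a single reduce task the custom partitioner is effectively bypassed and everything ends up in one file.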


Source code:
https://gitee.com/tanghongping/hadoopMapReduce/tree/master/src/com/thp/bigdata/provinceflow