
Hadoop in Depth (4): Shuffle Internals, Partitioner Partitioning, Combiner Programming, and Common MapReduce Algorithms
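
The complete example below (DataCountPartition) sums each phone number's upstream and downstream traffic and uses a custom Partitioner to route each carrier's records to its own reducer: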

package liuxun.hadoop.mr.dc;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DataCountPartition {

	public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean> {

		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			// read one line of the log
			String line = value.toString();
			// split into tab-separated fields: field 1 is the phone number,
			// fields 8 and 9 are the upstream and downstream traffic
			String[] fields = line.split("\t");
			String tel = fields[1];
			long up = Long.parseLong(fields[8]);
			long down = Long.parseLong(fields[9]);
			DataBean bean = new DataBean(tel, up, down);
			// emit <phone number, traffic bean>
			context.write(new Text(tel), bean);
		}

	}

	public static class DCReducer extends Reducer<Text, DataBean, Text, DataBean> {

		@Override
		protected void reduce(Text key, Iterable<DataBean> values, Context context)
				throws IOException, InterruptedException {
			long up_sum = 0;
			long down_sum = 0;
			for (DataBean bean : values) {
				up_sum += bean.getUpPayLoad();
				down_sum += bean.getDownPayLoad();
			}
			DataBean bean = new DataBean("", up_sum, down_sum);
			context.write(key, bean);
		}

	}

	public static class ProviderPartitioner extends Partitioner<Text, DataBean> {

		private static Map<String, Integer> providerMap = new HashMap<String, Integer>();
		static {
			// In real projects this mapping is usually loaded from a database
			// 1: China Mobile  2: China Unicom  3: China Telecom
			providerMap.put("135", 1);
			providerMap.put("136", 1);
			providerMap.put("137", 1);
			providerMap.put("150", 2);
			providerMap.put("159", 2);
			providerMap.put("182", 3);
			providerMap.put("183", 3);
		}

		// The return value of this method is the partition number
		// key: the key emitted by the mapper, here the phone number
		// value: the value emitted by the mapper, here a DataBean
		// numPartitions: the number of partitions, determined by the number of
		//                reducers; launching N reducers produces N partitions
		@Override
		public int getPartition(Text key, DataBean value, int numPartitions) {
			// Look up the carrier from the phone number prefix. We partition by key here,
			// but in real projects you can also partition by value.
			String account = key.toString();
			String sub_acc = account.substring(0, 3);
			Integer code = providerMap.get(sub_acc);
			if (code == null) {
				code = 0;
			}
			return code;
		}

	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(DataCountPartition.class);

		job.setMapperClass(DCMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(DataBean.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));

		job.setReducerClass(DCReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DataBean.class);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.setPartitionerClass(ProviderPartitioner.class);
		
		// set the number of reducers to launch
		job.setNumReduceTasks(Integer.parseInt(args[2]));
		
		job.waitForCompletion(true);

	}

}
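
The Mapper, Reducer, and Partitioner above all depend on a DataBean class that is not listed in this part (it is presumably defined in an earlier part of this series). For reference, a minimal sketch is shown below; the field names tel, upPayLoad, and downPayLoad are assumptions inferred from the constructor and getters used above, while the Writable contract itself is required because DataBean is the map output value and must be serialized during the shuffle.

package liuxun.hadoop.mr.dc;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Hypothetical sketch: field names are assumed from the constructor and getters used above
public class DataBean implements Writable {

	private String tel;
	private long upPayLoad;
	private long downPayLoad;

	// Hadoop instantiates the bean reflectively, so a no-arg constructor is required
	public DataBean() {
	}

	public DataBean(String tel, long upPayLoad, long downPayLoad) {
		this.tel = tel;
		this.upPayLoad = upPayLoad;
		this.downPayLoad = downPayLoad;
	}

	// serialize: the field order here must match readFields()
	public void write(DataOutput out) throws IOException {
		out.writeUTF(tel);
		out.writeLong(upPayLoad);
		out.writeLong(downPayLoad);
	}

	// deserialize in the same order as write()
	public void readFields(DataInput in) throws IOException {
		this.tel = in.readUTF();
		this.upPayLoad = in.readLong();
		this.downPayLoad = in.readLong();
	}

	// toString() determines how the bean appears in the reducer's output files
	@Override
	public String toString() {
		return upPayLoad + "\t" + downPayLoad;
	}

	public long getUpPayLoad() {
		return upPayLoad;
	}

	public long getDownPayLoad() {
		return downPayLoad;
	}
}

Because Hadoop creates the bean via reflection and streams it between the map and reduce phases, the no-arg constructor and the matching field order in write()/readFields() are the parts that must not change.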
① First upload the log data to HDFS.
② Package the program above into WCP.jar, upload it to the Linux host, and run it with: hadoop jar WCP.jar <HDFS log path> <output path> <number of reducers>
Note: the reducer count must be large enough to cover every partition number that ProviderPartitioner can return (0–3 here), otherwise the job fails with an illegal partition error.