1. 程式人生 > >結合案例講解MapReduce重要知識點 -------- 記憶體排序

結合案例講解MapReduce重要知識點 -------- 記憶體排序

TOP N

資料:

hello qianfeng

hello qianfeng

qianfeng is best

qianfeng better

hadoop is good

spark is nice

取統計後的前三名: qianfeng 4 is 3 hello 2

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.ShortWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class MemSort  extends ToolRunner implements Tool{

	/**
	 * 自定義的myMapper
	 */
	static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{
		
		Text k = new Text();
		Text v = new Text("1");
        
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String lines [] = line.split(" ");
			for (String s : lines) {
				k.set(s);
				context.write(k, v);
			}
		}
	}
	
	/**
	 * 自定義MyReducer
	 */
	static class MyReducer extends Reducer<Text, Text, Text, Text>{
		
		List<String> li = new ArrayList<String>();
		@Override
		protected void reduce(Text key, Iterable<Text> value,Context context)
				throws IOException, InterruptedException {
			int counter = 0;
			for (Text t : value) {
				counter += Integer.parseInt(t.toString());
			}
			//context.write(new Text(counter+""), key);
			li.add(key.toString()+"_"+counter);
			/**
			 * li(qianfeng_4,is_3,hello_2)
			 */
		}
		
		@Override
		protected void cleanup(Context context)throws IOException, InterruptedException {
			//對ist中的元素的第二個進行排序
			for (int i = 0; i < li.size()-1; i++) {
				for (int j = i+1; j < li.size(); j++) {
					//判斷
					if(Integer.parseInt(li.get(i).split("_")[1]) <
							Integer.parseInt(li.get(j).split("_")[1])){
						String tmp = "";
						tmp = li.get(i);
						li.set(i, li.get(j));
						li.set(j, tmp);
					}
				}
			}
			//輸出
			for (int i = 0; i < 3; i++) {
				String l [] = li.get(i).split("_");
				context.write(new Text(l[0]), new Text(l[1]));
			}
		}
	}
	
	
	@Override
	public void setConf(Configuration conf) {
		conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
	}

	@Override
	public Configuration getConf() {
		return new Configuration();
	}
	
	/**
	 * 驅動方法
	 */
	@Override
	public int run(String[] args) throws Exception {
		//1、獲取conf物件
		Configuration conf = getConf();
		//2、建立job
		Job job = Job.getInstance(conf, "model01");
		//3、設定執行job的class
		job.setJarByClass(MemSort.class);
		//4、設定map相關屬性
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		
		//5、設定reduce相關屬性
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		//判斷輸出目錄是否存在,若存在則刪除
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(new Path(args[1]))){
			fs.delete(new Path(args[1]), true);
		}
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		//6、提交執行job
		int isok = job.waitForCompletion(true) ? 0 : 1;
		return isok;
	}
	
	/**
	 * job的主入口
	 * @param args
	 */
	public static void main(String[] args) {
		try {
			//對輸入引數作解析
			String [] argss = new GenericOptionsParser(new Configuration(), args).getRemainingArgs();
			System.exit(ToolRunner.run(new MemSort(), argss));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}