
Explaining key MapReduce concepts through a worked example -------- in-memory sorting with a custom data type

The custom data type WCData

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

/**
 * Custom data type holding a word and its count.
 * @author lyd
 */
public class WCData implements WritableComparable<WCData>{

	public String word;
	public int counter;
	
	// No-arg constructor required by Hadoop's Writable deserialization
	public WCData(){
		
	}
	
	public WCData(String word, int counter) {
		this.word = word;
		this.counter = counter;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		// Serialization: write the fields in a fixed order
		out.writeUTF(word);
		out.writeInt(counter);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		// Deserialization: read the fields back in the same order they were written
		this.word = in.readUTF();
		this.counter = in.readInt();
	}

	@Override
	public int compareTo(WCData o) {
		// Sort by counter in descending order; swap the operands for ascending order
		int tmp = o.counter - this.counter;
		if(tmp != 0){
			return tmp;
		}
		// Tie-break on the word itself, so that distinct words with the same count
		// are not treated as duplicates (and silently dropped) by TreeSet
		return this.word.compareTo(o.word);
	}
	
	
	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + counter;
		result = prime * result + ((word == null) ? 0 : word.hashCode());
		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		WCData other = (WCData) obj;
		if (counter != other.counter)
			return false;
		if (word == null) {
			if (other.word != null)
				return false;
		} else if (!word.equals(other.word))
			return false;
		return true;
	}

	/**
	 * @return the word
	 */
	public String getWord() {
		return word;
	}

	/**
	 * @param word the word to set
	 */
	public void setWord(String word) {
		this.word = word;
	}

	/**
	 * @return the counter
	 */
	public int getCounter() {
		return counter;
	}

	/**
	 * @param counter the counter to set
	 */
	public void setCounter(int counter) {
		this.counter = counter;
	}

	/* (non-Javadoc)
	 * @see java.lang.Object#toString()
	 */
	@Override
	public String toString() {
		return word + ":" + counter;
	}

	
}
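Before wiring WCData into a job, its ordering can be sanity-checked on its own. The following is a minimal standalone sketch, not part of the original listing: the class name WCDataSortTest and the sample values are illustrative, and WCData is assumed to sit in the same package. It drops a few records into a TreeSet and prints them back in descending order of count:

import java.util.TreeSet;

public class WCDataSortTest {
	public static void main(String[] args) {
		// TreeSet orders its elements with WCData.compareTo:
		// larger counters first, ties broken by the word itself
		TreeSet<WCData> ts = new TreeSet<WCData>();
		ts.add(new WCData("qianfeng", 4));
		ts.add(new WCData("hello", 2));
		ts.add(new WCData("is", 3));
		ts.add(new WCData("hadoop", 1));
		for (WCData d : ts) {
			System.out.println(d); // qianfeng:4, is:3, hello:2, hadoop:1
		}
	}
}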

The MapReduce class

import java.io.IOException;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


/**
 * TOP-N: return the top N records by count.
 *
 * Sample input:
 *   hello qianfeng hello qianfeng qianfeng is best qianfeng better
 *   hadoop is good
 *   spark is nice
 *
 * Expected top three after counting:
 *   qianfeng 4
 *   is 3
 *   hello 2
 *
 * @author lyd
 */
public class TopN implements Tool {

	/**
	 * Custom Mapper: splits each line on spaces and emits (word, "1").
	 * @author lyd
	 */
	static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{

		@Override
		protected void setup(Context context)throws IOException, InterruptedException {
		}

		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			// Tokenize the line and emit a count of 1 for every word
			String line = value.toString();
			String[] words = line.split(" ");
			for (String s : words) {
				context.write(new Text(s), new Text("1"));
			}
		}

		@Override
		protected void cleanup(Context context)throws IOException, InterruptedException {
		}
		
	}
	
	/**
	 * Custom Reducer: sums the counts per word and keeps only the top k
	 * records in an in-memory TreeSet.
	 * @author lyd
	 */
	static class MyReducer extends Reducer<Text, Text, WCData, NullWritable>{

		// TreeSet that keeps the aggregated records ordered by WCData.compareTo
		// (descending by count, ties broken by word)
		TreeSet<WCData> ts = new TreeSet<WCData>();
		// Number of top records to keep
		public static final int k = 5;

		@Override
		protected void setup(Context context)throws IOException, InterruptedException {
		
		}
		@Override
		protected void reduce(Text key, Iterable<Text> value,Context context)
				throws IOException, InterruptedException {
			int counter = 0;
			for (Text t : value) {
				counter += Integer.parseInt(t.toString());
			}
			
			// Build the record for this word
			WCData wc = new WCData(key.toString(), counter);
			// Add it to the TreeSet; once more than k entries are held,
			// evict the last (smallest-count) element so only the top k survive
			ts.add(wc);
			if(ts.size() > k){
				ts.remove(ts.last());
			}
		}
		
		@Override
		protected void cleanup(Context context)throws IOException, InterruptedException {
			// Emit the surviving top-k records once all keys have been reduced
			for (WCData wcData : ts) {
				context.write(wcData, NullWritable.get());
			}
		}
	}
	
	
	private Configuration conf;

	@Override
	public void setConf(Configuration conf) {
		// Point the job at the target HDFS cluster and keep the configuration,
		// so that run() sees the same instance ToolRunner handed in
		conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
		this.conf = conf;
	}

	@Override
	public Configuration getConf() {
		return conf;
	}
	
	/**
	 * Driver method: configures and submits the job.
	 */
	@Override
	public int run(String[] args) throws Exception {
		// 1. Get the Configuration object
		Configuration conf = getConf();
		// 2. Create the job
		Job job = Job.getInstance(conf, "model01");
		// 3. Set the class whose jar will be used to run the job
		job.setJarByClass(TopN.class);
		// 4. Configure the map side
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		
		// 5. Configure the reduce side
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(WCData.class);
		job.setOutputValueClass(NullWritable.class);
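		// A safeguard one might add here (not in the original listing): the TreeSet-based
		// Top-N lives inside a single reducer instance, so the output is a global Top-N
		// only when the job runs one reduce task. One reduce task is the MapReduce
		// default, but it can be made explicit:
		job.setNumReduceTasks(1);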
		// Delete the output directory if it already exists
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(new Path(args[1]))){
			fs.delete(new Path(args[1]), true);
		}
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		// 6. Submit the job and wait for completion
		int isok = job.waitForCompletion(true) ? 0 : 1;
		return isok;
	}
	
	/**
	 * Main entry point of the job.
	 * @param args
	 */
	public static void main(String[] args) {
		try {
			// Parse generic Hadoop options out of the command-line arguments
			String [] argss = new GenericOptionsParser(new Configuration(), args).getRemainingArgs();
			System.exit(ToolRunner.run(new TopN(), argss));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}