1. 程式人生 > >大資料筆記07--MR案例開發

大資料筆記07--MR案例開發

wordcount

案例需求

統計輸入的檔案中,每個單詞出現了幾次

分析設計

  1. 在map中將輸入的每條資料切割成單詞,將key為單詞,value為1的計算結果輸出
  2. 預設的分組器會將相同key(單詞)的資料分為一組,輸入reduce
  3. 在reduce中,遍歷輸入的資料,將value加和(sum),輸出單詞和sum到檔案中

程式碼

  • 主類
public class MyWC {
	public static void main(String[] args) throws Exception {
		//獲取conf物件
		Configuration conf = new Configuration
(true); //獲取任務物件job對 Job job = Job.getInstance(conf); //設定當前main函式所在類 job.setJarByClass(MyWC.class); //設定當前應用的jar包所在路徑 job.setJar(""); //設定輸入路徑 FileInputFormat.setInputPaths(job, "/test/"); //建立輸出路徑的物件 Path outPath = new Path("/output"); //若輸出路徑存在 則刪除 FileSystem fs = outPath.getFileSystem
(conf); if(fs.exists(outPath)){ fs.delete(outPath,true); } //設定輸出路徑 FileOutputFormat.setOutputPath(job, outPath); //設定Map的Class job.setMapperClass(MyWCMapper.class); //設定Map輸出的key、value型別 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //設定reduce的class
job.setReducerClass(MyWCReducer.class); //設定reduce的個數 job.setNumReduceTasks(2); //執行Job true表示返回執行資訊 job.waitForCompletion(true); } }
  • Map
public class MyWCMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	//建立key和value的物件
	Text myKey = new Text();
	IntWritable myValue = new IntWritable(1);
	/*
	 * 重寫map方法
	 */
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		
		//用hadoop提供的StringUtils類的split方法切割字串
		String[] words = StringUtils.split(value.toString(),' ');
		//遍歷輸出
		for(String word : words){
			myKey.set(word);
			context.write(myKey, myValue);
		}
	}
}
  • Reduce
public class MyWCReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
	/**
	 * 重寫reduce方法
	 */
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context) 
			throws IOException, InterruptedException {

		int sum = 0;
		/**
		 * 一組資料呼叫一次reduce
		 * 所以和map一條條讀kv對資料不同,reduce讀入一組資料,所以是values
		 * key是一個指標,指向對應value的key值(此處因為是由預設分組器分組,所以一次讀入的values的key相同)
		 */
		for(IntWritable value : values){
			sum += value.get();
		}
		context.write(key, new IntWritable(sum));
	}
}

溫度統計

案例需求

對下例所示的溫度資料,篩選出每個月溫度最高的兩天

	1949-10-01 14:21:02	34c
	1949-10-01 19:21:02	38c
	1949-10-02 14:01:02	36c
	1950-01-01 11:21:02	32c
	1950-10-01 12:21:02	37c
	1951-12-01 12:21:02	23c
	1950-10-02 12:21:02	41c
	1950-10-03 12:21:02	27c
	1951-07-01 12:21:02	45c
	1951-07-02 12:21:02	46c
	1951-07-03 12:21:03	47c

分析設計

  1. 在map中將切割資訊,捨棄時間,將年月日和溫度封裝到自定義物件中作為key,value為null
  2. 自定義分組器將年和月相同的資料分到一組
  3. 在reduce中,遍歷key(已根據年、月、溫度排序),輸出前兩條日期不重複的資料,即溫度最高的兩天

程式碼

  • 自定義物件
public class MyTQ implements WritableComparable<MyTQ>{
	
	private int year;
	private int month;
	private int day;
	private double temp;
	
	/**
	*Getter和 Setter省略
	*/
	
	//toString方法,輸出資料時會呼叫
	@Override
	public String toString() {
		return year + "-" + month + "-" + day + ":" + temp;
	}
	//序列化方法
	@Override
	public void write(DataOutput out) throws IOException {
		 out.writeInt(year);
		 out.writeInt(month);
		 out.writeInt(day);
		 out.writeDouble(temp);
	}
	//反序列化
	@Override
	public void readFields(DataInput in) throws IOException {
		this.year = in.readInt();
		this.month = in.readInt();
		this.day = in.readInt();
		this.temp = in.readDouble();
	}
	
	//重寫compareTo方法
	//完成三次排序
	@Override
	public int compareTo(MyTQ o) {
		//先比較年份
		int yc = Integer.compare(this.year, o.getYear());
		if(yc == 0){ //若年份相同
			//再比較月份
			int mc = Integer.compare(this.month, o.getMonth());
			//若兩個mouth相等,直接返回溫度的比較結果
			if(mc == 0){
				return Double.compare(this.temp, o.getTemp());
			}else{
				//month不相等,返回month的比較結果
				return mc;
			}
		}else{
			//若兩個year不相同,返回year的結果
			return yc;
		}
		/**
		 * 直接返回yc、mc、this-o
		 * this的值小於o的值返回負數,大於返回正數
		 * 最終排序的結果是升序
		 * 若改為o-this 排序結果是降序
		 */
	}
	
}
  • 主類
public class MyTQMR {
	public static void main(String[] args) throws Exception {
		//建立配置物件
		 Configuration conf = new Configuration(true);
		 //建立job物件
		 Job job = Job.getInstance(conf);
		 job.setJarByClass(MyTQMR.class);

		 //輸入路徑
		 FileInputFormat.setInputPaths(job, "");
		 //輸出路徑
		 Path path = new Path("");
		 FileSystem fileSystem = path.getFileSystem(conf);
		 if(fileSystem.exists(path)){
			 fileSystem.delete(path,true);
		 }
		 FileOutputFormat.setOutputPath(job, path);
		 
		 //設定map和輸出的kv型別
		 job.setMapperClass(MyTQMapper.class);
		 job.setMapOutputKeyClass(MyTQ.class);
		 job.setMapOutputValueClass(NullWritable.class);
		 
		 //設定reduce類
		 job.setReducerClass(MyTQReducer.class);
		 
		 //設定自定義的分組器
		 job.setGroupingComparatorClass(MyTQGroupComparator.class);
		 
		 //執行任務
		 job.waitForCompletion(true);
	}
}
  • Map
public class MyTQMapper extends Mapper<LongWritable, Text, MyTQ, NullWritable> {
	
	MyTQ myKey = new MyTQ();

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		//根據‘\t’切割出溫度
		String[] split1 = StringUtils.split(value.toString(), '\t');
		Double temp = Double.parseDouble(split1[1].split("c")[0]);//去除後面的c
		//根據‘ ’切割時間和日期
		String[] split2 = StringUtils.split(split1[1], ' ');
		//捨棄時間(split2[1])
		//切割年月日
		String[] split3 = StringUtils.split(split2[0], '-');
		int year = Integer.parseInt(split3[0]);
		int month = Integer.parseInt(split3[1]);
		int day = Integer.parseInt(split3[2]);
		
		//將值設定給物件
		myKey.setYear(year);
		myKey.setMonth(month);
		myKey.setDay(day);
		myKey.setTemp(temp);
		
		//寫入下一步
		context.write(myKey, NullWritable.get());
	}
}
  • Reduce
public class MyTQReducer extends Reducer<MyTQ, NullWritable, MyTQ, NullWritable> {
	@Override
	protected void reduce(MyTQ key, Iterable<NullWritable> values,
			Context context) throws IOException, InterruptedException {
		int flag = 0;//記錄讀取的記錄數
		int day = 0;//記錄已讀的日期
		
		//該邏輯僅適用於讀取兩條資料
		for (NullWritable value : values){
			//讀第一條資料時 flag為0 讀第二條資料時不進入這段程式碼
			if(flag == 0){
				//將結果輸出,並將日期賦給day
				context.write(key, value);
				day = key.getDay();
			}
			//讀第二條資料時 若日期相等則跳過 讀下一條 不相等時才進入這段程式碼
			if(flag != 0 && day != key.getDay()){
				//將第二條資料輸出 並跳出迴圈
				context.write(key, value);
				break;
			}
			flag++;
		}
	}
}
  • 自定義的分組器
public class MyTQGroupComparator extends WritableComparator {
	/**
	 * 無參構造方法呼叫父類構造方法
	 */
	public MyTQGroupComparator() {
		super(MyTQ.class,true);
	}
	
	//重寫compare(WritableComparable a, WritableComparable b)方法
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		//獲取需要比較的值 的反序列化後的物件
		MyTQ t1 = (MyTQ) a;
		MyTQ t2 = (MyTQ) b;
		
		//用Integer的compare方法比較year和month
		int yc = Integer.compare(t1.getYear(), t2.getYear());
		int mc = Integer.compare(t1.getMonth(), t2.getMonth());
		/*
		 * 返回值僅用於兩種狀態的判斷 
		 * 為0時 分在同一組(即年月相同)
		 * 不為0時 不分在同一組
		 */
		if(yc == 0){
			return mc;
		}else{
			return yc;
		}
	}
}

涉及到的類

NullWritable

和IntWritable、Text等類似是對資料型別進行可序列化封裝的封裝類,但NullWritable類似一個佔位符,用於value值需要為空的情況,不會輸出到檔案中,也能避免因map中輸出value為null而造成之後出現空指標錯誤 無法例項化(構造方法為private),通過靜態方法get()獲取例項化物件,詳見原始碼

WritableComparable<T>介面

自定義物件時需要實現該介面,重寫序列化方法write(DataOutput out)、反序列化方法readFields(DataInput in)、排序方法int CompareTo(MyTQ o)

WritableComparator類

  • 自定義分組器時對key分組,需要繼承WritableComparator類(下稱WC),重寫無參構造方法呼叫父類的構造方法,傳入引數自定義物件MyTQ.class和引數true
  • WC中有兩個WritableComparable介面(即MyTQ的父介面)的指標key1,key2,和DataInputBuffer類(繼承DataInputStream,DataInputStream實現DataInput介面)的物件buffer
  • 呼叫分組器時,構造方法將反射建立MyTQ物件賦給key1,key2指標(傳入的true此時生效,若是false則不會建立物件,key1、key2賦為null),並將buffer例項化
  • 之後呼叫WC中的compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)方法,將byte陣列b1的s1到l1位置的位元組(即以位元組的形式從檔案中讀取的一條資料)放入buffer,呼叫key1的readFields方法傳入buffer,完成反序列化,將值賦給key1中各屬性,對key2同樣
  • 賦值完後該compare方法會呼叫compare(key1,key2),即自定義分組器中重寫的方法
  • 詳見WritableComparator類原始碼

推薦好友

案例需求

如下例所示的好友表,每行第一個詞表示其本人,剩餘詞表示其好友 找出間接好友關係,如hello和hadoop同為tom的好友且他們兩人不是直接好友,則兩人為間接好友 統計每對簡介好友出現的次數,作為好友推薦的依據

tom	hello	hadoop	cat
world	hadoop	hello	hive
cat	tom	hive
mr	hive	hello
hive	cat	hadoop	world	hello	mr
hadoop	tom	hive	world
hello	tom	world	hive	mr

分析設計

  1. 在map中,
  2. 分組器
  3. 在reduce中,

程式碼