
MapReduce Case Study: Movie Rating Analysis 1 (Two-Table Join)

There are two common ways to join tables in MapReduce: the reduce join and the map join. A reduce join is prone to data skew, so for data files processed in parallel the map join is usually preferred: the join is completed in the mapper stage, which generally avoids skew, and even when skew does occur the affected data volume is small.

Usage condition: one very large table and one very small table. The small table is loaded into memory on every node ahead of time, and the join is completed as an inner join during the map phase.
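The idea can be sketched in plain Java, independent of the Hadoop API: the small table lives in an in-memory HashMap and each record of the large table is joined by a single lookup. The class name and sample records below are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

public class MapJoinSketch {
    // Join one ratings record against the in-memory movies table; null if no match.
    static String join(String ratingLine, Map<String, String> movies) {
        String[] f = ratingLine.split("::");           // userid::movieid::rate::ts
        String info = movies.get(f[1]);                // inner join: unmatched keys are dropped
        return info == null ? null
                : f[0] + "\t" + f[1] + "\t" + f[2] + "\t" + f[3] + "\t" + info;
    }

    public static void main(String[] args) {
        // The small table (movies.dat), fully loaded into memory: movieId -> "title\tgenres"
        Map<String, String> movies = new HashMap<>();
        movies.put("2", "Jumanji (1995)\tAdventure|Children's|Fantasy");

        // The large table (ratings.dat) is streamed record by record.
        System.out.println(join("1::2::5::978300760", movies));
    }
}
```

In the Hadoop version below, the HashMap is built in the mapper's setup() from a distributed-cache file, and the lookup happens in map().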

Problem:

We have the following three datasets:
1. users.dat — record format: 2::M::56::16::70072
Fields: UserID BigInt, Gender String, Age Int, Occupation String, Zipcode String
Field meanings: user ID, gender, age, occupation, zip code

2. movies.dat — record format: 2::Jumanji (1995)::Adventure|Children's|Fantasy
Fields: MovieID BigInt, Title String, Genres String
Field meanings: movie ID, movie title, movie genres

3. ratings.dat — record format: 1::1193::5::978300760
Fields: UserID BigInt, MovieID BigInt, Rating Double, Timestamped String
Field meanings: user ID, movie ID, rating, rating timestamp

The fully joined record layout is:
userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType

Task:
(1) Find the 10 movies rated the most times, along with their rating counts (movie name, rating count)

In this case users.dat and movies.dat are small while ratings.dat is large, so we first join movies.dat with ratings.dat.

A map join is performed first:

package homework1;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * (1) Find the 10 movies rated the most times, along with their rating counts (movie name, rating count)
 */
public class HomeWork1 {

	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://potter2:9000");//the HDFS cluster must be running
		System.setProperty("HADOOP_USER_NAME", "potter");
		FileSystem fs = FileSystem.get(conf);
		
		Job job = Job.getInstance(conf, "HomeWork1");
		
		job.setJarByClass(HomeWork1.class);
		
		job.setMapperClass(HomeWork1Mapper.class);
//		job.setReducerClass(HomeWork1Reducer.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);
		
		
		
		job.setNumReduceTasks(0);//no reduce tasks are needed
		Path input = new Path(args[0]);
		Path output = new Path(args[1]);
		
		URI uri = new URI(args[2]);//path of the small file
		// Register, on the job object, the cache file that will be shipped to every node that runs a map task; the argument must be a URI
		job.addCacheFile(uri);//attach the cache file to the job
		
		
		
		if (fs.exists(output)) {
			fs.delete(output,true);
		}
		FileInputFormat.setInputPaths(job, input);
		FileOutputFormat.setOutputPath(job, output);
		
		boolean isdone = job.waitForCompletion(true);
		System.exit(isdone ? 0 : 1);
		
	}
	
	
	public static class HomeWork1Mapper extends Mapper<LongWritable, Text, Text, NullWritable>{

		/**
		 * movies.dat
		 * 	1::Toy Story (1995)::Animation|Children's|Comedy
			2::Jumanji (1995)::Adventure|Children's|Fantasy 
			
			ratings.dat
			1::1193::5::978300760
			1::661::3::978302109
		 */
		
		//Load the small file into movieMap before any map() calls run
		private static Map<String, String> movieMap = new HashMap<>();
		
		@Override
		protected void setup(Context context)
				throws IOException, InterruptedException {
//			URI[] cacheFile = context.getCacheFiles();
			
			//Obtain the cache files through the context and get the local path of the first one
			Path[] localCacheFiles = context.getLocalCacheFiles();
			String strPath = localCacheFiles[0].toUri().toString();
			
			//Read the cached movies file line by line and build the lookup map
			BufferedReader br = new BufferedReader(new FileReader(strPath));
			String readLine = null;
			
			while ((readLine = br.readLine()) != null) {

				System.out.println(readLine);
				String[] reads = readLine.split("::");
				String movieid = reads[0];
				String moviename = reads[1];
				String movietype = reads[2];
				
				movieMap.put(movieid, moviename+"\t"+movietype);
				
			}
			IOUtils.closeStream(br);
			
			
		}
		
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
				throws IOException, InterruptedException {
			
			String[] split = value.toString().split("::");
			//1::1193::5::978300760  
			//extract the rating fields
			String userid = split[0];
			String movieid = split[1];
			int rate = Integer.parseInt(split[2]);
			long ts = Long.parseLong(split[3]);
			//look up the movie name and genres in movieMap by movieid
			String movieInfo = movieMap.get(movieid);
			if (movieInfo == null) {
				return;//inner join: skip ratings whose movie is not in movies.dat
			}
			String[] movieInfos = movieInfo.split("\t");
			String movieName = movieInfos[0];
			String movieType = movieInfos[1];
			//assemble and emit the joined record
			//2::Jumanji (1995)::Adventure|Children's|Fantasy 
			String kk = userid+"\t"+movieid+"\t"+rate+"\t"+ts+"\t"+movieName+"\t"+movieType;
			
			context.write(new Text(kk), NullWritable.get());
			
		}
	}
	
	public static class HomeWork1Reducer extends Reducer<Text, NullWritable,Text, NullWritable>{
		
		@Override
		protected void reduce(Text key, Iterable<NullWritable> values,Context context) throws IOException, InterruptedException {
			
			
			
			
			
		}
	}
}

To run, package the program as a jar, upload it together with the data to the cluster, and submit it:

Submit command: hadoop jar mapjoin.jar homework1.HomeWork1 /movies_rate/input/ratings.dat /movies_rate/output /movies_rate/input/movies.dat

Partial results:

1	1193	5	978300760	One Flew Over the Cuckoo's Nest (1975)	Drama
1	661	3	978302109	James and the Giant Peach (1996)	Animation|Children's|Musical
1	914	3	978301968	My Fair Lady (1964)	Musical|Romance
1	3408	4	978300275	Erin Brockovich (2000)	Drama
1	2355	5	978824291	Bug's Life, A (1998)	Animation|Children's|Comedy
1	1197	3	978302268	Princess Bride, The (1987)	Action|Adventure|Comedy|Romance
1	1287	5	978302039	Ben-Hur (1959)	Action|Adventure|Drama
1	2804	5	978300719	Christmas Story, A (1983)	Comedy|Drama
1	594	4	978302268	Snow White and the Seven Dwarfs (1937)	Animation|Children's|Musical
1	919	4	978301368	Wizard of Oz, The (1939)	Adventure|Children's|Drama|Musical
1	595	5	978824268	Beauty and the Beast (1991)	Animation|Children's|Musical
1	938	4	978301752	Gigi (1958)	Musical
1	2398	4	978302281	Miracle on 34th Street (1947)	Drama
1	2918	4	978302124	Ferris Bueller's Day Off (1986)	Comedy
1	1035	5	978301753	Sound of Music, The (1965)	Musical
1	2791	4	978302188	Airplane! (1980)	Comedy
1	2687	3	978824268	Tarzan (1999)	Animation|Children's
1	2018	4	978301777	Bambi (1942)	Animation|Children's
1	3105	5	978301713	Awakenings (1990)	Drama
1	2797	4	978302039	Big (1988)	Comedy|Fantasy
1	2321	3	978302205	Pleasantville (1998)	Comedy
1	720	3	978300760	Wallace & Gromit: The Best of Aardman Animation (1996)	Animation
1	1270	5	978300055	Back to the Future (1985)	Comedy|Sci-Fi
1	527	5	978824195	Schindler's List (1993)	Drama|War
1	2340	3	978300103	Meet Joe Black (1998)	Romance
1	48	5	978824351	Pocahontas (1995)	Animation|Children's|Musical|Romance
1	1097	4	978301953	E.T. the Extra-Terrestrial (1982)	Children's|Drama|Fantasy|Sci-Fi
1	1721	4	978300055	Titanic (1997)	Drama|Romance
1	1545	4	978824139	Ponette (1996)	Drama
1	745	3	978824268	Close Shave, A (1995)	Animation|Comedy|Thriller
1	2294	4	978824291	Antz (1998)	Animation|Children's
1	3186	4	978300019	Girl, Interrupted (1999)	Drama
1	1566	4	978824330	Hercules (1997)	Adventure|Animation|Children's|Comedy|Musical
1	588	4	978824268	Aladdin (1992)	Animation|Children's|Comedy|Musical

This completes the join of movies.dat and ratings.dat. Next we work on the actual requirement; for convenience the joined data is downloaded locally (if the joined data is large, run in the cluster environment instead):

Task:
(1) Find the 10 movies rated the most times, along with their rating counts (movie name, rating count)

Business requirement: a top-N computation. With the movie name (or movie ID) as the key, sort in descending order by rating count and take the first 10 records.

This breaks into two steps: first, compute the rating count of every movie; second, sort that output in descending order and take the first 10 records.

Two MapReduce jobs are required, chained here with a job dependency (JobControl):

The code:

package homework1;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HomeWork1_2 {

	public static void main(String[] args) throws Exception {
		//first job
		Configuration conf = new Configuration();
//		conf.set("fs.defaultFS", "hdfs://potter2:9000");
//		System.setProperty("HADOOP_USER_NAME", "potter");
		FileSystem fs = FileSystem.get(conf);
		
		Job job = Job.getInstance(conf, "HomeWork1");
		
		job.setJarByClass(HomeWork1_2.class);
		
		job.setMapperClass(HomeWork1_2Mapper.class);
		job.setReducerClass(HomeWork1_2Reducer.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);//the reducer emits NullWritable values
		
		
//		job.setNumReduceTasks(0);
		Path input = new Path("D:\\MapRedaucehomework\\movieinput\\input1\\part-m-00000");
		Path output = new Path("D:\\MapRedaucehomework\\movieinput\\input1\\output1");
		
		
		if (fs.exists(output)) {
			fs.delete(output,true);
		}
		FileInputFormat.setInputPaths(job, input);
		FileOutputFormat.setOutputPath(job, output);
		//=============================================
		//second job
		FileSystem fs2 = FileSystem.get(conf);
		
		Job job2 = Job.getInstance(conf, "HomeWork1_2");
		job2.setJarByClass(HomeWork1_2.class);
		job2.setMapperClass(HomeWork1_2MRMapper.class);
		job2.setReducerClass(HomeWork1_2MRReducer.class);
		
		job2.setMapOutputKeyClass(MovieRate.class);
		job2.setMapOutputValueClass(NullWritable.class);
		
		job2.setOutputKeyClass(MovieRate.class);
		job2.setOutputValueClass(NullWritable.class);
		
		Path input2 = new Path("D:\\MapRedaucehomework\\movieinput\\input1\\output1");
		Path output2 = new Path("D:\\MapRedaucehomework\\movieinput\\input1\\output1_1");
		
		if (fs2.exists(output2)) {
			fs2.delete(output2,true);
		}
		FileInputFormat.setInputPaths(job2, input2);
		FileOutputFormat.setOutputPath(job2, output2);
		
		
		
		ControlledJob step1Job = new ControlledJob(job.getConfiguration());
		ControlledJob step2Job = new ControlledJob(job2.getConfiguration());
		
		step1Job.setJob(job);
		step2Job.setJob(job2);
		
		JobControl jc = new JobControl("JC");
		
		jc.addJob(step1Job);
		jc.addJob(step2Job);
		//make job2 wait until job1 has finished
		step2Job.addDependingJob(step1Job);
		
		Thread thread = new Thread(jc);
		thread.start();
		// poll periodically until every job in the JobControl thread has finished
		while (!jc.allFinished()) {

			Thread.sleep(1000);
		}
		jc.stop();
		
		
		
		
	}
	public static class HomeWork1_2Mapper extends Mapper<LongWritable,Text, Text, Text>{
		
		//1	1193	5	978300760	One Flew Over the Cuckoo's Nest (1975)	Drama
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			
			String[] split = value.toString().split("\t");
			//movie ID
			String kk = split[1];
			//movie name
			String aa = split[4];
			
			context.write(new Text(kk), new Text(aa));
		}
	}
	
	public static class HomeWork1_2Reducer extends Reducer<Text, Text, Text, NullWritable>{
		
		
		@Override
		protected void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException {
			int countName = 0;
			String movieName = "";
			//iterate over the grouped values to count how many times this movie was rated
			for(Text text : values){
				
				countName++;
				movieName = text.toString();
			}
			String qq = key.toString() +"\t"+ movieName+"\t"+ countName;
			
			context.write(new Text(qq), NullWritable.get());
		}
		
	}
	
	
	
	//the first reducer cannot emit its results sorted by count, so a second MapReduce job sorts the output and takes the top 10
	public static class HomeWork1_2MRMapper extends Mapper<LongWritable, Text, MovieRate, NullWritable>{
		//1	Toy Story (1995)	2077
		MovieRate ee = new MovieRate();
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			
			String[] split = value.toString().split("\t");
			ee.setUserid(split[0]);
			ee.setMovieName(split[1]);
			ee.setCountName(Integer.parseInt(split[2]));
			context.write(ee, NullWritable.get());
		}
		
	}
	
	public static class HomeWork1_2MRReducer extends Reducer<MovieRate, NullWritable, MovieRate, NullWritable>{
		int count = 0;//must be a reducer field, not a local variable above the for loop, so it persists across calls to reduce()
		@Override
		protected void reduce(MovieRate key, Iterable<NullWritable> values,Context context) throws IOException, InterruptedException {
			//take only the first 10 records overall
			for(NullWritable in : values){
				count++;
				if (count <= 10) {
					context.write(key, NullWritable.get());
				}else {
					return;
				}
				
			}
			
			
		}
		
		
	}
	
	
	
}
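As an aside, the separate sorting job can sometimes be avoided: a reducer can accumulate counts and keep only the best 10 in a bounded, sorted structure, emitting them in cleanup(). A minimal, Hadoop-free sketch of that selection step follows; the class and method names are illustrative, not part of the original program:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TopNSketch {
    // Select the n names with the highest counts, formatted "name\tcount", highest first.
    static List<String> topN(Map<String, Integer> counts, int n) {
        // TreeMap keeps the counts sorted; each count maps to the names that share it.
        TreeMap<Integer, List<String>> byCount = new TreeMap<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            byCount.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
        }
        List<String> result = new ArrayList<>();
        for (Integer count : byCount.descendingKeySet()) {   // walk counts from high to low
            for (String name : byCount.get(count)) {
                if (result.size() < n) {
                    result.add(name + "\t" + count);
                }
            }
        }
        return result;
    }
}
```

This trades the second job's shuffle for memory proportional to the number of distinct movies, which is small here, so either design is reasonable.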

A custom sort key (WritableComparable) is needed for the second MapReduce:

package homework1;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class MovieRate implements WritableComparable<MovieRate>{

	
	private String userid;
	private String movieName;
	private int countName;
	public String getUserid() {
		return userid;
	}
	public void setUserid(String userid) {
		this.userid = userid;
	}
	public String getMovieName() {
		return movieName;
	}
	public void setMovieName(String movieName) {
		this.movieName = movieName;
	}
	public int getCountName() {
		return countName;
	}
	public void setCountName(int countName) {
		this.countName = countName;
	}
	public MovieRate(String userid, String movieName, int countName) {
		super();
		this.userid = userid;
		this.movieName = movieName;
		this.countName = countName;
	}
	public MovieRate() {
		//no-arg constructor required by Hadoop serialization
	}
	@Override
	public String toString() {
		return userid + "\t" + movieName + "\t" + countName;
	}
	@Override
	public void write(DataOutput out) throws IOException {
		
		out.writeUTF(userid);
		out.writeUTF(movieName);
		out.writeInt(countName);
		
	}
	@Override
	public void readFields(DataInput in) throws IOException {
		
		userid = in.readUTF();
		movieName = in.readUTF();
		countName = in.readInt();
		
	}
	@Override
	public int compareTo(MovieRate o) {
		//descending order by rating count (overflow-safe)
		return Integer.compare(o.countName, this.countName);
	}
	





}
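The descending order that MovieRate.compareTo defines can be checked outside Hadoop by sorting plain pairs with the same rule. The demo class and sample counts below are illustrative:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class MovieRateOrderDemo {
    // Same ordering rule as MovieRate.compareTo: descending by rating count.
    static final Comparator<int[]> BY_COUNT_DESC = (a, b) -> Integer.compare(b[1], a[1]);

    public static void main(String[] args) {
        List<int[]> rows = new ArrayList<>();   // {movieId, ratingCount} pairs
        rows.add(new int[]{1, 2077});
        rows.add(new int[]{2858, 3428});
        rows.add(new int[]{260, 2991});
        rows.sort(BY_COUNT_DESC);
        System.out.println(rows.get(0)[0]);     // the most-rated movie comes first
    }
}
```

Because MapReduce sorts map output keys with exactly this comparison, the second job's reducer receives the records already in descending count order and only has to cut off after 10.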

Output of the first MapReduce (excerpt):

1	Toy Story (1995)	2077
10	GoldenEye (1995)	888
100	City Hall (1996)	128
1000	Curdled (1996)	20
1002	Ed's Next Move (1996)	8
1003	Extreme Measures (1996)	121
1004	Glimmer Man, The (1996)	101
1005	D3: The Mighty Ducks (1996)	142
1006	Chamber, The (1996)	78
1007	Apple Dumpling Gang, The (1975)	232
1008	Davy Crockett, King of the Wild Frontier (1955)	97
1009	Escape to Witch Mountain (1975)	291
101	Bottle Rocket (1996)	253
1010	Love Bug, The (1969)	242
1011	Herbie Rides Again (1974)	135
1012	Old Yeller (1957)	301
1013	Parent Trap, The (1961)	258
1014	Pollyanna (1960)	136
1015	Homeward Bound: The Incredible Journey (1993)	234
1016	Shaggy Dog, The (1959)	156
1017	Swiss Family Robinson (1960)	276
1018	That Darn Cat! (1965)	123
1019	20,000 Leagues Under the Sea (1954)	575
102	Mr. Wrong (1996)	60
1020	Cool Runnings (1993)	392
1021	Angels in the Outfield (1994)	247
1022	Cinderella (1950)	577
1023	Winnie the Pooh and the Blustery Day (1968)	221
1024	Three Caballeros, The (1945)	126
1025	Sword in the Stone, The (1963)	293
1026	So Dear to My Heart (1949)	8
1027	Robin Hood: Prince of Thieves (1991)	344
1028	Mary Poppins (1964)	1011

Output of the second MapReduce (top 10):

2858	American Beauty (1999)	3428
260	Star Wars: Episode IV - A New Hope (1977)	2991
1196	Star Wars: Episode V - The Empire Strikes Back (1980)	2990
1210	Star Wars: Episode VI - Return of the Jedi (1983)	2883
480	Jurassic Park (1993)	2672
2028	Saving Private Ryan (1998)	2653
589	Terminator 2: Judgment Day (1991)	2649
2571	Matrix, The (1999)	2590
1270	Back to the Future (1985)	2583
593	Silence of the Lambs, The (1991)	2578

That completes the task.