MapReduce Case Study: Movie Rating Analysis, Part 1 (Two-Table Join)
阿新 • Published 2019-01-03
There are two common ways to join tables in MapReduce: the reduce join and the map join. A reduce join is prone to data skew, so for data files processed in parallel the map join is usually preferred: the join is completed in the map phase, which generally avoids skew, and even when skew does occur the affected data volume is small.
Usage conditions: one very large table and one small table. The small table is loaded into memory on every node ahead of time, and during the map phase each record is combined with its match via an inner join.
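The idea can be sketched outside Hadoop in a few lines of plain Java (the class and data below are illustrative, not part of the case study): the small table lives in a HashMap, and every record of the large table is joined with a single in-memory lookup, so no shuffle and no reduce phase is needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Minimal map-join sketch: small table held in memory, large table streamed.
public class MapJoinSketch {
    // small: joinKey -> extra columns; big: records whose field [1] is the join key
    public static List<String> join(Map<String, String> small, List<String[]> big) {
        List<String> out = new ArrayList<>();
        for (String[] rec : big) {
            String info = small.get(rec[1]); // one hash lookup per record
            if (info != null) {              // inner join: unmatched rows are dropped
                out.add(rec[0] + "\t" + rec[1] + "\t" + info);
            }
        }
        return out;
    }
}
```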
Problem:
We have three data files:

1. users.dat — record format: 2::M::56::16::70072
   Fields: UserID BigInt, Gender String, Age Int, Occupation String, Zipcode String
   (user id, gender, age, occupation, zip code)
2. movies.dat — record format: 2::Jumanji (1995)::Adventure|Children's|Fantasy
   Fields: MovieID BigInt, Title String, Genres String
   (movie id, movie title, movie genres)
3. ratings.dat — record format: 1::1193::5::978300760
   Fields: UserID BigInt, MovieID BigInt, Rating Double, Timestamped String
   (user id, movie id, rating, rating timestamp)

Joined record layout: userid, movieId, rate, ts, gender, age, occupation, zipcode, movieName, movieType

Question: (1) Find the 10 most-rated movies and give their rating counts (movie title, rating count).
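All three files use `::` as the field separator, so a record can be parsed with a plain `split("::")`. A minimal parser for a ratings.dat line (the class name is ours, for illustration only):

```java
// Parses one ratings.dat line, e.g. "1::1193::5::978300760"
public class RatingRecord {
    public final String userId;
    public final String movieId;
    public final int rating;
    public final long ts;

    public RatingRecord(String line) {
        String[] f = line.split("::");
        userId = f[0];
        movieId = f[1];
        rating = Integer.parseInt(f[2]);
        ts = Long.parseLong(f[3]);
    }
}
```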
In this case users.dat and movies.dat are small while ratings.dat is large, so the first step is to join movies.dat with ratings.dat.
This is done with a map join:
package homework1;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * (1) Find the 10 most-rated movies and give their rating counts (movie title, rating count)
 */
public class HomeWork1 {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://potter2:9000"); // the cluster must be running
        System.setProperty("HADOOP_USER_NAME", "potter");
        FileSystem fs = FileSystem.get(conf);

        Job job = Job.getInstance(conf, "HomeWork1");
        job.setJarByClass(HomeWork1.class);
        job.setMapperClass(HomeWork1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(0); // map-only job: no reduce task is needed

        Path input = new Path(args[0]);
        Path output = new Path(args[1]);
        URI uri = new URI(args[2]); // path of the small file (movies.dat)
        // Register the small file as a cache file; it will be shipped to every node
        // that runs a map task. The argument must be a URI.
        job.addCacheFile(uri);

        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);

        boolean isdone = job.waitForCompletion(true);
        System.exit(isdone ? 0 : 1);
    }

    public static class HomeWork1Mapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        /*
         * movies.dat:
         *   1::Toy Story (1995)::Animation|Children's|Comedy
         *   2::Jumanji (1995)::Adventure|Children's|Fantasy
         * ratings.dat:
         *   1::1193::5::978300760
         *   1::661::3::978302109
         */

        // Loaded once before the map calls run: movieid -> "name\ttype"
        private static Map<String, String> movieMap = new HashMap<>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Get the local paths of the cache files through the context
            Path[] localCacheFiles = context.getLocalCacheFiles();
            String strPath = localCacheFiles[0].toUri().toString();
            // Read the small file ourselves and build the lookup map
            BufferedReader br = new BufferedReader(new FileReader(strPath));
            String readLine = null;
            while ((readLine = br.readLine()) != null) {
                String[] reads = readLine.split("::");
                String movieid = reads[0];
                String moviename = reads[1];
                String movietype = reads[2];
                movieMap.put(movieid, moviename + "\t" + movietype);
            }
            IOUtils.closeStream(br);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // 1::1193::5::978300760
            String[] split = value.toString().split("::");
            String userid = split[0];
            String movieid = split[1];
            int rate = Integer.parseInt(split[2]);
            long ts = Long.parseLong(split[3]);

            // Look up the movie name and genres by movieid
            String movieInfo = movieMap.get(movieid);
            if (movieInfo == null) {
                return; // inner join: skip ratings with no matching movie
            }
            String[] movieInfos = movieInfo.split("\t");
            String movieName = movieInfos[0];
            String movieType = movieInfos[1];

            // Emit the combined record
            String kk = userid + "\t" + movieid + "\t" + rate + "\t" + ts + "\t" + movieName + "\t" + movieType;
            context.write(new Text(kk), NullWritable.get());
        }
    }
}
To run it, package the code as a jar, upload it and the data to the cluster, and submit it:
Command: hadoop jar mapjoin.jar homework1.HomeWork1 /movies_rate/input/ratings.dat /movies_rate/output /movies_rate/input/movies.dat
Partial results:
1 1193 5 978300760 One Flew Over the Cuckoo's Nest (1975) Drama
1 661 3 978302109 James and the Giant Peach (1996) Animation|Children's|Musical
1 914 3 978301968 My Fair Lady (1964) Musical|Romance
1 3408 4 978300275 Erin Brockovich (2000) Drama
1 2355 5 978824291 Bug's Life, A (1998) Animation|Children's|Comedy
1 1197 3 978302268 Princess Bride, The (1987) Action|Adventure|Comedy|Romance
1 1287 5 978302039 Ben-Hur (1959) Action|Adventure|Drama
1 2804 5 978300719 Christmas Story, A (1983) Comedy|Drama
1 594 4 978302268 Snow White and the Seven Dwarfs (1937) Animation|Children's|Musical
1 919 4 978301368 Wizard of Oz, The (1939) Adventure|Children's|Drama|Musical
1 595 5 978824268 Beauty and the Beast (1991) Animation|Children's|Musical
1 938 4 978301752 Gigi (1958) Musical
1 2398 4 978302281 Miracle on 34th Street (1947) Drama
1 2918 4 978302124 Ferris Bueller's Day Off (1986) Comedy
1 1035 5 978301753 Sound of Music, The (1965) Musical
1 2791 4 978302188 Airplane! (1980) Comedy
1 2687 3 978824268 Tarzan (1999) Animation|Children's
1 2018 4 978301777 Bambi (1942) Animation|Children's
1 3105 5 978301713 Awakenings (1990) Drama
1 2797 4 978302039 Big (1988) Comedy|Fantasy
1 2321 3 978302205 Pleasantville (1998) Comedy
1 720 3 978300760 Wallace & Gromit: The Best of Aardman Animation (1996) Animation
1 1270 5 978300055 Back to the Future (1985) Comedy|Sci-Fi
1 527 5 978824195 Schindler's List (1993) Drama|War
1 2340 3 978300103 Meet Joe Black (1998) Romance
1 48 5 978824351 Pocahontas (1995) Animation|Children's|Musical|Romance
1 1097 4 978301953 E.T. the Extra-Terrestrial (1982) Children's|Drama|Fantasy|Sci-Fi
1 1721 4 978300055 Titanic (1997) Drama|Romance
1 1545 4 978824139 Ponette (1996) Drama
1 745 3 978824268 Close Shave, A (1995) Animation|Comedy|Thriller
1 2294 4 978824291 Antz (1998) Animation|Children's
1 3186 4 978300019 Girl, Interrupted (1999) Drama
1 1566 4 978824330 Hercules (1997) Adventure|Animation|Children's|Comedy|Musical
1 588 4 978824268 Aladdin (1992) Animation|Children's|Comedy|Musical
This completes the join of movies.dat and ratings.dat. Next we tackle the actual question; for convenience the joined data is downloaded to the local machine (if the joined data were large, we would keep running on the cluster):
Question:
(1) Find the 10 most-rated movies and give their rating counts (movie title, rating count)
Approach: this is a top-N query. Using the movie name (or movie id) as the key, count the ratings per movie, sort in descending order by rating count, and take the first 10 records.
This takes two steps: first produce a file with the rating count of every movie; second, sort that file's records and keep only the first 10.
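The two steps can be sketched in plain Java before turning them into jobs (a hypothetical helper, not the MapReduce code itself): step one groups and counts the keys, step two sorts the counts in descending order and truncates to n.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TopN {
    // Count how often each key occurs, then return the top n keys by count
    // in descending order — the same two-step logic as the two chained jobs.
    public static List<Map.Entry<String, Long>> topN(List<String> keys, int n) {
        Map<String, Long> counts = keys.stream()
                .collect(Collectors.groupingBy(k -> k, Collectors.counting()));
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .limit(n)
                .collect(Collectors.toList());
    }
}
```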
Two MapReduce jobs are required; here they are chained with a job dependency.
The code:
package homework1;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class HomeWork1_2 {

    public static void main(String[] args) throws Exception {
        // First job: count the ratings per movie
        Configuration conf = new Configuration();
        // conf.set("fs.defaultFS", "hdfs://potter2:9000");
        // System.setProperty("HADOOP_USER_NAME", "potter");
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf, "HomeWork1");
        job.setJarByClass(HomeWork1_2.class);
        job.setMapperClass(HomeWork1_2Mapper.class);
        job.setReducerClass(HomeWork1_2Reducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        Path input = new Path("D:\\MapRedaucehomework\\movieinput\\input1\\part-m-00000");
        Path output = new Path("D:\\MapRedaucehomework\\movieinput\\input1\\output1");
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);

        // =============================================
        // Second job: sort by rating count and keep the top 10
        FileSystem fs2 = FileSystem.get(conf);
        Job job2 = Job.getInstance(conf);
        job2.setJarByClass(HomeWork1_2.class);
        job2.setMapperClass(HomeWork1_2MRMapper.class);
        job2.setReducerClass(HomeWork1_2MRReducer.class);
        job2.setMapOutputKeyClass(MovieRate.class);
        job2.setMapOutputValueClass(NullWritable.class);
        job2.setOutputKeyClass(MovieRate.class);
        job2.setOutputValueClass(NullWritable.class);
        Path input2 = new Path("D:\\MapRedaucehomework\\movieinput\\input1\\output1");
        Path output2 = new Path("D:\\MapRedaucehomework\\movieinput\\input1\\output1_1");
        if (fs2.exists(output2)) {
            fs2.delete(output2, true);
        }
        FileInputFormat.setInputPaths(job2, input2);
        FileOutputFormat.setOutputPath(job2, output2);

        // Chain the two jobs: job2 depends on job1
        ControlledJob step1Job = new ControlledJob(job.getConfiguration());
        ControlledJob step2Job = new ControlledJob(job2.getConfiguration());
        step1Job.setJob(job);
        step2Job.setJob(job2);
        JobControl jc = new JobControl("JC");
        jc.addJob(step1Job);
        jc.addJob(step2Job);
        step2Job.addDependingJob(step1Job);
        Thread thread = new Thread(jc);
        thread.start();
        // Poll until every job in the JobControl has finished
        while (!jc.allFinished()) {
            Thread.sleep(1000);
        }
        jc.stop();
    }

    public static class HomeWork1_2Mapper extends Mapper<LongWritable, Text, Text, Text> {
        // Input: 1  1193  5  978300760  One Flew Over the Cuckoo's Nest (1975)  Drama
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split("\t");
            String movieId = split[1];   // movie id
            String movieName = split[4]; // movie name
            context.write(new Text(movieId), new Text(movieName));
        }
    }

    public static class HomeWork1_2Reducer extends Reducer<Text, Text, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Iterate over the grouped values to count the ratings for this movie
            int countName = 0;
            String movieName = "";
            for (Text text : values) {
                countName++;
                movieName = text.toString();
            }
            String qq = key.toString() + "\t" + movieName + "\t" + countName;
            context.write(new Text(qq), NullWritable.get());
        }
    }

    // The reduce phase of the first job cannot produce a globally sorted result,
    // so a second MapReduce job sorts by count and takes the top 10.
    public static class HomeWork1_2MRMapper extends Mapper<LongWritable, Text, MovieRate, NullWritable> {
        // Input: 1  Toy Story (1995)  2077
        MovieRate ee = new MovieRate();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split("\t");
            ee.setUserid(split[0]); // field [0] is actually the movie id here
            ee.setMovieName(split[1]);
            ee.setCountName(Integer.parseInt(split[2]));
            context.write(ee, NullWritable.get());
        }
    }

    public static class HomeWork1_2MRReducer extends Reducer<MovieRate, NullWritable, MovieRate, NullWritable> {
        // Must be a field of the reducer, not a local variable inside reduce()
        // (or the for loop), so the count accumulates across all keys
        int count = 0;
        @Override
        protected void reduce(MovieRate key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Emit only the first 10 records
            for (NullWritable in : values) {
                count++;
                if (count <= 10) {
                    context.write(key, NullWritable.get());
                } else {
                    return;
                }
            }
        }
    }
}
The second job needs a custom key with a descending sort order, implemented as a WritableComparable:
package homework1;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class MovieRate implements WritableComparable<MovieRate> {

    private String userid;
    private String movieName;
    private int countName;

    public String getUserid() {
        return userid;
    }
    public void setUserid(String userid) {
        this.userid = userid;
    }
    public String getMovieName() {
        return movieName;
    }
    public void setMovieName(String movieName) {
        this.movieName = movieName;
    }
    public int getCountName() {
        return countName;
    }
    public void setCountName(int countName) {
        this.countName = countName;
    }

    public MovieRate(String userid, String movieName, int countName) {
        this.userid = userid;
        this.movieName = movieName;
        this.countName = countName;
    }

    public MovieRate() {
        // no-arg constructor required for Hadoop serialization
    }

    @Override
    public String toString() {
        return userid + "\t" + movieName + "\t" + countName;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(userid);
        out.writeUTF(movieName);
        out.writeInt(countName);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        userid = in.readUTF();
        movieName = in.readUTF();
        countName = in.readInt();
    }

    @Override
    public int compareTo(MovieRate o) {
        // descending order by rating count
        int ff = this.countName - o.countName;
        if (ff == 0) {
            return 0;
        } else {
            return ff > 0 ? -1 : 1;
        }
    }
}
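One caveat about the subtraction in compareTo above: it is safe for rating counts of this size, but subtraction-based comparators overflow for extreme int values, while Integer.compare never does. A drop-in sketch of the same descending comparison:

```java
public class DescendingCount {
    // Behaves like the subtraction version for small counts,
    // but is correct for every int value (no overflow).
    public static int compare(int thisCount, int otherCount) {
        return Integer.compare(otherCount, thisCount); // larger count sorts first
    }
}
```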
Output of the first MapReduce job (excerpt):
1 Toy Story (1995) 2077
10 GoldenEye (1995) 888
100 City Hall (1996) 128
1000 Curdled (1996) 20
1002 Ed's Next Move (1996) 8
1003 Extreme Measures (1996) 121
1004 Glimmer Man, The (1996) 101
1005 D3: The Mighty Ducks (1996) 142
1006 Chamber, The (1996) 78
1007 Apple Dumpling Gang, The (1975) 232
1008 Davy Crockett, King of the Wild Frontier (1955) 97
1009 Escape to Witch Mountain (1975) 291
101 Bottle Rocket (1996) 253
1010 Love Bug, The (1969) 242
1011 Herbie Rides Again (1974) 135
1012 Old Yeller (1957) 301
1013 Parent Trap, The (1961) 258
1014 Pollyanna (1960) 136
1015 Homeward Bound: The Incredible Journey (1993) 234
1016 Shaggy Dog, The (1959) 156
1017 Swiss Family Robinson (1960) 276
1018 That Darn Cat! (1965) 123
1019 20,000 Leagues Under the Sea (1954) 575
102 Mr. Wrong (1996) 60
1020 Cool Runnings (1993) 392
1021 Angels in the Outfield (1994) 247
1022 Cinderella (1950) 577
1023 Winnie the Pooh and the Blustery Day (1968) 221
1024 Three Caballeros, The (1945) 126
1025 Sword in the Stone, The (1963) 293
1026 So Dear to My Heart (1949) 8
1027 Robin Hood: Prince of Thieves (1991) 344
1028 Mary Poppins (1964) 1011
Output of the second MapReduce job (top 10):
2858 American Beauty (1999) 3428
260 Star Wars: Episode IV - A New Hope (1977) 2991
1196 Star Wars: Episode V - The Empire Strikes Back (1980) 2990
1210 Star Wars: Episode VI - Return of the Jedi (1983) 2883
480 Jurassic Park (1993) 2672
2028 Saving Private Ryan (1998) 2653
589 Terminator 2: Judgment Day (1991) 2649
2571 Matrix, The (1999) 2590
1270 Back to the Future (1985) 2583
593 Silence of the Lambs, The (1991) 2578
Done!