MapReduce實踐 Youtube資料分析
這篇部落格是關於如何在Hadoop MapReduce中進行YouTube資料分析的。
使用該資料集執行一些分析,並將提取一些有用的資訊,例如YouTube上排名前10位的視訊,他們上傳了最多的視訊。
資料
資料展示

資料說明
Column 1: Video id of 11 characters.
Column 2: uploader of the video
Column 3: Interval between the day of establishment of Youtube and the date of uploading of the video.
Column 4: Category of the video.
Column 5: Length of the video.
Column 6: Number of views for the video.
Column 7: Rating on the video.
Column 8: Number of ratings given for the video
Column 9: Number of comments done on the videos.
Column 10: Related video ids with the uploaded video.
問題1:尋找Top 5視訊類別
Mapper
對每一行資料進行劃分,統計各個視訊類別的數量(Column 4)。資料集中部分資料缺失,因此忽略了劃分後少於5個屬性的資料。
public static class CategoryMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); // 值為1 private Text category = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] attributeArray = value.toString().split("\t");// 對字串進行切分 if (attributeArray.length > 5)// 忽略屬性值少於5的錯誤資料 { category.set(attributeArray[3]); context.write(category, one); } } }
Combiner
Combiner的作用就是對map端的輸出先做一次合併,以減少在map和reduce節點之間的資料傳輸量,以提高網路IO效能,是MapReduce的一種優化手段之一。
Combiner實質就是在本地端先執行的一次Reducer。
public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } }
Reducer
因為需要對, [視訊類別, 視訊數]
陣列進行排序比較,因此首先定義一個二元組類,包含視訊類別和視訊數,分別對應first和second。並定義了Comparable介面,用於後面排序的需要。
public static class TwoTuple implements Comparable<TwoTuple> { publicString first; publicint second; public TwoTuple(String a, int b) { first = a; second = b; } public String toString() { return "(" + first + ", " + second + ")"; } @Override public int compareTo(TwoTuple tt) { return second - tt.second; } }
使用Reducer實現提取Top N值的演算法。
首先需要介紹 setup()
函式和 cleanup()
函式,與 reduce()
函式不同,不會根據key的數目多次執行,只會執行1次。
setup()此方法被MapReduce框架僅且執行一次,在執行Map任務前,進行相關變數或者資源的集中初始化工作。若是將資源初始化工作放在方法map()中,導致Mapper任務在解析每一行輸入時都會進行資源初始化工作,導致重複,程式執行效率不高。
cleanup()此方法被MapReduce框架僅且執行一次,在執行完畢Map任務後,進行相關變數或資源的釋放工作。若是將釋放資源工作放入方法map()中,也會導致Mapper任務在解析、處理每一行文字後釋放資源,而且在下一行文字解析前還要重複初始化,導致反覆重複,程式執行效率不高。
演算法介紹
在 setup()
函式中,主要用來從配置中獲取需要提取Top N的N值,並初始化 top[]
陣列;
在 reduce()
函式中,計算出每個Category的視訊總數後覆蓋放入top[0]陣列並進行排序;
在 cleanup()
函式中,將覆蓋排序多次後的top陣列寫入output。
public static class TopNReducer extends Reducer<Text, IntWritable, Text, IntWritable> { int len; TwoTuple[] top; @Override protected void setup(Context context) throws IOException, InterruptedException { len =context.getConfiguration().getInt("N", 10);// 從配置中獲取top N的N值,若無則預設為10 top = new TwoTuple[len + 1]; for (int i=0; i<=len; i++) { top[i] = new TwoTuple("null", 0); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { for (int i = len; i > 0; i--) { context.write(new Text(top[i].first), new IntWritable(top[i].second)); } } @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } add(key.toString(), sum); } private void add(String key, int val) { top[0].first = key; top[0].second = val;// 替換掉最小值 Arrays.sort(top); // 排序,從小到大順序 } }
main&conf
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.addResource("classpath:/hadoop/core-site.xml"); conf.addResource("classpath:/hadoop/hdfs-site.xml"); conf.addResource("classpath:/hadoop/mapred-site.xml"); conf.setInt("N", 5); // String[] otherArgs = new GenericOptionsParser(conf, // args).getRemainingArgs(); String[] otherArgs = { "/youtube", "/youtube_category_Top5" }; if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "youtube"); job.setJarByClass(FindMaxCategory.class); job.setMapperClass(CategoryMapper.class); job.setCombinerClass(SumReducer.class); //job.setReducerClass(SumReducer.class);// 統計每個類別的總量 job.setReducerClass(TopNReducer.class);// 統計TopN的類別的總量 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputDirRecursive(job, true); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); }