1. 程式人生 > >使用MapReduce執行WordCount案例

使用MapReduce執行WordCount案例

@[toc] ## 一、準備資料 注意:準備的資料的格式必須是文字,每個單詞之間使用==製表符==分割。編碼必須是==utf-8無bom== ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200715115827403.png) ## 二、MR的程式設計規範 MR的程式設計只需要將自定義的元件和系統預設元件進行組合,組合之後執行即可! ## 三、程式設計步驟 ①Map階段的核心處理邏輯需要編寫在`Mapper`中 ②Reduce階段的核心處理邏輯需要編寫在`Reducer`中 ③將編寫的Mapper和Reducer進行組合,組合成一個`Job` ④對Job進行設定,設定後執行 ## 四、編寫程式 **WCMapper.java** ```java public class WCMapper extends Mapper{ private Text out_key=new Text(); private IntWritable out_value=new IntWritable(1);//每個單詞出現一次記為1 // 針對輸入的每個 keyin-valuein呼叫一次 (0,hello hi hello hi) @Override protected void map(LongWritable key, Text value, Mapper.Context context) throws Exception { System.out.println("keyin:"+key+"----keyout:"+value); String[] words = value.toString().split("\t"); for (String word : words) { out_key.set(word); //寫出資料(單詞,1) context.write(out_key, out_value); } } } ``` ### Mapper程式解讀 1. 導包時,需注意匯入 `org.apache.hadoop.mapreduce`包下的類(hadoop2.0的新api) 2. 自定義的類必須符合MR的Mapper的規範 3. 在MR中,只能處理`key-value`格式的資料 ` KEYIN, VALUEIN`: mapper`輸入`的k-v型別,由當前Job的InputFormat的`RecordReader`決定!封裝輸入的key-value由RecordReader自動進行,不可自定義。 `KEYOUT, VALUEOUT`: mapper`輸出`的k-v型別,可自定義 4. `InputFormat`的作用: ①驗證輸入目錄中的檔案格式,是否符合當前Job的要求 ②生成切片,每個切片都會交給一個`MapTask`處理 ③提供RecordReader,由RR從切片中讀取記錄,交給Mapper進行處理 方法: `List getSplits`: 切片 `RecordReader createRecordReader`: 建立RecordReader 預設hadoop使用的是`TextInputFormat`,TextInputFormat使用`LineRecordReader` 5. 在Hadoop中,如果有Reduce階段。通常key-value都需要實現==序列化==協議! MapTask處理後的key-value,只是一個==階段性==的結果! 這些key-value需要傳輸到==ReduceTask==所在的機器! 將一個物件通過序列化技術,序列化到一個檔案中,經過網路傳輸到另外一臺機器, 再使用==反序列化==技術,從檔案中讀取資料,還原為物件是最快捷的方式! java的序列化協議: Serializable 特點:不僅儲存物件的屬性值,型別,還會儲存大量的包的結構,子父類和介面的繼承資訊,很笨重。 hadoop開發了一款輕量級的序列化協議: `Writable`機制! **WCReducer.java** ```java /* *KEYIN, VALUEIN: Mapper輸出的keyout-valueout *KEYOUT, VALUEOUT: 自定義 */ public class WCReducer extends Reducer{ private IntWritable out_value=new IntWritable(); // reduce一次處理一組資料,key相同的視為一組 @Override protected void reduce(Text key, Iterable values, Reducer.Context context) throws Exception { int sum=0; for (IntWritable intWritable : values) { sum += intWritable.get(); } out_value.set(sum); //將累加的值寫出 context.write(key, out_value); } } ``` **WCDriver.java** ```java /* * 1.啟動這個執行緒,執行Job * * 2.本地模式主要用於測試程式是否正確! */ public class WCDriver { public static void main(String[] args) throws Exception { Path inputPath=new Path("e:/input/wordcount"); Path outputPath=new Path("e:/output/wordcount");//保證輸出目錄不存在 //作為整個Job的配置 Configuration conf = new Configuration();//空參表示預設使用本地的檔案系統 //使用HDFS,分散式檔案系統 /* Path inputPath=new Path("/wordcount"); Path outputPath=new Path("/mroutput/wordcount"); conf.set("fs.defaultFS", "hdfs://hadoop101:9000"); conf.set("mapreduce.framework.name", "yarn");// 在YARN上執行 conf.set("yarn.resourcemanager.hostname", "hadoop102");// RM所在的機器 */ //一定要保證輸出目錄不存在 FileSystem fs=FileSystem.get(conf); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } // ①建立Job Job job = Job.getInstance(conf); //job.setJar("MapReduce-0.0.1-SNAPSHOT.jar");// 告訴NM執行時,MR中Job所在的Jar包在哪裡 // 將某個類所在地jar包作為job的jar包 job.setJarByClass(WCDriver.class); // 為Job建立一個名字 job.setJobName("wordcount"); // ②設定Job // 設定Job執行的Mapper,Reducer型別,Mapper,Reducer輸出的key-value型別 job.setMapperClass(WCMapper.class); job.setReducerClass(WCReducer.class); // Job需要根據Mapper和Reducer輸出的Key-value型別準備序列化器,通過序列化器對輸出的key-value進行序列化和反序列化 // 如果Mapper和Reducer輸出的Key-value型別一致,直接設定Job最終的輸出型別 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 設定輸入目錄和輸出目錄 FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); // ③執行Job job.waitForCompletion(true); } } ``` 注意: 若要在yarn上執行,需將這三個程式打成jar包,然後放在叢集某臺機器上,使用`hadoop jar`命令執行 ```bash hadoop jar jar包名 主類名(WCDriv