WordCount(mapreduce、yarn)
阿新 • • 發佈:2018-11-21
作為一個hadoop的初學者,在經歷了一系列繁瑣複雜的hadoop叢集環境安裝配置之後,終於自主完成了一個wordcount程式。通過mapreduce進行分散式運算,並通過yarn進行執行排程。
wordcount是一個經典的案例,相信大家都熟悉。主要任務就是計算每個單詞出現的次數並儲存。實現該過程,主要包括兩個階段:map階段: 將每一行文字資料變成<單詞,1>這樣的kv資料;reduce階段:將相同單詞的一組kv資料進行聚合,即累加所有的v。主要包括三個類的開發:WordcountMapper類開發;WordcountReducer類開發;JobSubmitter客戶端類開發。
WordcountMapper類:
package ldp.wordcount; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * KEYIN:是map task讀取到的資料的key型別,是一行的起始偏移量Long * VALUEIN:是map task讀取到的資料的value型別,是一行的內容String * KEYOUT:是使用者自定義map方法要返回的結果kv資料的key型別,在wordcount邏輯中,為單詞String * VALUEOUT:是使用者自定義map方法要返回的結果kv資料的value型別,在wordcount邏輯中,為整數Integer * * 在mapreduce中,map產生的資料需要傳輸給reduce,需要進行序列化和反序列化,所以hadoop設計了自己 * 的序列化機制 * hadoop為jdk中的常用基本型別序列化介面: * Long LongWritable * String Text * Integer IntWritable * Float FloatWritable */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { //切單詞,將每一行資料按照分割符" "切分 String line = value.toString(); String[] words = line.split(" "); for (String word : words) { context.write(new Text(word), new IntWritable(1)); } } }
WordcountReducer類:
package ldp.wordcount; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { //統計詞頻,values為一個迭代物件 Iterator<IntWritable> iterator = values.iterator(); int count = 0; while (iterator.hasNext()) { IntWritable value = (IntWritable) iterator.next(); count += value.get(); } context.write(key, new IntWritable(count)); } }
JobSubmitter客戶端類:
package ldp.mapreduce;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**用於提交mapreduce job的客戶端程式
*功能:
*1、封裝本次job執行時所需要的必要引數
*2、跟yarn進行互動,將mapreduce程式成功的啟動執行
*/
public class JobSubmitter {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//封裝引數:jar包所在的位置
job.setJarByClass(JobSubmitter.class);
//封裝引數:本次job要呼叫的Mapper實現類、Reducer實現類
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//封裝引數:本次job的Mapper實現類、Reducer實現類產生的結果資料的key、value型別
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//封裝引數:本次job要處理的輸入資料集所在路徑、最終結果的輸出路徑
//注意:此時輸入路徑為hadoop叢集環境的目錄
FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
//注意:輸出路徑必須不存在,否則報錯
FileOutputFormat.setOutputPath(job, new Path("/wordcount/output"));
//封裝引數:想要啟動的reducer task的數量
job.setNumReduceTasks(2);
//提交job給yarn
boolean res = job.waitForCompletion(true);
//用於以後的日誌需要,可刪掉
System.exit(res?0:-1);
}
}
三個類都完成以後,就需要將工程打成一個jar包並上傳至linux伺服器。然後在hadoop叢集的機器上,用命令
hadoop jar wordcount.jar ldp.wordcount.JobSubmitter執行,hadoop jar命令會將這臺機器上的hadoop安裝目錄中的所有jar包和配置檔案全部加入執行時的classpath,此時執行結束,大功告成。
同時,有時候需要除錯,也可以在本地執行,只需要把JobSubmitter客戶端類的
FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
FileOutputFormat.setOutputPath(job, new Path("/wordcount/output"));
修改成本地目錄,即:
FileInputFormat.setInputPaths(job, new Path("e:/wordcount/input"));
FileOutputFormat.setOutputPath(job, new Path("e:/wordcount/output"));
此時,mapreduce程式就會在本機執行,同時可以輕鬆除錯及debug。