java: MapReduce principles and an introductory example: wordcount
阿新 • Published: 2018-12-13
MapReduce principles
MapperTask -> Shuffle (partition, sort, group) -> ReducerTask
MapReduce execution steps
- Map task
- Read each line of the input file, parse it into a <key, value> pair, and call the map function
- The map logic processes each key/value and produces new key/value pairs
- Partition the data
- Reduce task
- Copy the map task output to the reduce node; merge and sort the map outputs
- The reduce logic processes each key/value and produces new key/value pairs
- Save the result to a file
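The steps above can be sketched in plain Java (no Hadoop required). This is only an illustration of the data flow; the class and method names below are invented for the sketch, and the shuffle step is simulated with a sorted map rather than the framework's real partition/sort machinery.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the map -> shuffle -> reduce flow for wordcount.
public class WordCountFlow {

    // Runs all three phases over the input lines and returns word counts.
    public static Map<String, Long> count(String[] lines) {
        // Map phase: parse each line into <word, 1> pairs
        List<Map.Entry<String, Long>> mapped = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                mapped.add(Map.entry(word, 1L));
            }
        }

        // Shuffle: group values by key; TreeMap keeps the keys sorted,
        // mirroring the framework's sort step
        Map<String, List<Long>> grouped = new TreeMap<>();
        for (Map.Entry<String, Long> kv : mapped) {
            grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }

        // Reduce phase: sum the grouped values for each key
        Map<String, Long> counts = new TreeMap<>();
        grouped.forEach((word, ones) ->
                counts.put(word, ones.stream().mapToLong(Long::longValue).sum()));
        return counts;
    }

    public static void main(String[] args) {
        // prints {Jack=1, Tom=1, hello=2}
        System.out.println(count(new String[]{"hello Jack", "hello Tom"}));
    }
}
```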
wordcount example
- Prepare a file
vim word.txt
hello Jack
hello Tom
hello Jimi
hello Mili
hello Make
- Upload the file to HDFS
hadoop fs -put word.txt /word.txt
hadoop fs -ls / # verify the upload
- Run the job
cd hadoop-2.8.5/share/hadoop/mapreduce
hadoop jar hadoop-mapreduce-examples-2.8.5.jar wordcount /word.txt /wcout
- View the job output
hadoop fs -ls /wcout
hadoop fs -cat /wcout/part-r-00000
Jack 1
Jimi 1
Make 1
Mili 1
Tom 1
hello 5
Java example
- mapper
package mr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Extend Mapper to implement the map computation.
 * Keys and values passed between tasks must be serializable
 * so they can be transferred over the network.
 */
public class MapDemo extends Mapper<LongWritable, Text, Text, LongWritable>{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
// Receive the input line
String line = value.toString();
// Split the line into words
String[] words = line.split(" ");
// Emit each word with a count of 1
for(String word: words)
{
context.write(new Text(word), new LongWritable(1));
}
}
}
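The heart of the mapper is the split-and-emit loop, which can be checked standalone without Hadoop types. Note a subtlety in the original: `split(" ")` turns consecutive spaces into empty "words", so splitting on `\s+` is a common hardening. The class and method names below are invented for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone check of the mapper's split-and-emit logic.
public class MapLogicCheck {

    // Mirrors MapDemo.map for one input line: return the keys it would emit.
    public static List<String> emittedKeys(String line) {
        List<String> keys = new ArrayList<>();
        // split("\\s+") also collapses runs of spaces/tabs, which plain
        // split(" ") would turn into empty words
        for (String word : line.trim().split("\\s+")) {
            keys.add(word);
        }
        return keys;
    }

    public static void main(String[] args) {
        // prints [hello, Jack]
        System.out.println(emittedKeys("hello Jack"));
    }
}
```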
- reducer
package mr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Extend Reducer to implement the reduce computation.
 */
public class ReduceDemo extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException
{
// Counter for this key
long count = 0;
// Sum all values grouped under this key
for (LongWritable counter : values)
{
count += counter.get();
}
// Emit the result
context.write(key, new LongWritable(count));
}
}
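The reduce step is just a summation over the values grouped under one key, which can also be verified standalone, again with invented names and plain `Long` in place of Hadoop's `LongWritable`:

```java
import java.util.List;

// Standalone sketch of ReduceDemo's summation for a single key.
public class ReduceLogicCheck {

    // Mirrors the reduce loop: add up every value grouped under one key.
    public static long sum(Iterable<Long> values) {
        long count = 0;
        for (long v : values) {
            count += v;
        }
        return count;
    }

    public static void main(String[] args) {
        // "hello" arrives with five grouped 1s in the word.txt example; prints 5
        System.out.println(sum(List.of(1L, 1L, 1L, 1L, 1L)));
    }
}
```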
- job
package mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Count word occurrences.
 * Run: hadoop jar hdfsdemo.jar <input> <output>
 * Specify the input and output paths for your environment.
 */
public class WordCount {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// Build the Job object
Job job = Job.getInstance(new Configuration());
// Note: the class containing the main method
job.setJarByClass(WordCount.class);
// Set the input path
FileInputFormat.setInputPaths(job, new Path(args[0]));
// Configure the Mapper
job.setMapperClass(MapDemo.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// Configure the Reducer
job.setReducerClass(ReduceDemo.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// Set the output path
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// Submit the job and exit with a non-zero code on failure
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Package the WordCount class as a jar, upload it to the server, and run:
hadoop jar hdfsdemo.jar /word.txt /out
Check the output file; it matches the output of Hadoop's built-in wordcount:
Jack 1
Jimi 1
Make 1
Mili 1
Tom 1
hello 5
Summary
Import the dependency jars from
hadoop-2.8.5/share/hadoop/mapreduce/
Writing a custom job
- Analyze the business logic and determine the input/output formats
- Extend Mapper
- Extend Reducer
- Assemble the Mapper and Reducer through a Job object