Hadoop 2.x in Practice: WordCount, Sort, Deduplication, and Average MapReduce Examples
Hadoop version: 2.6.0
Eclipse version: Luna
I. A word-counting example (WordCount)
1. Add the jar dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>jdk.tools</groupId>
        <artifactId>jdk.tools</artifactId>
        <version>1.6</version>
        <scope>system</scope>
        <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
</dependencies>

(Note that the POM pins 2.2.0 while the cluster above runs 2.6.0; matching the dependency versions to the cluster avoids subtle incompatibilities.)

2. Write the code
package com.lin.wordcount;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {

    public static class WordCountMapper extends MapReduceBase
            implements Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, one); // tokenize the line into key-value pairs and send them to the reducers
            }
        }
    }

    public static class WordCountReducer extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) { // pairs with the same key go to the same reducer, so sum them in a loop
                sum += values.next().get();
            }
            result.set(sum);
            output.collect(key, result); // write the result to HDFS
        }
    }

    public static void main(String[] args) throws Exception {
        // If the local environment does not have the Hadoop path set, you can do this:
        // System.setProperty("hadoop.home.dir", "D:\\project\\hadoop-2.7.2");
        String input = "hdfs://hmaster:9000/input/LICENSE.txt";
        String output = "hdfs://hmaster:9000/output/";

        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("WordCount");

        // Option 1: set the connection parameters from the cluster config files
        conf.addResource("classpath:/hadoop2/core-site.xml");
        conf.addResource("classpath:/hadoop2/hdfs-site.xml");
        conf.addResource("classpath:/hadoop2/mapred-site.xml");
        conf.addResource("classpath:/hadoop2/yarn-site.xml");
        // Option 2: set the connection parameters directly
        // conf.set("mapred.job.tracker", "10.75.201.125:9000");

        conf.setOutputKeyClass(Text.class);           // output key type
        conf.setOutputValueClass(IntWritable.class);  // output value type

        conf.setMapperClass(WordCountMapper.class);    // map operator
        conf.setCombinerClass(WordCountReducer.class); // combine operator
        conf.setReducerClass(WordCountReducer.class);  // reduce operator

        conf.setInputFormat(TextInputFormat.class);   // input format
        conf.setOutputFormat(TextOutputFormat.class); // output format

        FileInputFormat.setInputPaths(conf, new Path(input));   // input path
        FileOutputFormat.setOutputPath(conf, new Path(output)); // output path

        JobClient.runJob(conf);
        System.exit(0);
    }
}
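The listing above uses the older org.apache.hadoop.mapred API. For comparison, here is a minimal sketch of the same job written against the newer org.apache.hadoop.mapreduce API that the later examples in this post use. The class name WordCountNewApi is mine, not from the original post; paths are assumed to match the setup above.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountNewApi {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private final Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one); // emit (word, 1) for every token
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) { // sum all counts for this word
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "WordCount");
        job.setJarByClass(WordCountNewApi.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class); // summing is associative, so the reducer doubles as a combiner
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://hmaster:9000/input/LICENSE.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://hmaster:9000/output/"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}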
3. Output

Run output and final word counts (shown as screenshots in the original post).

II. Sort example
Source code:
package com.lin.sort;

/**
 * Purpose: sort numeric data
 *
 * @author linbingwen
 * @since 2016-06-30
 */
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Sort {

    // The map parses the input value into an IntWritable and uses it as the output key,
    // so the framework's shuffle sorts the numbers for us.
    public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
        private static IntWritable data = new IntWritable();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            data.set(Integer.parseInt(line));
            context.write(data, new IntWritable(1));
        }
    }

    // The reduce copies the input key to the output value and emits it once per element
    // in the value list; the global linenum counter records the key's rank.
    // Note: this global rank is only correct when the job runs with a single reducer (the default).
    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        private static IntWritable linenum = new IntWritable(1);

        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable val : values) {
                context.write(linenum, key);
                linenum = new IntWritable(linenum.get() + 1);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // These settings are crucial for connecting to the cluster.
        // conf.set("mapred.job.tracker", "192.168.1.2:9001");
        conf.addResource("classpath:/hadoop2/core-site.xml");
        conf.addResource("classpath:/hadoop2/hdfs-site.xml");
        conf.addResource("classpath:/hadoop2/mapred-site.xml");
        conf.addResource("classpath:/hadoop2/yarn-site.xml");

        String[] ioArgs = new String[] { "hdfs://hmaster:9000/sort_in", "hdfs://hmaster:9000/sort_out" };
        String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: Data Sort <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "Data Sort");
        job.setJarByClass(Sort.class);

        // Map and Reduce classes
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // output types
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // input and output directories
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
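The sorting itself is done by the shuffle phase, which orders keys by IntWritable's natural ascending comparator. If a different order were needed, say descending, a small comparator could be plugged in via job.setSortComparatorClass. A minimal sketch, with a class name of my own choosing:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// A descending comparator for the shuffle sort; register it with
// job.setSortComparatorClass(DescendingIntComparator.class).
public class DescendingIntComparator extends WritableComparator {
    protected DescendingIntComparator() {
        super(IntWritable.class, true); // true: create key instances for deserialization
    }

    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        return -a.compareTo(b); // invert the natural ascending order
    }
}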
Input files:

file1.txt
2
32
654
32
15
756
65223
file2.txt
5956
22
650
92
file3.txt
26
54
6
Run result (input and output), as viewed on the machine where Hadoop is installed:
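The original screenshots are not reproduced here, but with a single reducer the output is fully determined by the three input files above; working it through by hand gives:

1	2
2	6
3	15
4	22
5	26
6	32
7	32
8	54
9	92
10	650
11	654
12	756
13	5956
14	65223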
III. Deduplication example
package com.lin.diffdata;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
 * Purpose: data deduplication
 *
 * @author linbingwen
 * @since 2016-06-28
 */
public class DiffData {
// The map copies the input value onto the output key and emits it directly
public static class Map extends Mapper<Object, Text, Text, Text> {
private static Text line = new Text(); // holds one line of input
// the map function
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
line = value;
context.write(line, new Text(""));
}
}
// The reduce copies the input key to the output key and emits it directly;
// duplicates collapse because equal keys arrive at the same reduce call
public static class Reduce extends Reducer<Text, Text, Text, Text> {
// the reduce function
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
context.write(key, new Text(""));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.addResource("classpath:/hadoop2/core-site.xml");
conf.addResource("classpath:/hadoop2/hdfs-site.xml");
conf.addResource("classpath:/hadoop2/mapred-site.xml");
conf.addResource("classpath:/hadoop2/yarn-site.xml");
String[] ioArgs = new String[] { "hdfs://hmaster:9000/input", "hdfs://hmaster:9000/output" };
String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: Data Deduplication <in> <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "Data Deduplication");
job.setJarByClass(DiffData.class);
// Map, Combine, and Reduce classes (the reducer is idempotent, so it can safely double as a combiner)
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
// output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// input and output directories
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
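A small stylistic note: emitting new Text("") as the value works, but Hadoop provides NullWritable for exactly this "key only" case, and it serializes to nothing. A minimal sketch of the same mapper and reducer using it (the class name is mine):

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Deduplication with NullWritable values instead of empty Text placeholders.
public class DiffDataNull {

    public static class Map extends Mapper<Object, Text, Text, NullWritable> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // the line itself is the key; the value carries no information
            context.write(value, NullWritable.get());
        }
    }

    public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
        public void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get()); // each distinct line is emitted once
        }
    }
}

The driver would then call job.setOutputValueClass(NullWritable.class); everything else stays the same.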
Run output and final result (shown as screenshots in the original post). The inputs were:
file1.txt
2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c
file2.txt
2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c
Viewing the result on the machine where Hadoop is installed:
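Again the screenshot is omitted, but deduplication is deterministic; collapsing the duplicate lines across file1.txt and file2.txt by hand gives:

2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d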
IV. Computing an average
package com.lin.average;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Average {
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
// the map function
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// convert the input text record to a String
String line = value.toString();
// split the input into lines first (with TextInputFormat each record is already
// a single line, so this loop normally runs exactly once)
StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n");
// process each line
while (tokenizerArticle.hasMoreElements()) {
// split each line on whitespace
StringTokenizer tokenizerLine = new StringTokenizer(tokenizerArticle.nextToken());
String strName = tokenizerLine.nextToken(); // the student's name
String strScore = tokenizerLine.nextToken(); // the score
Text name = new Text(strName);
int scoreInt = Integer.parseInt(strScore);
// emit the name and the score
context.write(name, new IntWritable(scoreInt));
}
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
// the reduce function
public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
int sum = 0;
int count = 0;
Iterator<IntWritable> iterator = values.iterator();
while (iterator.hasNext()) {
sum += iterator.next().get(); // total score
count++; // number of scores seen
}
int average = sum / count; // integer division truncates the average
context.write(key, new IntWritable(average));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// set the Hadoop host and port (the old JobTracker property; ignored under YARN)
conf.set("mapred.job.tracker", "10.75.201.125:9000");
// set the input and output directories
String[] ioArgs = new String[] { "hdfs://hmaster:9000/average_in", "hdfs://hmaster:9000/average_out" };
String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: Score Average <in> <out>");
System.exit(2);
}
// set up the job
Job job = Job.getInstance(conf, "Score Average");
// delete a pre-existing output directory, if any
// FileSystem fs = FileSystem.get(conf);
// Path out = new Path(otherArgs[1]);
// if (fs.exists(out)){
// fs.delete(out, true);
// }
job.setJarByClass(Average.class);
// Map, Combine, and Reduce classes. Note: reusing the reducer as a combiner is only
// safe here because each input split holds at most one score per student; in general
// an "average" reducer is not a valid combiner (see the sketch after the listing).
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
// output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// split the input data set into splits and provide a RecordReader implementation
job.setInputFormatClass(TextInputFormat.class);
// provide a RecordWriter implementation responsible for writing the output
job.setOutputFormatClass(TextOutputFormat.class);
// input and output directories
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
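To make the combiner correct regardless of how the input is split, the usual pattern is to combine partial (sum, count) pairs and only divide at the very end. A minimal sketch of that idea, encoding the pair as a "sum,count" Text value (class and field names are mine, not from the original post):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Average with a split-safe combiner: partial sums and counts are merged,
// and the division happens only once, in the reducer.
public class AverageCombinable {

    public static class Map extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] parts = value.toString().trim().split("\\s+");
            // emit (name, "score,1"): a partial sum with a count of one
            context.write(new Text(parts[0]), new Text(parts[1] + ",1"));
        }
    }

    // The combiner merges "sum,count" pairs without dividing ...
    public static class Combine extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0, count = 0;
            for (Text v : values) {
                String[] p = v.toString().split(",");
                sum += Long.parseLong(p[0]);
                count += Long.parseLong(p[1]);
            }
            context.write(key, new Text(sum + "," + count));
        }
    }

    // ... and only the reducer divides.
    public static class Reduce extends Reducer<Text, Text, Text, IntWritable> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0, count = 0;
            for (Text v : values) {
                String[] p = v.toString().split(",");
                sum += Long.parseLong(p[0]);
                count += Long.parseLong(p[1]);
            }
            context.write(key, new IntWritable((int) (sum / count)));
        }
    }
}

The driver would register Combine with job.setCombinerClass, Reduce with job.setReducerClass, and set job.setMapOutputKeyClass(Text.class) and job.setMapOutputValueClass(Text.class) alongside the final Text/IntWritable output types.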
Run result (shown as a screenshot in the original post). The inputs:

file1.txt
張三 88
李四 99
王五 66
趙六 77
file2.txt
張三 78
李四 89
王五 96
趙六 67
file3.txt
張三 80
李四 82
王五 84
趙六 86
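The averages are easy to check by hand: 張三 (88+78+80)/3 = 82, 李四 (99+89+82)/3 = 90, 王五 (66+96+84)/3 = 82, and 趙六 (77+67+86)/3 = 76.67, which integer division truncates to 76. The job's output should therefore be:

張三	82
李四	90
王五	82
趙六	76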