
Hadoop 2.x in Practice: Writing WordCount, Sort, Deduplication, and Average MapReduce Examples

Hadoop version: 2.6.0

Eclipse version: Luna

I. A WordCount example in Hadoop

1. Add the jar dependencies (the POM below pins 2.2.0; ideally match the version of your cluster, 2.6.0 here)

	<dependencies>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>2.2.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
			<version>2.2.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>2.2.0</version>
		</dependency>
		<dependency>
			<groupId>jdk.tools</groupId>
			<artifactId>jdk.tools</artifactId>
			<version>1.6</version>
			<scope>system</scope>
			<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
		</dependency>
	</dependencies>
2. Writing the code
package com.lin.wordcount;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
 
public class WordCount {
 
    public static class WordCountMapper extends MapReduceBase implements Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
 
        public void map(Object key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, one); // emit each token as a (word, 1) pair for the reducer
            }

        }
    }
 
    public static class WordCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
 
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) { // values with the same key are sent to the same reducer, so accumulate them in a loop
                sum += values.next().get();
            }
            result.set(sum);
            output.collect(key, result); // write the result to HDFS
        }
      
    }
 
    public static void main(String[] args)throws Exception {
        //System.setProperty("hadoop.home.dir", "D:\\project\\hadoop-2.7.2"); // do this if HADOOP_HOME is not set in the local environment
    	
        String input = "hdfs://hmaster:9000/input/LICENSE.txt";
        String output = "hdfs://hmaster:9000/output/";
 
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("WordCount");
        // Option 1: set the connection parameters via the Hadoop config files
        conf.addResource("classpath:/hadoop2/core-site.xml");
        conf.addResource("classpath:/hadoop2/hdfs-site.xml");
        conf.addResource("classpath:/hadoop2/mapred-site.xml");
        conf.addResource("classpath:/hadoop2/yarn-site.xml");
        // Option 2: set the connection parameters directly
        //conf.set("mapred.job.tracker", "10.75.201.125:9000");

        conf.setOutputKeyClass(Text.class);          // output key type
        conf.setOutputValueClass(IntWritable.class); // output value type

        conf.setMapperClass(WordCountMapper.class);    // map operator
        conf.setCombinerClass(WordCountReducer.class); // combine operator
        conf.setReducerClass(WordCountReducer.class);  // reduce operator

        conf.setInputFormat(TextInputFormat.class);   // input format
        conf.setOutputFormat(TextOutputFormat.class); // output format

        FileInputFormat.setInputPaths(conf, new Path(input));   // input path
        FileOutputFormat.setOutputPath(conf, new Path(output)); // output path

        JobClient.runJob(conf);
        System.exit(0);
    }
 
}
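
The listing above uses the older org.apache.hadoop.mapred API, while the remaining examples in this post use the newer org.apache.hadoop.mapreduce API. For reference, a minimal sketch of the same mapper and reducer written against the new API might look like the following; this is an illustration only (the class names are mine), not part of the original project:

// Minimal new-API (org.apache.hadoop.mapreduce) equivalent of the mapper/reducer above.
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class NewApiWordCount {

    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one); // emit (word, 1)
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get(); // accumulate counts for the same word
            }
            result.set(sum);
            context.write(key, result);
        }
    }
}

The driver for this variant would use Job.getInstance instead of JobConf, exactly as the Sort, deduplication and average examples below do.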


3. Output

Final output:
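
The original post shows the console output and the HDFS result as screenshots, which are not reproduced here. One way to inspect the job output programmatically is to read the part files back from HDFS; the following is a minimal sketch, assuming the NameNode address and output directory used in the job above:

// Read the WordCount output back from HDFS and print it (illustrative sketch).
import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PrintOutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumes the same NameNode address as the job above.
        FileSystem fs = FileSystem.get(new java.net.URI("hdfs://hmaster:9000"), conf);
        for (FileStatus status : fs.listStatus(new Path("/output"))) {
            if (!status.getPath().getName().startsWith("part-")) {
                continue; // skip _SUCCESS and other non-data files
            }
            BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(status.getPath())));
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line); // each line is "word<TAB>count"
            }
            reader.close();
        }
        fs.close();
    }
}

On the cluster itself, hdfs dfs -cat /output/part-00000 shows the same content.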


II. Sort example

Source code:

package com.lin.sort;

/**
 * Overview: data sorting
 * 
 * @author linbingwen
 * @since  June 30, 2016
 */

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
 
public class Sort {
 
    // The map converts each input value into an IntWritable and emits it as the output key
    public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
        private static IntWritable data = new IntWritable();

        // map implementation
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            data.set(Integer.parseInt(line));
            context.write(data, new IntWritable(1));
        }
       
    }
   
    // The reduce copies the input key to the output key,
    // emits it once for each element in the value list,
    // and uses the global linenum counter as the rank
    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        private static IntWritable linenum = new IntWritable(1);

        // reduce implementation
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            for(IntWritable val:values){
                context.write(linenum, key);
                linenum = new IntWritable(linenum.get()+1);
            }
           
        }
 
    }
   
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        // This line matters if you point at the cluster directly (here the XML resources below are used instead):
        // conf.set("mapred.job.tracker", "192.168.1.2:9001");
        conf.addResource("classpath:/hadoop2/core-site.xml");
        conf.addResource("classpath:/hadoop2/hdfs-site.xml");
        conf.addResource("classpath:/hadoop2/mapred-site.xml");
        conf.addResource("classpath:/hadoop2/yarn-site.xml");

        String[] ioArgs = new String[] { "hdfs://hmaster:9000/sort_in", "hdfs://hmaster:9000/sort_out" };
        String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: Data Sort <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(conf, "Data Sort");
        job.setJarByClass(Sort.class);

        // Set the Map and Reduce classes
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // Set the output types
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // Set the input and output directories
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Input files:

file1.txt

2
32
654
32
15
756
65223

file2.txt

5956
22
650
92

file3.txt

26
54
6
Run result:

Console input/output:

Below is the result as viewed on the machine where Hadoop is installed:
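
The screenshots from the original post are not reproduced here. Since the job runs with a single reducer (the default) and MapReduce sorts keys before the reduce phase, the reducer simply numbers the incoming values in ascending order. Worked through by hand from the three input files above (a hand-derived expectation, not captured from an actual run), the output should be:

1	2
2	6
3	15
4	22
5	26
6	32
7	32
8	54
9	92
10	650
11	654
12	756
13	5956
14	65223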


III. Deduplication example

package com.lin.diffdata;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


/**
 * Overview: data deduplication
 * 
 * @author linbingwen
 * @since June 28, 2016
 */
public class DiffData {

	// The map copies each input value to the output key and emits it directly
	public static class Map extends Mapper<Object, Text, Text, Text> {
		private static Text line = new Text(); // one line of input

		// map implementation
		public void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			line = value;
			context.write(line, new Text(""));
		}

	}

	// The reduce copies the input key to the output key and emits it directly
	public static class Reduce extends Reducer<Text, Text, Text, Text> {
		// reduce implementation
		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			context.write(key, new Text(""));
		}

	}

	public static void main(String[] args) throws Exception {
		
		JobConf conf = new JobConf(DiffData.class);
		conf.addResource("classpath:/hadoop2/core-site.xml");
		conf.addResource("classpath:/hadoop2/hdfs-site.xml");
		conf.addResource("classpath:/hadoop2/mapred-site.xml");
		conf.addResource("classpath:/hadoop2/yarn-site.xml");

		String[] ioArgs = new String[] { "hdfs://hmaster:9000/input", "hdfs://hmaster:9000/output" };
		String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
		if (otherArgs.length != 2) {
			System.err.println("Usage: Data Deduplication <in> <out>");
			System.exit(2);
		}

		Job job =  Job.getInstance(conf, "Data Deduplication");
		job.setJarByClass(DiffData.class);

		// Set the Map, Combine and Reduce classes
		job.setMapperClass(Map.class);
		job.setCombinerClass(Reduce.class);
		job.setReducerClass(Reduce.class);

		// Set the output types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		// Set the input and output directories
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
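
A small refinement worth noting: since the value carries no information here, NullWritable can be used instead of an empty Text, which avoids serializing empty strings. A minimal sketch of the mapper and reducer with that change (illustrative, not part of the original post):

// Deduplication with NullWritable values instead of empty Text (sketch).
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class DiffDataNullValue {

    public static class Map extends Mapper<Object, Text, Text, NullWritable> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit the whole line as the key; the value carries no information.
            context.write(value, NullWritable.get());
        }
    }

    public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Identical lines are grouped under one key, so writing the key once removes duplicates.
            context.write(key, NullWritable.get());
        }
    }
}

The driver for this variant would additionally call job.setOutputValueClass(NullWritable.class).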

Run output:

Final result:

The input files:

file1.txt

2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c

file2.txt

2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c

Result as viewed on the machine where Hadoop is installed:
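
The screenshot is again not reproduced. Worked through by hand from file1.txt and file2.txt above (a hand-derived expectation, not captured from an actual run), the deduplicated output, with keys sorted by the framework, should contain one line per distinct record:

2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d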


IV. Computing the average score

package com.lin.average;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Average {

	public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
		// map implementation
		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			// Convert the plain-text input to a String
			String line = value.toString();
			// Split the input into lines first
			StringTokenizer tokenizerArticle = new StringTokenizer(line, "\n");
			// Process each line
			while (tokenizerArticle.hasMoreElements()) {
				// Split each line on whitespace
				StringTokenizer tokenizerLine = new StringTokenizer(tokenizerArticle.nextToken());
				String strName = tokenizerLine.nextToken(); // student name
				String strScore = tokenizerLine.nextToken(); // score
				Text name = new Text(strName);
				int scoreInt = Integer.parseInt(strScore);
				// Emit the name and the score
				context.write(name, new IntWritable(scoreInt));
			}
		}

	}

	public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
		// reduce implementation
		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
			int sum = 0;
			int count = 0;
			Iterator<IntWritable> iterator = values.iterator();
			while (iterator.hasNext()) {
				sum += iterator.next().get(); // total score
				count++; // number of scores
			}
			int average = sum / count; // integer average
			context.write(key, new IntWritable(average));
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// Set the Hadoop master address and port
		conf.set("mapred.job.tracker", "10.75.201.125:9000");
		// Set the input and output directories
		String[] ioArgs = new String[] { "hdfs://hmaster:9000/average_in", "hdfs://hmaster:9000/average_out" };
		String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();
		if (otherArgs.length != 2) {
			System.err.println("Usage: Score Average <in> <out>");
			System.exit(2);
		}
		// Create the job
		Job job = Job.getInstance(conf, "Score Average");
		
		// Delete the output directory if it already exists
//        FileSystem fs = FileSystem.get(conf);
//        Path out = new Path(otherArgs[1]);
//        if (fs.exists(out)){
//            fs.delete(out, true);
//        }
		
		job.setJarByClass(Average.class);
		
		// Set the Map and Reduce classes.
		// Note: Reduce is NOT registered as a combiner here, because averaging
		// partial averages is not the same as averaging all scores.
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		
		// Set the output types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		// Split the input data set into splits and provide a RecordReader implementation
		job.setInputFormatClass(TextInputFormat.class);
		
		// Provide a RecordWriter implementation responsible for writing the output
		job.setOutputFormatClass(TextOutputFormat.class);
		
		// Set the input and output directories
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);

	}

}
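
The original listing also registered Reduce as the combiner. That only happens to give the right answer for this particular data set, where each mapper sees exactly one score per student; in general, averaging partial averages is wrong, which is why the combiner line is dropped above. If map-side combining is wanted, the combiner has to forward (sum, count) pairs and only the final reduce divides. A minimal sketch of that variant (class names are mine, not from the original post):

// Combiner-safe average: the combiner emits "sum,count" pairs; only the reducer divides.
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class AverageWithCombiner {

    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Assumes each line is "<name> <score>".
            StringTokenizer tokens = new StringTokenizer(value.toString());
            String name = tokens.nextToken();
            String score = tokens.nextToken();
            // Each record starts as a (score, 1) pair encoded as "sum,count".
            context.write(new Text(name), new Text(score + ",1"));
        }
    }

    // Sums partial (sum, count) pairs; safe to run zero, one or many times.
    public static class Combine extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            long count = 0;
            for (Text v : values) {
                String[] parts = v.toString().split(",");
                sum += Long.parseLong(parts[0]);
                count += Long.parseLong(parts[1]);
            }
            context.write(key, new Text(sum + "," + count));
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            long count = 0;
            for (Text v : values) {
                String[] parts = v.toString().split(",");
                sum += Long.parseLong(parts[0]);
                count += Long.parseLong(parts[1]);
            }
            context.write(key, new IntWritable((int) (sum / count))); // divide only once, at the end
        }
    }
}

The driver for this variant would register Combine as the combiner and call job.setMapOutputValueClass(Text.class), while the final output value class stays IntWritable.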

Run result:



Input file1.txt:

張三    88
李四    99
王五    66
趙六    77

file2.txt

張三    78
李四    89
王五    96
趙六    67

file3.txt

張三    80
李四    82
王五    84
趙六    86
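
Worked through by hand from the three files, using integer division as in the reducer above (a hand-derived expectation, not captured from an actual run):

張三	(88 + 78 + 80) / 3 = 82
李四	(99 + 89 + 82) / 3 = 90
王五	(66 + 96 + 84) / 3 = 82
趙六	(77 + 67 + 86) / 3 = 76  (230 / 3 truncated)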