1. 程式人生 > 學習筆記(五)-------剖析美國平均氣溫項目,掌握MapReduce編程

學習筆記(五)-------剖析美國平均氣溫項目,掌握MapReduce編程

temp tool tput config throw args on() rri interrupt

數據集導入HDFS

技術分享圖片

package com.hadoop.base;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * MapReduce job that computes the average temperature per weather station.
 *
 * <p>The mapper emits one (stationId, temperature) pair per valid input line, and the
 * reducer averages the temperatures of each station. The job is driven through
 * {@link ToolRunner}, so generic Hadoop options (for example {@code -D key=value})
 * are honoured.
 *
 * <p>Usage: {@code Temperature [<input path> <output path>]}. When no paths are given
 * the built-in default paths are used.
 */
public class Temperature extends Configured implements Tool {

	/** Input path used when no paths are supplied on the command line. */
	private static final String DEFAULT_INPUT_PATH = "hdfs://master:9000/middle/weather/";

	/** Output path used when no paths are supplied on the command line. */
	private static final String DEFAULT_OUTPUT_PATH = "hdfs://master:9000/middle/weather/out/";

	/**
	 * Parses weather-station records.
	 *
	 * <p>Input: key = byte offset of the line, value = one line of station data.<br>
	 * Output: key = weather station id, value = temperature.
	 */
	public static class TemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

		/** Start (inclusive) and end (exclusive) character offsets of the temperature field in a line. */
		private static final int TEMPERATURE_START = 14;
		private static final int TEMPERATURE_END = 19;

		/** Sentinel temperature value that marks an invalid reading. */
		private static final int MISSING_TEMPERATURE = -9999;

		/** Start (inclusive) and end (exclusive) character offsets of the station id in the input file name. */
		private static final int STATION_ID_START = 5;
		private static final int STATION_ID_END = 10;

		/** Counter group used to report lines that had to be skipped. */
		private static final String COUNTER_GROUP = "Temperature";

		// Reused across records to avoid allocating two objects per input line.
		private final Text stationId = new Text();
		private final IntWritable temperatureValue = new IntWritable();

		/**
		 * Extracts the temperature from one line and emits it under the station id
		 * taken from the name of the file the line came from.
		 *
		 * <p>Lines that are too short, or whose temperature field is not an integer,
		 * are skipped and counted instead of failing the whole task.
		 *
		 * @param key     byte offset of the line within the file (unused)
		 * @param value   one line of weather data
		 * @param context task context used to read the input split and emit output
		 * @throws IOException          if the input file name is too short to contain a station id,
		 *                              or if writing the output fails
		 * @throws InterruptedException if the task is interrupted while writing
		 */
		@Override
		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String line = value.toString();
			if (line.length() < TEMPERATURE_END) {
				// Truncated or blank line: there is no temperature field to read.
				context.getCounter(COUNTER_GROUP, "SHORT_LINES").increment(1);
				return;
			}

			int temperature;
			try {
				temperature = Integer.parseInt(line.substring(TEMPERATURE_START, TEMPERATURE_END).trim());
			} catch (NumberFormatException e) {
				// Non-numeric temperature field: skip the record rather than abort the task.
				context.getCounter(COUNTER_GROUP, "MALFORMED_TEMPERATURES").increment(1);
				return;
			}

			if (temperature == MISSING_TEMPERATURE) {
				return; // filter out invalid readings
			}

			// The station id is taken from the name of the file this record belongs to.
			FileSplit fileSplit = (FileSplit) context.getInputSplit();
			String fileName = fileSplit.getPath().getName();
			if (fileName.length() < STATION_ID_END) {
				throw new IOException("Cannot extract weather station id from file name: " + fileName);
			}

			stationId.set(fileName.substring(STATION_ID_START, STATION_ID_END));
			temperatureValue.set(temperature);
			context.write(stationId, temperatureValue);
		}
	}

	/**
	 * Averages the temperatures of each weather station.
	 *
	 * <p>Input: key = weather station id, values = temperatures of that station.<br>
	 * Output: key = weather station id, value = integer average temperature.
	 */
	public static class TemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		private final IntWritable result = new IntWritable();

		/**
		 * Sums all temperatures of one station and emits their integer average
		 * (the quotient is truncated toward zero).
		 *
		 * @param key     weather station id
		 * @param values  temperatures recorded for the station
		 * @param context task context used to emit output
		 * @throws IOException          if writing the output fails
		 * @throws InterruptedException if the task is interrupted while writing
		 */
		@Override
		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
			// Accumulate in long: an int sum can overflow when a station has many readings.
			long sum = 0;
			long count = 0;
			for (IntWritable val : values) {
				sum += val.get();
				count++;
			}
			if (count == 0) {
				return; // nothing to average; also guards the division below
			}
			// The average of int values always fits in an int, so the cast is safe.
			result.set((int) (sum / count));
			context.write(key, result);
		}
	}

	/**
	 * Configures and runs the job.
	 *
	 * @param args either empty (the default paths are used) or
	 *             {@code <input path> <output path>}
	 * @return 0 if the job succeeded, 1 if it failed, 2 if the arguments were invalid
	 * @throws Exception if the job cannot be set up or submitted
	 */
	@Override
	public int run(String[] args) throws Exception {
		String input;
		String output;
		if (args == null || args.length == 0) {
			input = DEFAULT_INPUT_PATH;
			output = DEFAULT_OUTPUT_PATH;
		} else if (args.length >= 2) {
			input = args[0];
			output = args[1];
		} else {
			System.err.println("Usage: Temperature [<input path> <output path>]");
			return 2;
		}

		// Use the configuration populated by ToolRunner so generic options are not lost.
		Configuration conf = getConf();
		if (conf == null) {
			conf = new Configuration();
		}

		Path inputPath = new Path(input);
		Path outputPath = new Path(output);

		// Remove output left over from a previous run; the job refuses to start if the path exists.
		FileSystem hdfs = outputPath.getFileSystem(conf);
		if (hdfs.exists(outputPath)) {
			hdfs.delete(outputPath, true);
		}

		Job job = Job.getInstance(conf, "Temperature");
		job.setJarByClass(Temperature.class);

		FileInputFormat.addInputPath(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);

		job.setMapperClass(TemperatureMapper.class);
		job.setReducerClass(TemperatureReducer.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		// Submit the job and block until it finishes.
		return job.waitForCompletion(true) ? 0 : 1;
	}

	/**
	 * Program entry point.
	 *
	 * @param args optional {@code <input path> <output path>}, optionally preceded by
	 *             generic Hadoop options; with no paths the defaults are used
	 * @throws Exception if the job cannot be set up or submitted
	 */
	public static void main(String[] args) throws Exception {
		int ec = ToolRunner.run(new Configuration(), new Temperature(), args);
		System.exit(ec);
	}
}

  技術分享圖片

學習筆記(五)-------剖析美國平均氣溫項目,掌握MapReduce編程