學習筆記(五)-------剖析美國平均氣溫項目,掌握MapReduce編程
阿新 • • 發佈:2019-05-03
temp tool tput config throw args on() rri interrupt
數據集導入HDFS
package com.hadoop.base; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class Temperature extends Configured implements Tool { public static class TemperatureMapper extends Mapper< LongWritable, Text, Text, IntWritable> { /** * @function Mapper 解析氣象站數據 * @input key=偏移量 value=氣象站數據 * @output key=weatherStationId value=temperature */ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); //每行氣象數據 int temperature = Integer.parseInt(line.substring(14, 19).trim());//每小時氣溫值 if (temperature != -9999) { //過濾無效數據 FileSplit fileSplit = (FileSplit) context.getInputSplit(); String weatherStationId = fileSplit.getPath().getName().substring(5, 10);//通過文件名稱提取氣象站id context.write(new Text(weatherStationId), new IntWritable(temperature)); } } } public static class TemperatureReducer extends Reducer< Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable< IntWritable> values,Context context) throws IOException, InterruptedException { int sum = 0; int count = 0; //統計每個氣象站的氣溫值總和 for (IntWritable val : values) { sum += val.get(); count++; } //求每個氣象站的氣溫平均值 result.set(sum / count); context.write(key, result); } } @Override public int run(String[] args) throws Exception { // TODO Auto-generated method stub Configuration conf = new Configuration();//讀取配置文件 Path mypath = new Path(args[1]); FileSystem hdfs = mypath.getFileSystem(conf); if (hdfs.isDirectory(mypath)) { hdfs.delete(mypath, true); } Job job = new Job(conf, "Temperature");//新建一個任務 job.setJarByClass(Temperature.class);// 設置主類 FileInputFormat.addInputPath(job, new Path(args[0]));// 輸入路徑 FileOutputFormat.setOutputPath(job, new Path(args[1]));// 輸出路徑 job.setMapperClass(TemperatureMapper.class);// Mapper job.setReducerClass(TemperatureReducer.class);// Reducer job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); return job.waitForCompletion(true)?0:1;//提交任務 } /** * @function main 方法 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { //數據輸入路徑和輸出路徑 String[] args0 = { "hdfs://master:9000/middle/weather/", "hdfs://master:9000/middle/weather/out/" }; int ec = ToolRunner.run(new Configuration(), new Temperature(), args0); System.exit(ec); } }
學習筆記(五)-------剖析美國平均氣溫項目,掌握MapReduce編程