Classic Hadoop examples in Spark (1): finding each year's maximum temperature from collected weather data
阿新 • Published: 2019-02-03
1. Analyzing the raw data
```
0067011990999991950051507004888888889999999N9+00001+9999999999999999999999
0067011990999991950051512004888888889999999N9+00221+9999999999999999999999
0067011990999991950051518004888888889999999N9-00111+9999999999999999999999
0067011990999991949032412004888888889999999N9+01111+9999999999999999999999
0067011990999991950032418004888888880500001N9+00001+9999999999999999999999
0067011990999991950051507004888888880500001N9+00781+9999999999999999999999
```
Notes on the data (all offsets are 0-based, matching Java's `substring`/`charAt`):
Characters 15–18 hold the year.
Characters 45–49 hold the temperature: character 45 is the sign ('+' above zero, '-' below zero), and a value of 9999 marks a missing or invalid reading.
Character 50 is the quality code and can only be 0, 1, 4, 5, or 9.
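The fixed-width offsets above can be sanity-checked against one of the sample records. A minimal standalone sketch (the class name `RecordParseDemo` is mine, not from the post):

```java
// Parse one sample record using the offsets described above.
// Indices are 0-based, as in Java's String.substring/charAt.
public class RecordParseDemo {
    public static void main(String[] args) {
        String line = "0067011990999991950051512004888888889999999N9+00221+9999999999999999999999";
        String year = line.substring(15, 19);               // characters 15-18: the year
        int temp = (line.charAt(45) == '+')
                ? Integer.parseInt(line.substring(46, 50))  // positive: skip the '+' sign
                : Integer.parseInt(line.substring(45, 50)); // negative: keep the '-' sign
        char quality = line.charAt(50);                     // quality code
        System.out.println(year + " " + temp + " " + quality); // prints "1950 22 1"
    }
}
```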
2. The MapReduce implementation
(1) Map task
```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NewMaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        System.out.println("key: " + key);          // debug output of the record offset
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(45) == '+') {
            airTemperature = Integer.parseInt(line.substring(46, 50));
        } else {
            airTemperature = Integer.parseInt(line.substring(45, 50));
        }
        String quality = line.substring(50, 51);
        System.out.println("quality: " + quality);  // debug output of the quality code
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}
```
(2) Reduce task

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class NewMaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
    }
}
```
(3) Job submission
```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewMaxTemperature {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: NewMaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        Job job = Job.getInstance();   // new Job() is deprecated in current Hadoop
        job.setJarByClass(NewMaxTemperature.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(NewMaxTemperatureMapper.class);
        job.setReducerClass(NewMaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
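The mapper/reducer logic can be checked locally without a cluster. Below is a plain-Java sketch of the same pipeline (filter bad records, map to (year, temperature), keep the per-year maximum); the class name `LocalMaxTemperature` is mine, and the input is the six sample records from section 1:

```java
import java.util.*;

// Cluster-free equivalent of the MapReduce job above: filter invalid
// records, extract (year, temperature), and keep the maximum per year.
public class LocalMaxTemperature {
    static final int MISSING = 9999;

    public static Map<String, Integer> maxByYear(List<String> lines) {
        Map<String, Integer> max = new HashMap<>();
        for (String line : lines) {
            String year = line.substring(15, 19);
            int t = (line.charAt(45) == '+')
                    ? Integer.parseInt(line.substring(46, 50))
                    : Integer.parseInt(line.substring(45, 50));
            String quality = line.substring(50, 51);
            if (t != MISSING && quality.matches("[01459]")) {
                max.merge(year, t, Math::max);   // same effect as the reducer
            }
        }
        return max;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "0067011990999991950051507004888888889999999N9+00001+9999999999999999999999",
            "0067011990999991950051512004888888889999999N9+00221+9999999999999999999999",
            "0067011990999991950051518004888888889999999N9-00111+9999999999999999999999",
            "0067011990999991949032412004888888889999999N9+01111+9999999999999999999999",
            "0067011990999991950032418004888888880500001N9+00001+9999999999999999999999",
            "0067011990999991950051507004888888880500001N9+00781+9999999999999999999999");
        maxByYear(lines).forEach((y, t) -> System.out.println("year : " + y + ", max : " + t));
    }
}
```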
3. The Spark implementation (Scala)
```scala
val one = sc.textFile("/tmp/hadoop/one")

// Keep only valid records: temperature != 9999 and an allowed quality code.
val yearAndTemp = one.filter { line =>
  val quality = line.substring(50, 51)
  val airTemperature =
    if (line.charAt(45) == '+') line.substring(46, 50).toInt
    else line.substring(45, 50).toInt
  airTemperature != 9999 && quality.matches("[01459]")
}.map { line =>
  val year = line.substring(15, 19)
  val airTemperature =
    if (line.charAt(45) == '+') line.substring(46, 50).toInt
    else line.substring(45, 50).toInt
  (year, airTemperature)
}

// Keep the larger temperature for each year.
val res = yearAndTemp.reduceByKey((x, y) => if (x > y) x else y)
res.collect.foreach(x => println("year : " + x._1 + ", max : " + x._2))
```
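The `reduceByKey` step groups the (year, temperature) pairs by key and reduces each group with max. Since the rest of this post's code is Java, here is a Java-streams analogue of just that grouping step; the pairs are the values parsed from the six sample records, and the class name `ReduceByKeyDemo` is mine:

```java
import java.util.*;
import java.util.stream.*;

// What reduceByKey does here: collect (year, temp) pairs into a map,
// merging values for the same key with Integer::max.
public class ReduceByKeyDemo {
    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = List.of(
                Map.entry("1950", 0), Map.entry("1950", 22),
                Map.entry("1950", -11), Map.entry("1949", 111),
                Map.entry("1950", 0), Map.entry("1950", 78));
        Map<String, Integer> res = pairs.stream().collect(
                Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Integer::max));
        System.out.println(res);
    }
}
```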
To drop invalid records, a filter is applied before the map (note that the temperature is parsed twice, once in the filter and once in the map).
The MapReduce and Spark jobs produce the same result:
year : 1949, max : 111
year : 1950, max : 78