1. 程式人生 > >Hadoop經典案例Spark實現(一)——通過採集的氣象資料分析每年的最高溫度

Hadoop經典案例Spark實現(一)——通過採集的氣象資料分析每年的最高溫度

1、原始資料分析
0067011990999991950051507004888888889999999N9+00001+9999999999999999999999
0067011990999991950051512004888888889999999N9+00221+9999999999999999999999
0067011990999991950051518004888888889999999N9-00111+9999999999999999999999
0067011990999991949032412004888888889999999N9+01111+9999999999999999999999
0067011990999991950032418004888888880500001N9+00001+9999999999999999999999

0067011990999991950051507004888888880500001N9+00781+9999999999999999999999


資料說明: 
第15-19個字元是year
第45-50位是溫度表示,+表示零上 -表示零下,且溫度的值不能是9999,9999表示異常資料

第50位值只能是0、1、4、5、9幾個數字

2、首先MapReduce實現

1) map 任務

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NewMaxTemperatureMapper extends
		Mapper<LongWritable, Text, Text, IntWritable> {
	private static final int MISSING = 9999;

	@Override
	public void map(LongWritable key, Text value,

	Context context) throws IOException, InterruptedException {

		String line = value.toString();

		System.out.println("key: " + key);

		String year = line.substring(15, 19);

		int airTemperature;

		if (line.charAt(45) == '+') {

			airTemperature = Integer.parseInt(line.substring(46, 50));

		} else {

			airTemperature = Integer.parseInt(line.substring(45, 50));

		}

		String quality = line.substring(50, 51);

		System.out.println("quality: " + quality);

		if (airTemperature != MISSING && quality.matches("[01459]")) {

			context.write(new Text(year), new IntWritable(airTemperature));

		}
	}
}
2)reduce任務
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class NewMaxTemperatureReducer extends
		Reducer<Text, IntWritable, Text, IntWritable> {

	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Context context)
			throws IOException, InterruptedException {
		
		int maxValue = Integer.MIN_VALUE;

		for(IntWritable value: values){

			maxValue = Math.max(maxValue, value.get());

		}


		context.write(key, new IntWritable(maxValue));
	}
	
}

(3)job提交
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewMaxTemperature {

	public static void main(String[] args)throws Exception {
		if (args.length != 2) {

			System.err.print("Usage: MaxTemperature<input path> <output path>");

			System.exit(-1);

		}

		Job job = new Job();

		job.setJarByClass(NewMaxTemperature.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));

		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.setMapperClass(NewMaxTemperatureMapper.class);

		job.setReducerClass(NewMaxTemperatureReducer.class);

		job.setOutputKeyClass(Text.class);

		job.setOutputValueClass(IntWritable.class);

		System.exit(job.waitForCompletion(true) ? 0 : 1);

	}

}

3、Spark程式碼實現Scala版本
val one = sc.textFile("/tmp/hadoop/one")

val yearAndTemp = one.filter(line => {
 val quality = line.substring(50, 51);
 var airTemperature = 0
 if(line.charAt(45)=='+'){
  airTemperature = line.substring(46, 50).toInt
 }else{
  airTemperature = line.substring(45, 50).toInt
 }
 airTemperature != 9999 && quality.matches("[01459]")}).map{
line =>{
 val year = line.substring(15,19)
 var airTemperature = 0

 if(line.charAt(45)=='+'){
  airTemperature = line.substring(46, 50).toInt
 }else{
  airTemperature = line.substring(45, 50).toInt
 }
  (year,airTemperature)
}
}

val res = yearAndTemp.reduceByKey(
 (x,y)=> if(x>y) x else y
)

res.collect.foreach(x=>println("year : " + x._1+", max : "+x._2))

上面為了過濾非法的資料,在map前先做了filter過濾。


mapreduce與spark執行的任務結果是一樣的

year : 1949, max : 111
year : 1950, max : 78