
A Closer Look at MapReduce (2): Serialization

1. Serialization Concepts

  1. Serialization is the process of turning a structured object into a byte stream.
  2. Deserialization is the reverse: turning a byte stream back into a structured object.
  3. Java has its own built-in mechanism (java.io.Serializable); a quick round-trip with it is sketched after this list. Hadoop does not use it for MapReduce keys and values, and instead defines its own Writable interface, which the SelBean class below implements.
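
For reference, this is roughly what points 1 and 2 look like with Java's built-in mechanism. A minimal, self-contained sketch (the Reading class and its sample values are purely illustrative):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class JavaSerDemo {

	// Illustrative value class; implementing Serializable opts it into Java's built-in mechanism
	static class Reading implements Serializable {
		private static final long serialVersionUID = 1L;
		String station;
		double temp;
		Reading(String station, double temp) { this.station = station; this.temp = temp; }
	}

	public static void main(String[] args) throws Exception {
		// Serialization: structured object -> byte stream
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
			out.writeObject(new Reading("station-1", 30.5));
		}

		// Deserialization: byte stream -> structured object
		try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
			Reading back = (Reading) in.readObject();
			System.out.println(back.station + "\t" + back.temp);
		}
	}
}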

2. Characteristics of Hadoop Serialization

  1. Compact: uses storage space efficiently (a size comparison is sketched after this list).
  2. Fast: little extra overhead when reading and writing data.
  3. Extensible: new code can transparently read data written in an older format.
  4. Interoperable: clients and servers written in different languages can exchange data.
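
The first point is easy to see concretely: for the same value, a Writable writes only the raw data, while Java's built-in serialization also writes class metadata. A minimal sketch, assuming Hadoop's client library is on the classpath:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

import org.apache.hadoop.io.IntWritable;

public class CompactnessDemo {
	public static void main(String[] args) throws IOException {
		// Hadoop Writable: an IntWritable writes just the 4 raw bytes of the int
		ByteArrayOutputStream writableBytes = new ByteArrayOutputStream();
		new IntWritable(163).write(new DataOutputStream(writableBytes));

		// Java serialization: the stream also carries class metadata, so it is far larger
		ByteArrayOutputStream javaBytes = new ByteArrayOutputStream();
		try (ObjectOutputStream out = new ObjectOutputStream(javaBytes)) {
			out.writeObject(Integer.valueOf(163));
		}

		System.out.println("IntWritable:        " + writableBytes.size() + " bytes"); // 4
		System.out.println("Java serialization: " + javaBytes.size() + " bytes");     // several dozen
	}
}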

========================================================================================

3. Case Study: Analyzing Weather Data

Source record layout: [station, temperature, , , , observation time, current time] (the third field, which the mapper below reads as humidity, is left unnamed in the source description)

Desired output: [station, number of times the maximum temperature occurred, maximum temperature, humidity, last time the maximum temperature occurred]
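
With the default TextOutputFormat, each output line is the reducer key (the station) followed by a tab and the SelBean's toString() text, so a line takes the form: station <TAB> count <TAB> maxTemp <TAB> humidity <TAB> lastTime;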

First, define a SelBean class:

package Test;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class SelBean implements WritableComparable<SelBean> {

	// Fields: weather station, temperature, humidity, time (getters and setters are defined below)
	private String station;
	private double temp;
	private double humi;
	private String time;
	
	// Setter that fills every field at once. Hadoop reuses Writable objects, so calling
	// set(...) on a single instance per record avoids creating a new bean each time.
	public void set(String station, double temp, double humi, String time)
	{
		this.station = station;
		this.temp = temp;
		this.humi = humi;
		this.time = time;
	}
	// No-argument variant, kept for symmetry. Hadoop itself instantiates Writables by
	// reflection through the implicit no-argument constructor, which this class still
	// has because no explicit constructor is declared.
	public void set() {}
	
	// Deserialization: read the fields back out of the byte stream and assign them to this object
	@Override
	public void readFields(DataInput in) throws IOException {
		this.station = in.readUTF();
		this.temp = in.readDouble();
		this.humi = in.readDouble();
		this.time = in.readUTF();
	}

	// Serialization: write this object's fields into the byte stream.
	// Note: the field order and types here must match readFields() exactly.
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(station);   // DataOutput covers the common primitive and String types
		out.writeDouble(temp);
		out.writeDouble(humi);
		out.writeUTF(time);
	}

	// Override toString() so the whole bean can be emitted as a single output value
	@Override
	public String toString() {
		return this.station + "\t" + this.temp + "\t" + this.humi + "\t" + this.time + ";";
	}


	// Comparison used when SelBean objects are sorted: higher temperature first;
	// for equal temperatures, lower humidity first.
	@Override
	public int compareTo(SelBean o) {
		if(this.temp == o.getTemp())
		{
			// Double.compare also returns 0 for equal values, as the compareTo contract requires
			return Double.compare(this.humi, o.getHumi());
		}
		return this.temp > o.getTemp() ? -1 : 1;
	}

	public String getStation() {
		return station;
	}

	public void setStation(String station) {
		this.station = station;
	}

	public double getTemp() {
		return temp;
	}

	public void setTemp(double temp) {
		this.temp = temp;
	}

	public double getHumi() {
		return humi;
	}

	public void setHumi(double humi) {
		this.humi = humi;
	}

	public String getTime() {
		return time;
	}

	public void setTime(String time) {
		this.time = time;
	}

}
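
Before wiring SelBean into a job, it is easy to sanity-check that write() and readFields() agree on field order and types by round-tripping one bean through an in-memory byte stream. A minimal sketch (the sample values are made up for illustration):

package Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class SelBeanRoundTrip {
	public static void main(String[] args) throws IOException {
		SelBean original = new SelBean();
		original.set("station-1", 30.5, 0.62, "2016-07-12 14:00");

		// Serialize: object -> byte stream, via write()
		ByteArrayOutputStream buffer = new ByteArrayOutputStream();
		original.write(new DataOutputStream(buffer));

		// Deserialize: byte stream -> object, via readFields()
		SelBean copy = new SelBean();
		copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

		// Both lines should print the same fields
		System.out.println(original);
		System.out.println(copy);
	}
}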
The driver (main) class:
package Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DataSelection {
	
	public static class DSelMapper extends Mapper<LongWritable, Text, Text, SelBean>
	{

		// Output key/value objects are created once and reused for every record
		private Text k = new Text();
		private SelBean v = new SelBean();
		private String ti = "";
		
		// map() is called once for every input line
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// Take one line of input
			String line = value.toString();
			// Split the line into fields on commas
			String[] fields = line.split(",");
			// Extract station, temperature and humidity, stripping the surrounding quote characters
			String s = fields[0].substring(1, fields[0].length() - 1);
			String te = fields[1].substring(1, fields[1].length() - 1);
			String h = fields[2].substring(1, fields[2].length() - 1);
			// Not every line has the same number of fields, so only read the observation
			// time when the full 7-field layout is present
			if(fields.length == 7)
			{
				ti = fields[5].substring(1, fields[5].length() - 1);
			}
			else
			{
				ti = "";
			}
			// Convert temperature and humidity to double
			double te0 = Double.parseDouble(te);
			double h0 = Double.parseDouble(h);
			// Emit key = station, value = [station, temperature, humidity, time]
			k.set(s);
			v.set(s, te0, h0, ti);
			context.write(k, v);
		}
		 
	}
	public static class DSelReducer extends Reducer<Text, SelBean, Text, SelBean>
	{
		private SelBean v = new SelBean(); 
	 
		// reduce() is called once per station, receiving all of that station's beans:
		// <key, values> = <station1, {SelBean(station1,temp1,h1,t1), SelBean(station1,temp2,h2,t2), ...}>
		@Override
		protected void reduce(Text key, Iterable<SelBean> values, Context context)
				throws IOException, InterruptedException {
			// Track the maximum temperature, the humidity and time at which it occurred,
			// and how many times it occurred
			double maxValue = Double.NEGATIVE_INFINITY;
			double h = 0;
			String t = "";
			String s = "";
			int count = 0;
			List<Double> data = new ArrayList<Double>();
			// First pass: find this station's maximum temperature. Whenever the current
			// bean matches the running maximum, remember its humidity and time, so that
			// at the end they come from the last occurrence of the overall maximum.
			for (SelBean bean : values)
			{
				maxValue = Math.max(maxValue, bean.getTemp());
				data.add(bean.getTemp());
				if(bean.getTemp() >= maxValue)
				{
					h = bean.getHumi();
					t = bean.getTime();
				}
			}
			// Second pass over the collected temperatures: count how often the maximum occurred
			for (double temp : data)
			{
				if (temp == maxValue)
					count++;
			}

			// The bean's station slot carries the occurrence count, matching the output
			// format described at the top of the case study
			s = Integer.toString(count);
			v.set(s, maxValue, h, t);

			context.write(key, v);
		}
		
	} 
	
	public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

		Job job = Job.getInstance(new Configuration());

		job.setJarByClass(DataSelection.class);

		// Mapper and its intermediate output types
		job.setMapperClass(DSelMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(SelBean.class);

		// Reducer and the job's final output types
		job.setReducerClass(DSelReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(SelBean.class);

		// Input and output locations on HDFS (the output directory must not exist yet)
		FileInputFormat.addInputPath(job, new Path("hdfs://10.2.173.15:9000/user/guest/input01"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://10.2.173.15:9000/user/guest/0data3"));

		// Submit the job and exit with a non-zero status if it fails
		System.exit(job.waitForCompletion(true) ? 0 : 1);

	}
}
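
Once the classes are packaged into a jar (the jar name below is only a placeholder), the job can be submitted with the standard hadoop jar command. Note that the output directory (0data3 here) must not already exist, or FileOutputFormat will refuse to start the job:

hadoop jar dataselection.jar Test.DataSelection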