MapReduce進一步瞭解(二)——序列化
阿新 • • 發佈:2019-02-14
1、序列化概念
- 序列化(Serialization)是指把結構化物件轉化為位元組流。
- 反序列化(Deserialization)是序列化的逆過程,把位元組流轉回結構化物件。
- java序列化(java.io.Serializable)
2、hadoop序列化的特點
- 緊湊:高效利用儲存空間
- 快速:讀寫資料的額外開銷小
- 可擴充套件:可透明地讀取老格式的資料
- 互操作:支援多語言的互動
========================================================================================
3、氣象資料分析案例
資料來源型別:【氣象站,溫度,溼度,...,氣象時間,當前時間】(Mapper 讀取第 0、1、2、5 個欄位)
最終得到的資料:【氣象站,最高氣溫出現的次數,最高氣溫,溼度,最高氣溫出現的最後一次時間】
首先定義一個SelBean類
package Test;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Set;

import org.apache.hadoop.io.WritableComparable;

/**
 * Custom Hadoop writable carrying one weather reading: station id,
 * temperature, humidity and observation time.
 *
 * <p>Ordering: descending by temperature, then ascending by humidity.
 * Serialization: {@link #write} and {@link #readFields} must keep the
 * fields in exactly the same order and types.
 */
public class SelBean implements WritableComparable<SelBean> {

    private String station;
    private double temp;
    private double humi;
    private String time;

    /**
     * Populates all fields at once. Hadoop reuses writable instances, so a
     * mutable setter is used instead of a parameterized constructor.
     *
     * @param station station id
     * @param temp    temperature reading
     * @param humi    humidity reading
     * @param time    observation time (may be empty)
     */
    public void set(String station, double temp, double humi, String time) {
        this.station = station;
        this.temp = temp;
        this.humi = humi;
        this.time = time;
    }

    /** No-op overload kept for API compatibility with existing callers. */
    public void set() {}

    /**
     * Deserializes this bean from the byte stream.
     * Field order/types must mirror {@link #write}.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.station = in.readUTF();
        this.temp = in.readDouble();
        this.humi = in.readDouble();
        this.time = in.readUTF();
    }

    /**
     * Serializes this bean to the byte stream.
     * Field order/types must mirror {@link #readFields}.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(station);
        out.writeDouble(temp);
        out.writeDouble(humi);
        out.writeUTF(time);
    }

    /** Tab-separated rendering used as the job's output value. */
    @Override
    public String toString() {
        return this.station + "\t" + this.temp + "\t" + this.humi + "\t" + this.time + ";";
    }

    /**
     * Orders beans by temperature descending, then humidity ascending.
     *
     * <p>FIX: the original never returned 0 (equal temp and humi yielded -1),
     * violating the {@code compareTo} contract (sign antisymmetry), and it
     * compared doubles with {@code >}. Use {@link Double#compare} so equal
     * pairs compare as 0 and NaN is handled consistently.
     */
    @Override
    public int compareTo(SelBean o) {
        int byTemp = Double.compare(o.temp, this.temp); // descending temperature
        if (byTemp != 0) {
            return byTemp;
        }
        return Double.compare(this.humi, o.humi); // ascending humidity tiebreak
    }

    public String getStation() {
        return station;
    }

    public void setStation(String station) {
        this.station = station;
    }

    public double getTemp() {
        return temp;
    }

    public void setTemp(double temp) {
        this.temp = temp;
    }

    public double getHumi() {
        return humi;
    }

    public void setHumi(double humi) {
        this.humi = humi;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }
}
主類
package Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * MapReduce job: for each weather station, emit the number of times its
 * maximum temperature occurred, the maximum temperature, the humidity and
 * the time of its last occurrence.
 *
 * <p>Input lines are comma-separated; each field is wrapped in a pair of
 * delimiter characters that the mapper strips with substring(1, len-1).
 */
public class DataSelection {

    /** Maps one CSV line to (station, SelBean(station, temp, humidity, time)). */
    public static class DSelMapper extends Mapper<LongWritable, Text, Text, SelBean> {

        // Reused across map() calls — the standard Hadoop object-reuse pattern.
        private final Text k = new Text();
        private final SelBean v = new SelBean();
        private String ti = "";

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split(",");
            // Strip the surrounding delimiter character from each field.
            String s = fields[0].substring(1, fields[0].length() - 1);
            String te = fields[1].substring(1, fields[1].length() - 1);
            String h = fields[2].substring(1, fields[2].length() - 1);
            // Not every record has 7 fields; only read the time when it exists.
            if (fields.length == 7) {
                ti = fields[5].substring(1, fields[5].length() - 1);
            } else {
                ti = "";
            }
            double te0 = Double.parseDouble(te);
            double h0 = Double.parseDouble(h);
            // Key = station; value = full reading.
            k.set(s);
            v.set(s, te0, h0, ti);
            context.write(k, v);
        }
    }

    /**
     * Reduces all readings of one station to: occurrence count of the max
     * temperature, the max temperature, its humidity, and the time of its
     * last occurrence.
     *
     * <p>FIX: declared public (was private) so Hadoop can instantiate it
     * reflectively, matching the mapper's visibility.
     */
    public static class DSelReducer extends Reducer<Text, SelBean, Text, SelBean> {

        private final SelBean v = new SelBean();

        @Override
        protected void reduce(Text key, Iterable<SelBean> values, Context context)
                throws IOException, InterruptedException {
            // FIX: Double.MIN_VALUE is the smallest POSITIVE double, so with
            // sub-zero temperatures no reading would ever be recorded as the
            // maximum. NEGATIVE_INFINITY is the correct identity for max().
            double maxValue = Double.NEGATIVE_INFINITY;
            double h = 0;
            String t = "";
            int count = 0;
            // Hadoop reuses the SelBean instance during iteration, so copy the
            // primitive temperatures out for the second pass.
            List<Double> data = new ArrayList<Double>();
            for (SelBean bean : values) {
                maxValue = Math.max(maxValue, bean.getTemp());
                data.add(bean.getTemp());
                // True exactly when this reading IS the running maximum, so
                // the LAST occurrence's humidity/time win.
                if (bean.getTemp() >= maxValue) {
                    h = bean.getHumi();
                    t = bean.getTime();
                }
            }
            // Count how often the maximum temperature occurred.
            for (double temp : data) {
                if (temp == maxValue) {
                    count++;
                }
            }
            // The bean's "station" slot is repurposed to carry the count.
            v.set(Integer.toString(count), maxValue, h, t);
            context.write(key, v);
        }
    }

    /**
     * Configures and submits the job.
     *
     * <p>NOTE(review): input/output paths are hard-coded; consider wiring
     * them through args / GenericOptionsParser.
     */
    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(DataSelection.class);

        job.setMapperClass(DSelMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(SelBean.class);

        job.setReducerClass(DSelReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(SelBean.class);

        FileInputFormat.addInputPath(job,
                new Path("hdfs://10.2.173.15:9000/user/guest/input01"));
        FileOutputFormat.setOutputPath(job,
                new Path("hdfs://10.2.173.15:9000/user/guest/0data3"));

        // FIX: propagate job success/failure through the process exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}