1. 程式人生 > >【尚學堂·Hadoop學習】MapReduce案例1--天氣

【尚學堂·Hadoop學習】MapReduce案例1--天氣

png font base64 sys srx ner soft alt 時間

案例描述
  找出每個月中氣溫最高的兩天(同一天有多筆記錄時,只取該天溫度最高的一筆)

數據集

1949-10-01 14:21:02    34c
1949-10-01 19:21:02    38c
1949-10-02 14:01:02    36c
1950-01-01 11:21:02    32c
1950-10-01 12:21:02    37c
1951-12-01 12:21:02    23c
1950-10-02 12:21:02    41c
1950-10-03 12:21:02    27c
1951-07-01 12:21:02    45c
1951-07-02 12:21:02    46c
1951-07-03 12:21:03    47c

代碼

  MyTQ.class

package com.hadoop.mr.tq;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 客戶端 * @author Lindsey * */ public class MyTQ { public static void main(String args []) throws Exception{ //加載配置文件 Configuration conf = new Configuration(true); //創建客戶端 Job job = Job.getInstance(conf); job.setJarByClass(MyTQ.
class); //Map配置 job.setMapperClass(TMapper.class); job.setMapOutputKeyClass(Tq.class); job.setMapOutputValueClass(IntWritable.class); //分區類:處理大數據量均衡並發處理 job.setPartitionerClass(TPartitioner.class); //比較類:用buffer字節數組內的key排序 job.setSortComparatorClass(TSortComparator.class); //Reduce配置 job.setNumReduceTasks(2); job.setReducerClass(TReducer.class); //分組比較類:年月相同為一組 job.setGroupingComparatorClass(TGroupingComparator.class); //輸入輸出源 Path input = new Path("/user/hadoop/input/weather.txt"); FileInputFormat.addInputPath(job, input); Path output = new Path("/user/hadoop/output/weather"); if(output.getFileSystem(conf).exists(output)){ output.getFileSystem(conf).delete(output,true); } FileOutputFormat.setOutputPath(job, output); //提交 job.waitForCompletion(true); } }

  TMapper.class

package com.hadoop.mr.tq;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

public class TMapper extends Mapper<LongWritable, Text, Tq,IntWritable>{

    /*
     *     k-v 映射
     *          K(Tq)        V(IntWritable)    
     *  1949-10-01 14:21:02    34c
     * 
     */
    
    Tq mkey = new Tq();
    IntWritable mval =new IntWritable();
    
    @Override
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {
        
        try {
            //字符串分割
            String [] strs = StringUtils.split(value.toString(),‘\t‘);
            //設置時間格式  註意月份是大寫!
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            //解析為Date格式
            Date date =  sdf.parse(strs[0]);
            //日歷上設置時間
            Calendar cal = Calendar.getInstance();
            cal.setTime(date);
            
            //Key
            mkey.setYear(cal.get(Calendar.YEAR));
            mkey.setMonth(cal.get(Calendar.MONTH)+1);
            mkey.setDay(cal.get(Calendar.DAY_OF_MONTH));
            int temperture = Integer.parseInt(strs[1].substring(0,strs[1].length()-1));
            mkey.setTemperature(temperture);
            
            //value
            mval.set(temperture);
            
            //輸出
            context.write(mkey, mval);
            
        } catch (ParseException e) {
            e.printStackTrace();
        }
        
    }
    
    
}

  Tq.class

package com.hadoop.mr.tq;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

/**
 * Composite map-output key: a calendar date (year, month, day) together with
 * the temperature recorded on that date.
 *
 * <p>Serialization writes and reads the four {@code int} fields in the order
 * year, month, day, temperature.
 */
public class Tq implements WritableComparable<Tq>{

    private int year;
    private int month;
    private int day;
    private int temperature;

    /** @return the year component */
    public int getYear() {
        return this.year;
    }

    /** @param year the year component */
    public void setYear(int year) {
        this.year = year;
    }

    /** @return the month component */
    public int getMonth() {
        return this.month;
    }

    /** @param month the month component */
    public void setMonth(int month) {
        this.month = month;
    }

    /** @return the day-of-month component */
    public int getDay() {
        return this.day;
    }

    /** @param day the day-of-month component */
    public void setDay(int day) {
        this.day = day;
    }

    /** @return the temperature value */
    public int getTemperature() {
        return this.temperature;
    }

    /** @param temperature the temperature value */
    public void setTemperature(int temperature) {
        this.temperature = temperature;
    }

    /**
     * Restores all four fields from {@code in}, in the same order that
     * {@link #write(DataOutput)} emits them.
     *
     * @param in source to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        year = in.readInt();
        month = in.readInt();
        day = in.readInt();
        temperature = in.readInt();
    }

    /**
     * Writes year, month, day and temperature to {@code out} as four ints.
     *
     * @param out sink to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.year);
        out.writeInt(this.month);
        out.writeInt(this.day);
        out.writeInt(this.temperature);
    }

    /**
     * Natural ordering: ascending by date (year, then month, then day).
     * The temperature does not take part in this comparison.
     *
     * @param that the key to compare against
     * @return a negative value, zero, or a positive value as this date is
     *         earlier than, equal to, or later than {@code that}
     */
    @Override
    public int compareTo(Tq that) {
        int byYear = Integer.compare(this.year, that.getYear());
        if (byYear != 0) {
            return byYear;
        }
        // Same year: decide on the month.
        int byMonth = Integer.compare(this.month, that.getMonth());
        if (byMonth != 0) {
            return byMonth;
        }
        // Same year and month: decide on the day.
        return Integer.compare(this.day, that.getDay());
    }
}

  TPartitioner.class

package com.hadoop.mr.tq;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * 分區規則設計 使數據分區均衡避免傾斜
 * @author Lindsey
 *
 */
public class TPartitioner extends Partitioner<Tq,IntWritable>{

    /**
     * Chooses the partition for a record from the year of its key.
     *
     * <p>All records of one year go to the same partition, so every
     * year-month group is seen in full by a single reduce task.
     *
     * @param key           map output key; only its year is used
     * @param value         map output value (unused)
     * @param numPartitions total number of partitions
     * @return a partition index in the range {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(Tq key, IntWritable value, int numPartitions) {
        // floorMod never returns a negative result for a positive divisor, unlike
        // the % operator, which would yield an invalid negative index for a
        // negative year. For non-negative years both give the same value.
        return Math.floorMod(key.getYear(), numPartitions);
    }

}

  TSortComparator.class

package com.hadoop.mr.tq;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Sort comparator for {@code Tq} keys: date ascending by year and month,
 * then temperature descending within the same month.
 */
public class TSortComparator extends WritableComparator{

    /**
     * Registers {@code Tq} as the key class. Passing {@code true} asks the
     * base class to create key instances, so the serialized keys can be
     * deserialized into objects before they are compared.
     */
    public TSortComparator(){
        super(Tq.class,true);
    }

    /**
     * Compares two keys by year, then month (both ascending), then by
     * temperature in descending order. The day is not considered.
     *
     * @param a first key, expected to be a {@code Tq}
     * @param b second key, expected to be a {@code Tq}
     * @return negative, zero or positive according to the ordering above
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Tq left = (Tq) a;
        Tq right = (Tq) b;

        int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear;
        }

        int byMonth = Integer.compare(left.getMonth(), right.getMonth());
        if (byMonth != 0) {
            return byMonth;
        }

        // Operands swapped on purpose: the higher temperature sorts first.
        return Integer.compare(right.getTemperature(), left.getTemperature());
    }
}

  TReducer.class

package com.hadoop.mr.tq;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.shaded.org.glassfish.grizzly.compression.lzma.impl.lz.InWindow;

/**
 * Emits up to two records per reduce group: the first record of the group and
 * the first later record whose day differs from it.
 *
 * <p>With the job's sort and grouping comparators (year and month ascending,
 * temperature descending, grouped by year and month) this yields the two
 * hottest distinct days of each month.
 */
public class TReducer extends Reducer<Tq, IntWritable, Text,IntWritable>{

    // Output key and value objects, reused for every emitted record.
    Text rkey = new Text();
    IntWritable rval = new IntWritable();

    /**
     * Walks the values of one group and writes at most two output records in
     * the form {@code year-month-day -> temperature}.
     *
     * <p>The fields are read from {@code key} inside the loop; the comparison
     * against the first day relies on {@code key} reflecting the record
     * currently being iterated.
     *
     * @param key     key of the group, read anew on every iteration
     * @param values  values of the group; used only to advance the iteration
     * @param context task context that receives the output records
     * @throws IOException          if writing an output record fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Tq key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        boolean firstEmitted = false;   // whether the group's first record has been written
        int firstDay = 0;               // day of that first record

        for (IntWritable ignored : values) {
            if (!firstEmitted) {
                firstDay = key.getDay();
                firstEmitted = true;
                emit(key, context);
                continue;
            }

            // Skip further readings of the first day; stop at the next distinct day.
            if (key.getDay() != firstDay) {
                emit(key, context);
                break;
            }
        }
    }

    /** Writes the current state of {@code key} as a "year-month-day" text and its temperature. */
    private void emit(Tq key, Context context) throws IOException, InterruptedException {
        rkey.set(key.getYear()+"-"+key.getMonth()+"-"+key.getDay());
        rval.set(key.getTemperature());
        context.write(rkey, rval);
    }
}

  TGroupingComparator.class

package com.hadoop.mr.tq;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator used on the reduce side: two {@code Tq} keys belong to
 * the same group when their year and month are equal.
 */
public class TGroupingComparator extends WritableComparator{

    /** Registers {@code Tq} as the key class and lets the base class create key instances. */
    public TGroupingComparator() {
        super(Tq.class,true);
    }

    /**
     * Compares two keys by year and then by month only.
     *
     * @param a first key, expected to be a {@code Tq}
     * @param b second key, expected to be a {@code Tq}
     * @return zero when both keys share year and month (same group); otherwise
     *         the result of comparing the years or, for equal years, the months
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Tq left = (Tq) a;
        Tq right = (Tq) b;

        int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear;
        }
        return Integer.compare(left.getMonth(), right.getMonth());
    }
}

運行結果

  part-r-00000

  技術分享圖片

  part-r-00001

  技術分享圖片

【尚學堂·Hadoop學習】MapReduce案例1--天氣