
Implementing a MapReduce example with the Hadoop Java API

The custom data type bean

package org.hadoop.total;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/*
 * FlowBean is transferred between nodes, so it must conform to Hadoop's
 * serialization mechanism by implementing the Writable interface.
 * */
public class FlowBean implements Writable {
    //    downstream traffic
    private long down_flow;
    //    upstream traffic
    private long up_flow;
    //    total traffic
    private long total;
    //    phone number
    private String phone;

    public FlowBean() {
        // Needed for reflection: Hadoop instantiates the bean through the
        // no-arg constructor. It could be omitted if no other constructor
        // existed, but since we define one below it must be written explicitly.
    }


    public FlowBean(String phone, long up_flow, long down_flow) {
        this.phone = phone;
        this.up_flow = up_flow;
        this.down_flow = down_flow;
        this.total = up_flow + down_flow;
    }


    public long getDown_flow() {
        return down_flow;
    }

    public long getUp_flow() {
        return up_flow;
    }

    public long getTotal() {
        return total;
    }

    // Serialize the fields to the output stream
    public void write(DataOutput out) throws IOException {
        // write into the byte stream
        out.writeUTF(phone);
        out.writeLong(up_flow);
        out.writeLong(down_flow);
        out.writeLong(total);
    }

    // Deserialize the transferred data; the fields must be read in the same
    // order they were written (first in, first out)
    public void readFields(DataInput in) throws IOException {
        // read from the byte stream
        phone = in.readUTF();
        up_flow = in.readLong();
        down_flow = in.readLong();
        total = in.readLong();
    }

    @Override
    public String toString() {
        return ""+up_flow+"-"+down_flow+"-"+total;
    }

}
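Because readFields must consume the fields in exactly the order write produced them, a quick local round trip is a cheap way to catch ordering mistakes before running on the cluster. Below is a minimal sketch; the class name SerializationCheck and the sample values are made up for illustration:

package org.hadoop.total;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class SerializationCheck {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean("13726230503", 2481, 24681);

        // serialize the bean the same way Hadoop does during the shuffle
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // deserialize into a fresh instance created via the no-arg constructor
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray())));

        // both lines should print the same up-down-total string
        System.out.println(original);
        System.out.println(copy);
    }
}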

The map program

package org.hadoop.total;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
* input key: LongWritable, the starting byte offset of the line
* input value: Text, the text of the line
* output key: Text, the phone number
* output value: the FlowBean
*
* */
public class FlowSunMapper extends Mapper<LongWritable,Text,Text,FlowBean> {
//    the map method is called once per line; value holds that line
//    key is the input key (the line's starting offset)
//    context is the wrapper class used to emit output
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String string = value.toString();  // get one line of data
        String[] arr = StringUtils.split(string, "\t");
        String phonenum = arr[1];
        long u_flow = Long.parseLong(arr[7]);
        long d_flow = Long.parseLong(arr[8]);
        // map output must be wrapped and emitted through context
        context.write(new Text(phonenum), new FlowBean(phonenum, u_flow, d_flow));

    }
}
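The hard-coded indices arr[1], arr[7], and arr[8] assume a tab-separated log line with the phone number in the second column and the upstream/downstream byte counts in the eighth and ninth. A hypothetical record from HTTP_20130313143750.dat, annotated with the indices the mapper uses (the concrete values are invented for illustration):

// Hypothetical input line (tab-separated; values invented for illustration):
// 1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com	24	27	2481	24681	200
//
// arr[1] -> "13726230503"  (phone number)
// arr[7] -> 2481           (upstream bytes)
// arr[8] -> 24681          (downstream bytes)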

The reduce program

package org.hadoop.total;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


public class FlowSunReducer extends Reducer<Text,FlowBean,Text,FlowBean> {

    // key is the phone number emitted by the mapper
    // values is the group of beans for that key: {flowbean, flowbean, flowbean, ...}
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException
    {
        long down_flow_count = 0;
        long up_flow_count = 0;

        for (FlowBean bean : values)
        {
            up_flow_count = up_flow_count + bean.getUp_flow();
            down_flow_count = down_flow_count + bean.getDown_flow();
        }
        context.write(key, new FlowBean(key.toString(), up_flow_count, down_flow_count));
    }
    }
}
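One Hadoop-specific pitfall is worth flagging here: the framework reuses a single FlowBean instance while iterating values, overwriting its fields on each step, so summing primitives as above is safe, but storing the beans themselves is not. A sketch of the trap and the fix (this fragment would live inside reduce and additionally needs java.util.List and java.util.ArrayList imports):

// WRONG: every list entry references the same reused object, so after the
// loop all entries show the field values of the last bean.
List<FlowBean> broken = new ArrayList<FlowBean>();
for (FlowBean bean : values) {
    broken.add(bean);
}

// SAFE: copy the fields into a fresh bean before storing it.
List<FlowBean> copies = new ArrayList<FlowBean>();
for (FlowBean bean : values) {
    copies.add(new FlowBean(key.toString(), bean.getUp_flow(), bean.getDown_flow()));
}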

The driver program

package org.hadoop.total;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.util.Arrays;

public class FlowSunRuner extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        // use the Configuration ToolRunner passed in rather than building a new one
        Configuration conf = getConf();
        conf.set("fs.defaultFS", "hdfs://hadoop101:9000/");
        // create a job
        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowSunRuner.class);
        // set the mapper
        job.setMapperClass(FlowSunMapper.class);
        // set the reducer
        job.setReducerClass(FlowSunReducer.class);
        // set the map output types
        job.setMapOutputValueClass(FlowBean.class);
        job.setMapOutputKeyClass(Text.class);
        // set the final output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        System.out.println(Arrays.toString(args));
        // set the input path
        FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.117.101:9000/HTTP_20130313143750.dat"));
        // set the output path
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.117.101:9000/out/test/"));
        
        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception
    {
        // propagate the job's success/failure as the process exit code
        int exitCode = ToolRunner.run(new Configuration(), new FlowSunRuner(), args);
        System.exit(exitCode);
    }
}
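After packaging the classes into a jar, the job can be submitted with `hadoop jar <jarfile> org.hadoop.total.FlowSunRuner` (the jar name is whatever your build produces; adjust the HDFS paths above to your cluster). The default TextOutputFormat writes each record as the key, a tab, and the value's toString(), so with the FlowBean above a result line in /out/test/part-r-00000 would look like this (numbers invented for illustration):

// 13726230503	2481-24681-27162
// i.e. <phone>\t<up_flow>-<down_flow>-<total>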