
hadoop[8] - Processing Data with a Custom Class

If the data structure being processed is fairly complex, it is best to define a custom class to carry the data between the mapper and the reducer. A few points to note when writing such a class (a bare skeleton is sketched right after this list):

  1. Implement the org.apache.hadoop.io.Writable interface
  2. Provide a no-argument constructor
  3. Implement the interface's write and readFields methods
  4. Override the toString method (used when the bean is written as text output)
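
Taken together, these four points give a class skeleton like the following sketch (MyBean is a hypothetical name used purely for illustration; the complete FlowBean example for this post follows below):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Illustrative skeleton only - the real example in this post is FlowBean below
public class MyBean implements Writable {          // 1. implement Writable
    private long field;

    public MyBean() {}                             // 2. no-arg constructor, needed for reflection

    @Override
    public void write(DataOutput out) throws IOException {    // 3. serialize the fields
        out.writeLong(field);
    }

    @Override
    public void readFields(DataInput in) throws IOException { // 3. deserialize, in the same order
        field = in.readLong();
    }

    @Override
    public String toString() {                     // 4. used when the bean is written as text output
        return String.valueOf(field);
    }
}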

The technique is illustrated here with a traffic-statistics example.

The following dependencies need to be added to pom.xml:

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.4.1</version>
    </dependency>
</dependencies>

FlowBean:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {
    private long upflow;
    private long dflow;
    private long sumflow;

    // A no-arg constructor is required so the framework can instantiate the bean via reflection
    public FlowBean() {}

    public FlowBean(long upflow, long dflow) {
        this.upflow = upflow;
        this.dflow = dflow;
        this.sumflow = upflow + dflow;
    }

    public long getUpflow() {
        return upflow;
    }

    public void setUpflow(long upflow) {
        this.upflow = upflow;
    }

    public long getDflow() {
        return dflow;
    }

    public void setDflow(long dflow) {
        this.dflow = dflow;
    }

    public long getSumflow() {
        return sumflow;
    }

    public void setSumflow(long sumflow) {
        this.sumflow = sumflow;
    }

    // Serialization: write the fields to be transferred into the byte stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upflow);
        out.writeLong(dflow);
    }

    // Deserialization: restore the fields from the byte stream, in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        upflow = in.readLong();
        dflow = in.readLong();
    }

    @Override
    public String toString() {
        return upflow + "\t" + dflow + "\t" + sumflow;
    }
}
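
As a quick sanity check of the serialization methods, the following standalone sketch (my addition, assuming FlowBean is on the classpath) round-trips a bean through write and readFields the way Hadoop does during the shuffle. Note that sumflow is not serialized, which is fine here because the reducer recomputes it through the two-argument constructor:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean(1024, 4096);

        // Serialize the bean into a byte buffer, as Hadoop does when shuffling map output
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize into a fresh instance created through the no-arg constructor
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        // Prints upflow, dflow and sumflow separated by tabs: 1024, 4096 and 0,
        // because write() does not serialize sumflow
        System.out.println(copy);
    }
}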

FlowCountMapper:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.commons.lang.StringUtils;

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context)
            throws IOException, InterruptedException {
        // Each input value is one line of the flow log, with fields separated by tabs
        String line = value.toString();
        String[] fields = StringUtils.split(line, "\t");

        // Field 1 is the phone number; the third- and second-to-last fields are the upstream and downstream traffic
        String phone = fields[1];
        long upflow = Long.parseLong(fields[fields.length - 3]);
        long dflow = Long.parseLong(fields[fields.length - 2]);
        
        FlowBean bean = new FlowBean(upflow, dflow);
        context.write(new Text(phone), bean);
    }
}
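
The field indices above assume a tab-separated flow log in which the phone number is the second field and the upstream/downstream byte counts are the third- and second-to-last fields. The sample record in the sketch below is made up for illustration (the real input may carry different surrounding fields, but the same relative positions):

public class FieldIndexCheck {
    public static void main(String[] args) {
        // Made-up sample record: timestamp, phone, MAC, IP, host, two packet counters, upflow, dflow, status
        String line = "1363157985066\t13726230503\t00-FD-07-A4-72-B8\t120.196.100.82\t"
                + "i02.c.aliimg.com\t24\t27\t2481\t24681\t200";
        String[] fields = line.split("\t");

        String phone = fields[1];                                // 13726230503
        long upflow = Long.parseLong(fields[fields.length - 3]); // 2481
        long dflow = Long.parseLong(fields[fields.length - 2]);  // 24681

        System.out.println(phone + "\tup=" + upflow + "\tdown=" + dflow);
    }
}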

FlowCountReducer:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class FlowCountReducer extends Reducer<Text,FlowBean,Text,FlowBean>{
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        long upflowsum = 0;
        long dflowsum = 0;

        // Accumulate the upstream and downstream traffic of every record for this phone number
        for (FlowBean value : values) {
            upflowsum += value.getUpflow();
            dflowsum += value.getDflow();
        }

        // The two-argument constructor also fills in sumflow for the final output
        FlowBean bean = new FlowBean(upflowsum, dflowsum);
        context.write(key, bean);
    }
}

FlowCountSubmitter:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCountSubmitter {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(FlowCountSubmitter.class);

        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // Types of the intermediate (map output) key/value pairs
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Types of the final (reduce output) key/value pairs
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
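
One small optional tweak (not in the original code): waitForCompletion returns a boolean indicating whether the job succeeded, so the submitter can propagate it as the process exit code:

        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);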

After that, package the project into a jar and upload it to server 00. On server 00, create the source data directory on HDFS and upload the data file into the srcdata path. Note that the output directory should not be created in advance: MapReduce refuses to start a job whose output path already exists, so only the source directory needs to be prepared.

hadoop fs -mkdir -p /flow/srcdata

Run: hadoop jar hadoop-mapreduce-customer-1.0.jar com.wange.FlowCountSubmitter /flow/srcdata /flow/output. The arguments are, in order: the fully qualified class containing the main method, the directory holding the files to analyze, and the directory where the analysis results will be written. Once the job finishes, the results can be viewed there.
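
Each result line is the phone number followed by the upstream, downstream and total traffic separated by tabs (the key plus FlowBean.toString()). Assuming the default output file naming, the results can be listed and viewed with:

hadoop fs -ls /flow/output
hadoop fs -cat /flow/output/part-r-00000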

 

The YARN web UI can be viewed at http://hadoop-server-00:8088/cluster, and the HDFS directory browser at http://hadoop-server-00:50070.