Big Data Primer (10): the serialization mechanism, and an MR traffic-sum job
public class FlowBean implements WritableComparable<FlowBean>{
    // Subscriber phone number this record belongs to.
    private String phoneNB;
    // Upstream traffic.
    private long up_flow;
    // Downstream traffic.
    private long d_flow;
    // Total traffic (up_flow + d_flow), pre-computed and serialized with the bean.
    private long s_flow;

    // During deserialization the framework creates the bean via reflection,
    // which requires a public no-arg constructor.
    public FlowBean() {
    }

    /**
     * Convenience constructor; the total flow is derived from the two parts.
     *
     * @param phoneNB subscriber phone number
     * @param up_flow upstream traffic
     * @param d_flow  downstream traffic
     */
    public FlowBean(String phoneNB, long up_flow, long d_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.d_flow = d_flow;
        this.s_flow = up_flow + d_flow;
    }

    // Accessors (used by the mapper/reducer; the original listing elided them).
    public String getPhoneNB() { return phoneNB; }
    public void setPhoneNB(String phoneNB) { this.phoneNB = phoneNB; }
    public long getUp_flow() { return up_flow; }
    public void setUp_flow(long up_flow) { this.up_flow = up_flow; }
    public long getD_flow() { return d_flow; }
    public void setD_flow(long d_flow) { this.d_flow = d_flow; }
    public long getS_flow() { return s_flow; }
    public void setS_flow(long s_flow) { this.s_flow = s_flow; }

    @Override
    public String toString() {
        return "" + up_flow + "\t" + d_flow + "\t" + s_flow;
    }

    // Deserialize the object's fields from the stream.
    // The read order MUST match the write order used in write().
    @Override
    public void readFields(DataInput in) throws IOException {
        this.phoneNB = in.readUTF();
        this.up_flow = in.readLong();
        this.d_flow = in.readLong();
        this.s_flow = in.readLong();
    }

    // Serialize the object's fields to the stream.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(d_flow);
        out.writeLong(s_flow);
    }

    // Sort by total flow, descending. The original ternary (s_flow > o ? -1 : 1)
    // never returned 0 for equal totals, violating the compareTo contract;
    // Long.compare fixes that and avoids overflow-prone subtraction.
    @Override
    public int compareTo(FlowBean o) {
        return Long.compare(o.getS_flow(), this.s_flow);
    }
}
/**********************************************************************************************************************/
/**
 * FlowBean is a custom data type. To be transferred between Hadoop nodes it
 * must follow Hadoop's serialization mechanism, i.e. implement the
 * corresponding Hadoop serialization interface.
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
protected void map(LongWritable key, Text value, Context context) throws IOException ,InterruptedException {
//一行資料
String line = value.toString();
//切分成各個欄位
//String[] fields = StringUtils.split(line,"\t");
String[] fields = line.split("\t");
//需要的欄位
String phoneNB = fields[1];
long u_flow = Long.parseLong(fields[7]);
long d_flow = Long.parseLong(fields[8]);
//封裝資料為kv並輸出
context.write(new Text(phoneNB), new FlowBean(phoneNB,u_flow,d_flow));
};
}
/*********************************************************************************************************/
// The framework calls our reduce method once per key group it passes in,
// e.g. <1387788654, {flowbean, flowbean, flowbean, ...}>.
// The reduce logic: iterate over values, accumulate the sums, then emit.
public class FlowSumReduce extends Reducer<Text, FlowBean, Text, FlowBean>{
protected void reduce(Text key, Iterable<FlowBean> values,Context context) throws IOException ,InterruptedException {
long up_flow_counter = 0;
long d_flow_counter = 0;
for (FlowBean bean : values){
up_flow_counter+=bean.getUp_flow();
d_flow_counter+=bean.getD_flow();
}
context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
};
}
/**************************************************************************************************/
/**
 * Job driver: wires the mapper and reducer together and submits the job.
 */
public class FlowSumRunner extends Configured implements Tool{
    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration injected by ToolRunner so generic options
        // (-D key=value, -files, ...) take effect. The original created a
        // fresh Configuration here, silently discarding those overrides.
        Configuration conf = getConf();
        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowSumRunner.class);
        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReduce.class);
        // Map output types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // Final (reduce) output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // args[0] = input path, args[1] = output path (must not already exist).
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
        System.exit(res);
    }
}