hadoop用java API實現mapreduce示例
阿新 • 發佈:2018-12-22
自定義資料型別bean
package org.hadoop.total; import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /* * FlowBean要在節點傳輸,要符合hadoop的序列號機制,實現Writable介面 * */ public class FlowBean implements Writable { // 上行流量 private long down_flow; // 下行流量 private long up_flow; // 總 private long total; // 電話號碼 private String phone; public FlowBean() { //為了反射,沒有建構函式的時候不用寫,如果有別的建構函式這裡必須寫 } public FlowBean(String phone, long up_flow, long down_flow) { this.phone = phone; this.up_flow = up_flow; this.down_flow = down_flow; this.total = up_flow + down_flow; } public long getDown_flow() { return down_flow; } public long getUp_flow() { return up_flow; } public long getTotal() { return total; } //資料寫入輸出流 public void write(DataOutput out) throws IOException { // 寫進位元組陣列 out.writeUTF(phone); out.writeLong(up_flow); out.writeLong(down_flow); out.writeLong(total); } //讀取要傳遞的資料,讀取的順序要和寫的一致,先進先出 public void readFields(DataInput in) throws IOException { // 讀出位元組陣列 phone = in.readUTF(); up_flow = in.readLong(); down_flow = in.readLong(); total = in.readLong(); } @Override public String toString() { return ""+up_flow+"-"+down_flow+"-"+total; } }
map程式
package org.hadoop.total; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /* * 輸入key:LongWritable,起始偏移量 * 輸入value:Text,每行的文字 * 輸出key:Text,電話號碼 * 輸出value:bean * * */ public class FlowSunMapper extends Mapper<LongWritable,Text,Text,FlowBean> { // map方法會自動傳人一行資料value // key是輸入的key // context是輸出用的封裝類 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String string = value.toString(); // 獲得一行資料 String[] arr = StringUtils.split(string, "\t"); String phonenum = arr[1]; long u_flow = Long.parseLong(arr[7]); long d_flow = Long.parseLong(arr[8]); //map的資料要用context封裝 context.write(new Text(phonenum), new FlowBean(phonenum,u_flow,d_flow)); } }
reduce方法
package org.hadoop.total; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class FlowSunReducer extends Reducer<Text,FlowBean,Text,FlowBean> { // key是鍵值 // values 是{flowbean,flowbean,flowbean,flowbean,flowbean} @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException,InterruptedException { long down_flow_counte = 0; long up_flow_counte = 0; for (FlowBean bean:values) { up_flow_counte = up_flow_counte+bean.getUp_flow(); down_flow_counte= down_flow_counte+bean.getDown_flow(); } context.write(key, new FlowBean(key.toString(),up_flow_counte,down_flow_counte)); } }
主方法
package org.hadoop.total;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.util.Arrays;
/**
 * Driver for the flow-summing MapReduce job. Run via {@link ToolRunner} so
 * standard Hadoop command-line options (-D, -conf, -fs, ...) are honored.
 */
public class FlowSunRuner extends Configured implements Tool {

    /**
     * Configures and submits the job, blocking until it finishes.
     *
     * @param args command-line arguments (currently unused; I/O paths are hard-coded)
     * @return 0 if the job succeeded, 1 otherwise
     * @throws Exception if job setup or submission fails
     */
    public int run(String[] args) throws Exception {
        // Use the Configuration injected by ToolRunner instead of creating a
        // fresh one; otherwise generic options passed on the command line
        // (-D, -conf, -fs) are silently discarded.
        Configuration conf = getConf();
        conf.set("fs.defaultFS", "hdfs://hadoop101:9000/");

        // Create the job and point it at the jar containing this driver.
        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowSunRuner.class);

        // Mapper and reducer implementations.
        job.setMapperClass(FlowSunMapper.class);
        job.setReducerClass(FlowSunReducer.class);

        // Map output types (shuffle key/value).
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // Final (reduce) output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Arrays.toString prints the actual argument values; printing the
        // array directly only shows its object reference.
        System.out.println(Arrays.toString(args));

        // Input and output locations.
        // NOTE(review): hard-coded HDFS paths — consider taking them from args.
        FileInputFormat.setInputPaths(job,
                new Path("hdfs://192.168.117.101:9000/HTTP_20130313143750.dat"));
        FileOutputFormat.setOutputPath(job,
                new Path("hdfs://192.168.117.101:9000/out/test/"));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new FlowSunRuner(), args);
        // Propagate the job result as the process exit code; the original
        // discarded it, so a failed job still exited with status 0.
        System.exit(exitCode);
    }
}