7. Big Data Learning Journey: Hadoop MapReduce
阿新 · Published: 2019-01-12
Serialization/Deserialization Mechanism
After you define a custom class, if its objects need to be transferred within Hadoop, the class
must implement the Writable interface so that the objects can be serialized and deserialized.
Case: compute the total data traffic generated by each person.
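The mapper below assumes that each input line holds four space-separated fields: phone, city, name, and traffic. A hypothetical flow.txt could therefore look like this (the concrete values are made up for illustration):
13811112222 bj zhangsan 100
13911113333 sh lisi 200
13511114444 bj wangwu 300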
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class Flow implements Writable{
private String phone;
private String city;
private String name;
private int flow;
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
public String getCity() {
return city;
}
public void setCity(String city) {
this.city = city;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getFlow() {
return flow;
}
public void setFlow(int flow) {
this.flow = flow;
}
// Deserialization
@Override
public void readFields(DataInput in) throws IOException {
// Read the fields back one by one, in the same order they were serialized
this.phone = in.readUTF();
this.city = in.readUTF();
this.name = in.readUTF();
this.flow = in.readInt();
}
// Serialization
@Override
public void write(DataOutput out) throws IOException {
// Write the fields out one by one, in a fixed order
out.writeUTF(phone);
out.writeUTF(city);
out.writeUTF(name);
out.writeInt(flow);
}
}
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class FlowMapper extends Mapper<LongWritable, Text, Text, Flow> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] arr = line.split(" ");
Flow f = new Flow();
f.setPhone(arr[0]);
f.setCity(arr[1]);
f.setName(arr[2]);
f.setFlow(Integer.parseInt(arr[3]));
context.write(new Text(f.getPhone()), f);
}
}
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class FlowReducer extends Reducer<Text, Flow, Text, IntWritable> {
public void reduce(Text key, Iterable<Flow> values, Context context) throws IOException, InterruptedException {
int sum = 0;
String name = null;
for (Flow val : values) {
name = val.getName();
sum += val.getFlow();
}
context.write(new Text(key.toString() + " " + name), new IntWritable(sum));
}
}
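Note: Hadoop reuses a single Flow instance for all elements of the values iterable, refilling it through readFields on each iteration, so a reducer must not hold on to references to the value objects. The code above is safe because it only copies out the name and accumulates the flow.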
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FlowDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "JobName");
job.setJarByClass(cn.tedu.flow.FlowDriver.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Flow.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.60.132:9000/mr/flow.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.60.132:9000/flowresult"));
if (!job.waitForCompletion(true))
return;
}
}
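To run the driver, the classes are typically exported as a jar and submitted to the cluster, along the lines of hadoop jar flow.jar cn.tedu.flow.FlowDriver (the jar name here is made up).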
Exercise: compute the total score of each student (data file: score.txt).
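The ScoreMapper below expects three space-separated fields per line: month, name, and score. A hypothetical score file could look like this (values invented for illustration):
1 zhangsan 90
2 zhangsan 85
1 lisi 78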
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class Student implements Writable {
private int month;
private String name;
private int score;
public int getMonth() {
return month;
}
public void setMonth(int month) {
this.month = month;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getScore() {
return score;
}
public void setScore(int score) {
this.score = score;
}
@Override
public void readFields(DataInput in) throws IOException {
this.month = in.readInt();
this.name = in.readUTF();
this.score = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(month);
out.writeUTF(name);
out.writeInt(score);
}
}
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class ScoreMapper extends Mapper<LongWritable, Text, Text, Student> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] arr = line.split(" ");
Student s = new Student();
s.setMonth(Integer.parseInt(arr[0]));
s.setName(arr[1]);
s.setScore(Integer.parseInt(arr[2]));
context.write(new Text(s.getName()), s);
}
}
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class ScoreReducer extends Reducer<Text, Student, Text, IntWritable> {
public void reduce(Text key, Iterable<Student> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (Student val : values) {
sum += val.getScore();
}
context.write(key, new IntWritable(sum));
}
}
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ScorePartitioner extends Partitioner<Text, Student> {
@Override
public int getPartition(Text key, Student value, int numPartitions) {
// Months are numbered from 1 and partitions from 0, so month n maps to partition n - 1.
// This assumes month values 1 through 4, matching setNumReduceTasks(4) in the driver.
int month = value.getMonth();
return month - 1;
}
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ScoreDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "JobName");
job.setJarByClass(cn.tedu.score.ScoreDriver.class);
job.setMapperClass(ScoreMapper.class);
job.setReducerClass(ScoreReducer.class);
job.setPartitionerClass(ScorePartitioner.class);
job.setNumReduceTasks(4);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Student.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.60.132:9000/mr/score1"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.60.132:9000/scoreresult"));
if (!job.waitForCompletion(true))
return;
}
}
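Since the job runs four reduce tasks, the output directory will contain four files, part-r-00000 through part-r-00003; with the partitioner above, each file holds the per-student totals for one month.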
Partitioning - Partitioner
Partitioning is an important step of the shuffle phase. It distributes the map output among the
reducers according to a rule, so that the job produces one output per partition.
Partitioner is the base class for all partitioners; a custom partitioner must extend this class.
HashPartitioner is the default partitioner in MapReduce. It computes the target reducer as
(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks.
Note: by default, the number of reduce tasks is 1.
The built-in partitioning rule often does not meet our needs, so to achieve a specific
distribution we may need to define our own partitioning rule.
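For reference, the logic of the default HashPartitioner boils down to the following sketch (a simplified illustration of the formula above rather than the actual library source; the class name HashLikePartitioner is invented here):
import org.apache.hadoop.mapreduce.Partitioner;
public class HashLikePartitioner<K, V> extends Partitioner<K, V> {
@Override
public int getPartition(K key, V value, int numReduceTasks) {
// Mask off the sign bit so the result is non-negative,
// then take the hash modulo the number of reduce tasks.
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}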
Case: split the data by city, and compute the traffic generated by each person within each city.
The Flow class and the FlowMapper are identical to the ones in the total-traffic case above, so they are reused here unchanged. The FlowPartitioner below routes records by city; the concrete city names in the sketch are placeholders for whatever values appear in the data.
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class FlowPartitioner extends Partitioner<Text, Flow> {
@Override
public int getPartition(Text key, Flow value, int numPartitions) {
// Route each record to a partition based on its city, so every reducer
// handles exactly one city. The city names below are only placeholders;
// substitute the values that actually appear in your data.
String city = value.getCity();
if ("bj".equals(city)) {
return 0;
} else if ("sh".equals(city)) {
return 1;
}
return 2;
}
}
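As in ScoreDriver above, this partitioner only takes effect if the driver registers it with job.setPartitionerClass(FlowPartitioner.class) and sets a matching number of reduce tasks, e.g. job.setNumReduceTasks(3) for the three partitions sketched here; otherwise the default HashPartitioner is used.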