
7. Big Data Learning Journey: hadoop-MapReduce

Serialization/Deserialization Mechanism

When you define a custom class and want its objects to be transferred inside a Hadoop job (for example from map to reduce), the class has to implement the Writable interface so that Hadoop can serialize and deserialize it.

Example: compute the total traffic generated by each person.
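The FlowMapper below splits each input line on a single space and reads the fields in the order phone, city, name, flow, so a line of flow.txt is assumed to look like this (values made up for illustration):

13712345678 bj zhangsan 200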

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class Flow implements Writable{

	private String phone;
	private String city;
	private String name;
	private int flow;

	public String getPhone() {
		return phone;
	}

	public void setPhone(String phone) {
		this.phone = phone;
	}

	public String getCity() {
		return city;
	}

	public void setCity(String city) {
		this.city = city;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public int getFlow() {
		return flow;
	}

	public void setFlow(int flow) {
		this.flow = flow;
	}

	// Deserialization
	@Override
	public void readFields(DataInput in) throws IOException {
		// Read the fields back in exactly the order they were written
		this.phone = in.readUTF();
		this.city = in.readUTF();
		this.name = in.readUTF();
		this.flow = in.readInt();
	}

	// Serialization
	@Override
	public void write(DataOutput out) throws IOException {
		// Write the fields out one by one, in a fixed order
		out.writeUTF(phone);
		out.writeUTF(city);
		out.writeUTF(name);
		out.writeInt(flow);
	}

}
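A quick way to check that write and readFields stay in sync is to round-trip a Flow object through an in-memory stream; below is a minimal sketch using plain java.io (the test class name and sample values are made up):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class FlowRoundTripTest {

	public static void main(String[] args) throws Exception {
		Flow f = new Flow();
		f.setPhone("13712345678");
		f.setCity("bj");
		f.setName("zhangsan");
		f.setFlow(200);

		// serialize into a byte array
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		f.write(new DataOutputStream(bos));

		// deserialize from the same bytes into a fresh object
		Flow copy = new Flow();
		copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

		// the copy should print the same field values as the original
		System.out.println(copy.getPhone() + " " + copy.getCity() + " "
				+ copy.getName() + " " + copy.getFlow());
	}

}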
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowMapper extends Mapper<LongWritable, Text, Text, Flow> {

	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		
		String line = value.toString();
		
		String[] arr = line.split(" ");
		
		Flow f = new Flow();
		f.setPhone(arr[0]);
		f.setCity(arr[1]);
		f.setName(arr[2]);
		f.setFlow(Integer.parseInt(arr[3]));
		
		context.write(new Text(f.getPhone()), f);
		
	}

}

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowReducer extends Reducer<Text, Flow, Text, IntWritable> {

	public void reduce(Text key, Iterable<Flow> values, Context context) throws IOException, InterruptedException {
		
		int sum = 0;
		String name = null;
		for (Flow val : values) {
			name = val.getName();
			sum += val.getFlow();
		}
		
		context.write(new Text(key.toString() + " " + name), new IntWritable(sum));
	}

}

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "JobName");
		job.setJarByClass(cn.tedu.flow.FlowDriver.class);
		job.setMapperClass(FlowMapper.class);
		job.setReducerClass(FlowReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Flow.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.60.132:9000/mr/flow.txt"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.60.132:9000/flowresult"));

		if (!job.waitForCompletion(true))
			return;
	}

}
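To submit the job to the cluster, package the compiled classes into a jar and run it with the hadoop command; the jar name here is only an example:

hadoop jar flow.jar cn.tedu.flow.FlowDriver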

Exercise: compute each student's total score (data file: score.txt)
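The ScoreMapper below splits each line on a single space and parses the fields in the order month, name, score, so a line of score.txt is assumed to look like this (made-up values):

1 zhangsan 90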

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class Student implements Writable {

	private int month;
	private String name;
	private int score;

	public int getMonth() {
		return month;
	}

	public void setMonth(int month) {
		this.month = month;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public int getScore() {
		return score;
	}

	public void setScore(int score) {
		this.score = score;
	}

	@Override
	public void readFields(DataInput in) throws IOException {

		this.month = in.readInt();
		this.name = in.readUTF();
		this.score = in.readInt();

	}

	@Override
	public void write(DataOutput out) throws IOException {

		out.writeInt(month);
		out.writeUTF(name);
		out.writeInt(score);

	}

}

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ScoreMapper extends Mapper<LongWritable, Text, Text, Student> {

	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

		String line = value.toString();

		String[] arr = line.split(" ");

		Student s = new Student();
		s.setMonth(Integer.parseInt(arr[0]));
		s.setName(arr[1]);
		s.setScore(Integer.parseInt(arr[2]));

		context.write(new Text(s.getName()), s);

	}

}

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ScoreReducer extends Reducer<Text, Student, Text, IntWritable> {

	public void reduce(Text key, Iterable<Student> values, Context context) throws IOException, InterruptedException {
		int sum = 0;
		
		for (Student val : values) {
			sum += val.getScore();
		}
		
		context.write(key, new IntWritable(sum));
	}

}

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ScorePartitioner extends Partitioner<Text, Student> {

	@Override
	public int getPartition(Text key, Student value, int numPartitions) {

		// Route each record to the reducer matching its month:
		// month 1 -> partition 0, month 2 -> partition 1, and so on.
		// The configured number of reduce tasks must therefore be at least
		// as large as the largest month that appears in the input data.
		int month = value.getMonth();

		return month - 1;
	}

}
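Because the partition index is taken straight from the month, the number of reduce tasks configured in the driver has to match the data: the driver below sets 4 reduce tasks, which only works if the input contains months 1 through 4 at most; a larger month would yield a partition index outside the valid range and the job would fail with an illegal-partition error.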

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ScoreDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "JobName");
		job.setJarByClass(cn.tedu.score.ScoreDriver.class);
		job.setMapperClass(ScoreMapper.class);
		job.setReducerClass(ScoreReducer.class);
		
		job.setPartitionerClass(ScorePartitioner.class);
		job.setNumReduceTasks(4);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Student.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.60.132:9000/mr/score1"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.60.132:9000/scoreresult"));

		if (!job.waitForCompletion(true))
			return;
	}

}

Partitioning - Partitioner


Partitioning is an important step of the shuffle phase. Its job is to distribute the map output to different reducers according to a rule, so that the job produces one output result per partition.
Partitioner is the base class of all partitioners; a custom partitioner must extend it.
HashPartitioner is MapReduce's default partitioner. It picks the target reducer as:
reducer = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
Note: by default the number of reduce tasks is 1, so all records end up in a single partition.
The built-in partitioning rule often cannot satisfy our requirements; to achieve a specific result we may need to define our own partitioning rule.
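The rule quoted above is essentially all the default partitioner does; a minimal sketch of an equivalent implementation (the class name HashLikePartitioner is made up for illustration) looks like this:

import org.apache.hadoop.mapreduce.Partitioner;

// Mimics the default hash-based rule: mask off the sign bit of the key's
// hash code, then take it modulo the number of reduce tasks.
public class HashLikePartitioner<K, V> extends Partitioner<K, V> {

	@Override
	public int getPartition(K key, V value, int numPartitions) {
		return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
	}

}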

Example: partition the data by city and compute the traffic generated by each person in each city.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class Flow implements Writable{

	private String phone;
	private String city;
	private String name;
	private int flow;

	public String getPhone() {
		return phone;
	}

	public void setPhone(String phone) {
		this.phone = phone;
	}

	public String getCity() {
		return city;
	}

	public void setCity(String city) {
		this.city = city;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public int getFlow() {
		return flow;
	}

	public void setFlow(int flow) {
		this.flow = flow;
	}

	// Deserialization
	@Override
	public void readFields(DataInput in) throws IOException {
		// Read the fields back in exactly the order they were written
		this.phone = in.readUTF();
		this.city = in.readUTF();
		this.name = in.readUTF();
		this.flow = in.readInt();
	}

	// Serialization
	@Override
	public void write(DataOutput out) throws IOException {
		// Write the fields out one by one, in a fixed order
		out.writeUTF(phone);
		out.writeUTF(city);
		out.writeUTF(name);
		out.writeInt(flow);
	}

}

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowMapper extends Mapper<LongWritable, Text, Text, Flow> {

	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

		String line = value.toString();

		String[] arr = line.split(" ");

		Flow f = new Flow();
		f.setPhone(arr[0]);
		f.setCity(arr[1]);
		f.setName(arr[2]);
		f.setFlow(Integer.parseInt(arr[3]));

		context.write(new Text(f.getPhone()), f);

	}

}
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FlowPartitioner extends Partitioner<Text, Flow> {

	@Override
	public int getPartition(Text k