
(7) MapReduce Custom Types and Partitioning

Requirements

We have the following data: phone | region | name | data usage (flow)

Partition the records by region across three reducers so that three output files are produced, each containing the total data usage per person.

13877779999 bj zs 2145
13766668888 sh ls 1028
13766668888 sh ls 9987
13877779999 bj zs 5678
13544445555 sz ww 10577
13877779999 sh zs 2145
13766668888 sh ls 9987

Concepts covered

  • Writing the Mapper and Reducer components
  • Writing a custom class as the input/output type
  • Writing a custom Partitioner class
  • Setting the number of ReduceTasks
  • MapReduce sorts the Mapper's output keys by default

Code

package hadoop;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

//A user-defined JavaBean must implement Hadoop's Writable serialization interface to be used as a MapReduce value type
public class JavaBeanDemo implements Writable {
	private String phone;
	private String address;
	private String name;
	private long flow;
	
	//Serialization method
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(phone);
		out.writeUTF(address);
		out.writeUTF(name);
		out.writeLong(flow);
	}
		
	//Deserialization method
	//Fields must be read back in exactly the same order they were written
	@Override
	public void readFields(DataInput in) throws IOException {
		this.phone = in.readUTF();
		this.address = in.readUTF();
		this.name = in.readUTF();
		this.flow = in.readLong();
	}

	public String getPhone() {
		return phone;
	}

	public void setPhone(String phone) {
		this.phone = phone;
	}

	public String getAddress() {
		return address;
	}

	public void setAddress(String address) {
		this.address = address;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public long getFlow() {
		return flow;
	}

	public void setFlow(long flow) {
		this.flow = flow;
	}

	@Override
	public String toString() {
		return "JavaBeanDemo [phone=" + phone + ", address=" + address + ", name=" + name + ", flow=" + flow + "]";
	}
	
}
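
As a quick local sanity check (not part of the original job), the write/readFields pair can be exercised with plain Java streams; the class name WritableRoundTrip and the sample values below are illustrative only:

package hadoop;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

//Hypothetical stand-alone check: serialize a JavaBeanDemo and read it back,
//confirming that readFields() mirrors write() field for field
public class WritableRoundTrip {
	public static void main(String[] args) throws Exception {
		JavaBeanDemo original = new JavaBeanDemo();
		original.setPhone("13877779999");
		original.setAddress("bj");
		original.setName("zs");
		original.setFlow(2145L);

		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		original.write(new DataOutputStream(bytes));

		JavaBeanDemo copy = new JavaBeanDemo();
		copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

		System.out.println(copy);  //should print the same field values as 'original'
	}
}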
package hadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PartitionDemo extends Partitioner<Text, JavaBeanDemo>{

	//Partition records by region so each region's data goes to its own reducer
	@Override
	public int getPartition(Text key, JavaBeanDemo value, int numPartitions) {
		if("bj".equals(value.getAddress())) {
			return 0;
		}else if("sh".equals(value.getAddress())) {
			return 1;
		}else if("sz".equals(value.getAddress())) {
			return 2;
		}else{
			//Any other region falls back to partition 0: returning a value >= the number of reduce tasks (3 here) would cause an "Illegal partition" error
			return 0;
		}
	}

}
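
For comparison, if the job did not set a custom Partitioner, Hadoop would use HashPartitioner, which assigns partitions by the hash of the key (the name) rather than by region. Roughly, its logic for this job's types looks like this sketch:

//Roughly what the default org.apache.hadoop.mapreduce.lib.partition.HashPartitioner does:
//records with the same key always land in the same partition, but the partition number
//has no relationship to the region field
public int getPartition(Text key, JavaBeanDemo value, int numPartitions) {
	return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
}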
package hadoop;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//The Mapper component: extend Hadoop's Mapper class
/*Meaning of the four generic types: the first two are fixed, the last two depend on the job's needs
  				 Mapper input key type (the key is the byte offset of each line)
				 Mapper input value type (the value is the content of each line)
				 Mapper output key type
				 Mapper output value type
*/
public class MapperDemo extends Mapper<LongWritable, Text, Text, JavaBeanDemo>{
	
	/*
	 * The Mapper component hands each input key and value to the developer via the map method;
	 * context.write(key, value) emits the output key and value
	 */
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, JavaBeanDemo>.Context context)
			throws IOException, InterruptedException {
		 
		String line=value.toString();
		String[] data=line.split(" ");
		
		JavaBeanDemo jbd=new JavaBeanDemo();
		jbd.setPhone(data[0]);
		jbd.setAddress(data[1]);
		jbd.setName(data[2]);
		jbd.setFlow(Long.parseLong(data[3]));
		//Emit {name, JavaBeanDemo object}
		context.write(new Text(jbd.getName()), jbd);
	}
}
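
To see how a single record moves through map(), here is a hypothetical stand-alone check of the same split logic, run outside Hadoop (the class name MapParseCheck is made up for illustration):

package hadoop;

//Hypothetical local check of the parsing done in map()
public class MapParseCheck {
	public static void main(String[] args) {
		String line = "13877779999 bj zs 2145";  //one sample record from the input above
		String[] data = line.split(" ");

		//The Mapper emits the name as the key and the whole bean as the value
		System.out.println("key   = " + data[2]);
		System.out.println("value = phone:" + data[0] + ", address:" + data[1]
				+ ", flow:" + Long.parseLong(data[3]));
	}
}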
package hadoop;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/*
 * Four generic types: the first two match the Mapper's output key and value, the last two depend on the job's needs
 * 			Reducer input key type (the Mapper's output key)
 * 			Reducer input value type (the Mapper's output value)
 * 			Reducer output key type
 * 			Reducer output value type
 */
public class ReducerDemo extends Reducer<Text, JavaBeanDemo, Text, JavaBeanDemo>{
	
	@Override
	protected void reduce(Text key, Iterable<JavaBeanDemo> values,
			Reducer<Text, JavaBeanDemo, Text, JavaBeanDemo>.Context context) throws IOException, InterruptedException {
		JavaBeanDemo jbd = new JavaBeanDemo();
		
		//Sum the flow of all records belonging to the same person, then copy the other fields into the result bean
		for(JavaBeanDemo jbdTmp : values){
			jbd.setFlow(jbd.getFlow()+jbdTmp.getFlow());
			jbd.setPhone(jbdTmp.getPhone());
			jbd.setName(jbdTmp.getName());
			jbd.setAddress(jbdTmp.getAddress());
		}
		
		context.write(key, jbd);
		
	}
}
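
Note that partitioning is driven by the value's address while grouping in the reducer is by the name key, so a person whose records span regions (zs above has both bj and sh records) is summed separately in each region's output file. Below is a minimal local sketch of the per-name aggregation a single reducer performs, using the sh records from the sample data (the class ReduceSumSketch is illustrative only):

package hadoop;

import java.util.HashMap;
import java.util.Map;

//Hypothetical stand-alone sketch: sum flow per name within one partition,
//mimicking what ReducerDemo does with the values of each key
public class ReduceSumSketch {
	public static void main(String[] args) {
		//The four "sh" records from the sample data (the sh partition)
		String[] lines = {
				"13766668888 sh ls 1028",
				"13766668888 sh ls 9987",
				"13877779999 sh zs 2145",
				"13766668888 sh ls 9987"
		};

		Map<String, Long> totals = new HashMap<>();
		for (String line : lines) {
			String[] data = line.split(" ");
			totals.merge(data[2], Long.parseLong(data[3]), Long::sum);
		}
		System.out.println(totals);  //e.g. {ls=21002, zs=2145}
	}
}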
package hadoop;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class Driver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		
		//Create the MR Job object
		Job job = Job.getInstance(conf);
		
		//Set the job's main entry class
		job.setJarByClass(Driver.class);
		
		//Set the Mapper class
		job.setMapperClass(MapperDemo.class);
		
		//Set the Mapper output key and value types
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(JavaBeanDemo.class);
		
		//Set the Reducer class
		job.setReducerClass(ReducerDemo.class);
		
		//Set the Reducer output key and value types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(JavaBeanDemo.class);
		
		//Set the custom Partitioner class
		job.setPartitionerClass(PartitionDemo.class);
		
		//Set the number of reduce tasks; defaults to 1 if not set
		job.setNumReduceTasks(3);
		
		//Set the HDFS path of the input to process (a file or a directory)
		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.80.100:9000/mrTestDir"));
		
		//Set the output path (the directory must not already exist, otherwise the job fails)
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.80.100:9000/mrTestDir/result"));
		
		//Submit the job and wait for completion
		job.waitForCompletion(true);
	}
}
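
Once the classes are packaged into a jar (the jar name is up to you), the job can be submitted with hadoop jar <your-jar> hadoop.Driver. With three reduce tasks, the result directory should contain three files, part-r-00000, part-r-00001, and part-r-00002, corresponding to partitions 0 (bj), 1 (sh), and 2 (sz) respectively.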