1. 程式人生 > >結合案例講解MapReduce重要知識點 --------- 多表連線

結合案例講解MapReduce重要知識點 --------- 多表連線


uid	sexid	logindate
1	1	2017-04-17 08:16:20
2	2	2017-04-15 06:18:20
3	1	2017-04-16 05:16:24
4	2	2017-04-14 03:18:20
5	1	2017-04-13 02:16:25
6	2	2017-04-13 01:15:20
7	1	2017-04-12 08:16:34
8	2	2017-04-11 09:16:20
9	0	2017-04-10 05:16:50


0	不知道
1	男
2	女


uid	uname
1	小紅
2	小行
3	小通
4	小閃
5	小鎮
6	小振
7	小秀
8	小微
9	小懂
10	小明
11	小剛
12	小舉
13	小黑
14	小白
15	小鵬
16	小習


loginuid	sex	uname	logindate
1	男	小紅	2017-04-17 08:16:20
2	女	小行	2017-04-15 06:18:20
3	男	小通	2017-04-16 05:16:24
4	女	小閃	2017-04-14 03:18:20
5	男	小鎮	2017-04-13 02:16:25
6	女	小振	2017-04-13 01:15:20
7	男	小秀	2017-04-12 08:16:34
9	不知道	小懂	2017-04-10 05:16:50
8	女	小微	2017-04-11 09:16:20




適用場景:有一個或者多個小表 和 一個或者多個大表檔案。



semi join:半連線




import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// User information bean.
// Author: lyd
public class User implements Writable{

	public String uid;
	public String uname;
	public String gender;
	public String ldt;
	public User(){
	public User(String uid, String uname, String gender, String ldt) {
		this.uid = uid;
		this.uname = uname;
		this.gender = gender;
		this.ldt = ldt;

	public void write(DataOutput out) throws IOException {

	public void readFields(DataInput in) throws IOException {
		this.uid = in.readUTF();
		this.uname = in.readUTF();
		this.gender = in.readUTF();
		this.ldt = in.readUTF();

	 * @return the uid
	public String getUid() {
		return uid;

	 * @param uid the uid to set
	public void setUid(String uid) {
		this.uid = uid;

	 * @return the uname
	public String getUname() {
		return uname;

	 * @param uname the uname to set
	public void setUname(String uname) {
		this.uname = uname;

	 * @return the gender
	public String getGender() {
		return gender;

	 * @param gender the gender to set
	public void setGender(String gender) {
		this.gender = gender;

	 * @return the ldt
	public String getLdt() {
		return ldt;

	 * @param ldt the ldt to set
	public void setLdt(String ldt) {
		this.ldt = ldt;

	/* (non-Javadoc)
	 * @see java.lang.Object#toString()
	public String toString() {
		return uid + "\t" + uname + "\t" + gender + "\t" + ldt;


import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MultipleTableJoin extends ToolRunner implements Tool{

	/*
	 * Custom mapper (MyMapper).
	 * @author lyd
	 */
	static class MyMapper extends Mapper<LongWritable, Text, User, NullWritable>{

		Map<String,String> sexMap = new ConcurrentHashMap<String, String>();
		Map<String,String> userMap = new ConcurrentHashMap<String, String>();
		protected void setup(Context context)throws IOException, InterruptedException {
			Path [] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
			for (Path p : paths) {
				String fileName = p.getName();
				if(fileName.equals("sex")){//讀取 “性別表”
					BufferedReader sb = new BufferedReader(new FileReader(new File(p.toString())));
					String str = null;
					while((str = sb.readLine()) != null){
						String []  strs = str.split("\t");
						sexMap.put(strs[0], strs[1]);
				} else if(fileName.equals("user")){//讀取“使用者表”
					BufferedReader sb = new BufferedReader(new FileReader(new File(p.toString())));
					String str = null;
					while((str = sb.readLine()) != null){
						String []  strs = str.split("\t");
						userMap.put(strs[0], strs[1]);

		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String lines [] = line.split("\t");
			String uid = lines[0];
			String sexid = lines[1];
			String logindate = lines[2];
			if(sexMap.containsKey(sexid) && userMap.containsKey(uid)){
				String uname = userMap.get(uid);
				String gender = sexMap.get(sexid);
				//User user = new User(uid, uname, gender, logindate);
				//context.write(new Text(uid+"\t"+uname+"\t"+gender+"\t"+logindate), NullWritable.get());
				User user = new User(uid, uname, gender, logindate);
				context.write(user, NullWritable.get());

		protected void cleanup(Context context)throws IOException, InterruptedException {
	/*
	 * Custom reducer (MyReducer), currently commented out.
	 * @author lyd
	 *
	 * static class MyReducer extends Reducer<Text, Text, Text, Text> {
	 *
	 *     protected void setup(Context context) throws IOException, InterruptedException {
	 *     }
	 *
	 *     protected void reduce(Text key, Iterable<Text> value, Context context)
	 *             throws IOException, InterruptedException {
	 *     }
	 *
	 *     protected void cleanup(Context context) throws IOException, InterruptedException {
	 *     }
	 * }
	 */
	public void setConf(Configuration conf) {
		conf.set("fs.defaultFS", "hdfs://hadoop01:9000");

	public Configuration getConf() {
		return new Configuration();
	// Driver method.
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		Job job = Job.getInstance(conf, "model01");
		FileInputFormat.addInputPath(job, new Path(args[0]));
		job.addCacheFile(new URI(args[2]));
		job.addCacheFile(new URI(args[3]));
//		URI [] uris = {new URI(args[2]),new URI(args[3])};
//		job.setCacheFiles(uris);
	/*	DistributedCache.addCacheFile(new URI(args[2]), conf);
		DistributedCache.addCacheFile(new URI(args[3]), conf);*/
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(new Path(args[1]))){
			fs.delete(new Path(args[1]), true);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		int isok = job.waitForCompletion(true) ? 0 : 1;
		return isok;
	/*
	 * Main entry point of the job.
	 * @param args
	 */
	public static void main(String[] args) {
		try {
			String [] argss = new GenericOptionsParser(new Configuration(), args).getRemainingArgs();
			System.exit(ToolRunner.run(new MultipleTableJoin(), argss));
		} catch (Exception e) {