
MapReduce Key Concepts by Example: Multi-Table Join

Contents of the first table:

login:
uid	sexid	logindate
1	1	2017-04-17 08:16:20
2	2	2017-04-15 06:18:20
3	1	2017-04-16 05:16:24
4	2	2017-04-14 03:18:20
5	1	2017-04-13 02:16:25
6	2	2017-04-13 01:15:20
7	1	2017-04-12 08:16:34
8	2	2017-04-11 09:16:20
9	0	2017-04-10 05:16:50

Contents of the second table:

sex:
0	不知道
1	男
2	女

Contents of the third table:

user:
uid	uname
1	小紅
2	小行
3	小通
4	小閃
5	小鎮
6	小振
7	小秀
8	小微
9	小懂
10	小明
11	小剛
12	小舉
13	小黑
14	小白
15	小鵬
16	小習

Expected final output:

loginuid	sex	uname	logindate
1	男	小紅	2017-04-17 08:16:20
2	女	小行	2017-04-15 06:18:20
3	男	小通	2017-04-16 05:16:24
4	女	小閃	2017-04-14 03:18:20
5	男	小鎮	2017-04-13 02:16:25
6	女	小振	2017-04-13 01:15:20
7	男	小秀	2017-04-12 08:16:34
8	女	小微	2017-04-11 09:16:20
9	不知道	小懂	2017-04-10 05:16:50

Approach:

Map-side join

Core idea: place the small table files in the distributed cache, then perform the join on the map side (a minimal sketch of this pattern with the current Hadoop API appears after this list).

When to use: one or more small tables joined against one or more large table files.

Advantages: the small tables are held in memory on the map side, so lookups are fast; the volume of data transferred from map to reduce drops sharply; the time spent in the shuffle drops sharply.

Disadvantage: it only applies when the workload actually includes a table small enough to cache.
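
The full example below uses the older DistributedCache helper class. For reference, here is a minimal, hypothetical sketch of the same two-step pattern with the non-deprecated Hadoop 2.x API: Job.addCacheFile in the driver, and reading the file in setup() through the symlink it creates in the task's working directory. The class name and the /cache/sex path are placeholders, not part of the original example.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class CacheFileSketch {

	static class JoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

		private final Map<String, String> sexMap = new HashMap<String, String>();

		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			// the cached file is symlinked into the task's working directory
			// under the name given after '#', so it can be opened by that name
			BufferedReader br = new BufferedReader(new FileReader("sex"));
			String line;
			while ((line = br.readLine()) != null) {
				String[] fields = line.split("\t");
				sexMap.put(fields[0], fields[1]);
			}
			br.close();
		}

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// ... look up sexMap here, exactly as in the full example below ...
		}
	}

	public static void main(String[] args) throws Exception {
		Job job = Job.getInstance();
		job.setJarByClass(CacheFileSketch.class);
		job.setMapperClass(JoinMapper.class);
		// '#sex' names the symlink created in each task's working directory
		job.addCacheFile(new URI("/cache/sex#sex"));
		// input/output paths, output key/value classes, etc. omitted for brevity
	}
}

The '#sex' fragment controls the symlink name, so the mapper never has to inspect cache-file paths at all.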

Semi join

This addresses the weakness of the pure map-side join: when only large files are involved, but the useful data extracted from one of them (for example, just the distinct join keys) would itself be a small file, that extract can be produced separately, placed in the distributed cache, and then joined with an ordinary map-side join. A rough sketch of the extraction step follows.
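
The extraction step itself is not implemented in this article; as a hypothetical sketch (class and field names are illustrative, and the driver is omitted), pulling the distinct join keys out of a large login file could be a trivial MapReduce job of its own:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class DistinctUidExtractor {

	// emit the join key (uid, the first tab-separated field) of every record
	static class UidMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] fields = value.toString().split("\t");
			context.write(new Text(fields[0]), NullWritable.get());
		}
	}

	// the shuffle groups identical uids together; writing each key once de-duplicates them
	static class UidReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
		@Override
		protected void reduce(Text key, Iterable<NullWritable> values, Context context)
				throws IOException, InterruptedException {
			context.write(key, NullWritable.get());
		}
	}
}

Its small output file can then be added to the distributed cache and used exactly like the sex and user tables in the example below.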

Custom Writable class: User

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

/**
 * User info bean
 * @author lyd
 *
 */
public class User implements Writable{

	public String uid;
	public String uname;
	public String gender;
	public String ldt;
	
	public User(){
		
	}
	
	public User(String uid, String uname, String gender, String ldt) {
		this.uid = uid;
		this.uname = uname;
		this.gender = gender;
		this.ldt = ldt;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(uid);
		out.writeUTF(uname);
		out.writeUTF(gender);
		out.writeUTF(ldt);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.uid = in.readUTF();
		this.uname = in.readUTF();
		this.gender = in.readUTF();
		this.ldt = in.readUTF();
	}

	/**
	 * @return the uid
	 */
	public String getUid() {
		return uid;
	}

	/**
	 * @param uid the uid to set
	 */
	public void setUid(String uid) {
		this.uid = uid;
	}

	/**
	 * @return the uname
	 */
	public String getUname() {
		return uname;
	}

	/**
	 * @param uname the uname to set
	 */
	public void setUname(String uname) {
		this.uname = uname;
	}

	/**
	 * @return the gender
	 */
	public String getGender() {
		return gender;
	}

	/**
	 * @param gender the gender to set
	 */
	public void setGender(String gender) {
		this.gender = gender;
	}

	/**
	 * @return the ldt
	 */
	public String getLdt() {
		return ldt;
	}

	/**
	 * @param ldt the ldt to set
	 */
	public void setLdt(String ldt) {
		this.ldt = ldt;
	}

	/* (non-Javadoc)
	 * @see java.lang.Object#toString()
	 */
	@Override
	public String toString() {
		return uid + "\t" + uname + "\t" + gender + "\t" + ldt;
	}
}
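
A caveat about using User as the map output key: it implements only Writable, which is enough here because the job runs map-only (see setNumReduceTasks(0) in the driver below). If a reduce phase were added, the map output key type would have to implement WritableComparable so the framework can sort it during the shuffle. A minimal sketch of the extra pieces that would require, assuming ordering by uid is acceptable (the fields and serialization methods stay exactly as above):

import org.apache.hadoop.io.WritableComparable;

public class User implements WritableComparable<User> {

	// uid/uname/gender/ldt fields plus write()/readFields() exactly as above ...

	@Override
	public int compareTo(User other) {
		// any total ordering works; sorting by uid is only an assumption here
		return this.uid.compareTo(other.uid);
	}

	@Override
	public int hashCode() {
		// keep hashCode/equals consistent with compareTo so partitioning
		// and grouping behave predictably
		return uid.hashCode();
	}

	@Override
	public boolean equals(Object obj) {
		return (obj instanceof User) && this.uid.equals(((User) obj).uid);
	}
}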

The MapReduce driver class: MultipleTableJoin

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MultipleTableJoin extends ToolRunner implements Tool{

	/**
	 * Custom mapper: joins each login record with the cached sex and user tables
	 * @author lyd
	 *
	 */
	static class MyMapper extends Mapper<LongWritable, Text, User, NullWritable>{

		Map<String,String> sexMap = new ConcurrentHashMap<String, String>();
		Map<String,String> userMap = new ConcurrentHashMap<String, String>();
		
		// Read the cached small tables into in-memory maps
		@Override
		protected void setup(Context context)throws IOException, InterruptedException {
			Path [] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
			for (Path p : paths) {
				String fileName = p.getName();
				if(fileName.equals("sex")){//讀取 “性別表”
					BufferedReader sb = new BufferedReader(new FileReader(new File(p.toString())));
					String str = null;
					while((str = sb.readLine()) != null){
						String []  strs = str.split("\t");
						sexMap.put(strs[0], strs[1]);
					}
					sb.close();
				} else if(fileName.equals("user")){//讀取“使用者表”
					BufferedReader sb = new BufferedReader(new FileReader(new File(p.toString())));
					String str = null;
					while((str = sb.readLine()) != null){
						String []  strs = str.split("\t");
						userMap.put(strs[0], strs[1]);
					}
					sb.close();
				}
			}
		}

		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			
			String line = value.toString();
			String lines [] = line.split("\t");
			String uid = lines[0];
			String sexid = lines[1];
			String logindate = lines[2];
			
			// join: only emit records whose sexid and uid both have matches in the cached tables
			if(sexMap.containsKey(sexid) && userMap.containsKey(uid)){
				String uname = userMap.get(uid);
				String gender = sexMap.get(sexid);
				//User user = new User(uid, uname, gender, logindate);
				//context.write(new Text(uid+"\t"+uname+"\t"+gender+"\t"+logindate), NullWritable.get());
				User user = new User(uid, uname, gender, logindate);
				context.write(user, NullWritable.get());
			}	
		}

		@Override
		protected void cleanup(Context context)throws IOException, InterruptedException {
		}
	}
	
	/**
	 * Custom reducer (unused: the join is completed entirely on the map side)
	 * @author lyd
	 *
	 */
	/*static class MyReducer extends Reducer<Text, Text, Text, Text>{

		@Override
		protected void setup(Context context)throws IOException, InterruptedException {
		}
		
		@Override
		protected void reduce(Text key, Iterable<Text> value,Context context)
				throws IOException, InterruptedException {
		}
		
		@Override
		protected void cleanup(Context context)throws IOException, InterruptedException {
		}
	}*/
	
	
	private Configuration conf;

	@Override
	public void setConf(Configuration conf) {
		conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
		// keep a reference to the configuration ToolRunner hands us;
		// returning a fresh Configuration from getConf() would silently drop this setting
		this.conf = conf;
	}

	@Override
	public Configuration getConf() {
		return conf == null ? new Configuration() : conf;
	}
	
	/**
	 * 驅動方法
	 */
	@Override
	public int run(String[] args) throws Exception {
		// 1. Get the Configuration object
		Configuration conf = getConf();
		// 2. Create the job
		Job job = Job.getInstance(conf, "model01");
		// 3. Set the class whose jar should be shipped with the job
		job.setJarByClass(MultipleTableJoin.class);
		// 4. Set map-related properties
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(User.class);
		job.setMapOutputValueClass(NullWritable.class);
		// map-only join: with reduce tasks, the User key would have to implement WritableComparable
		job.setNumReduceTasks(0);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		
		// Set the cache files (the two small tables)
		job.addCacheFile(new URI(args[2]));
		job.addCacheFile(new URI(args[3]));
		
//		URI [] uris = {new URI(args[2]),new URI(args[3])};
//		job.setCacheFiles(uris);
		
	/*	DistributedCache.addCacheFile(new URI(args[2]), conf);
		DistributedCache.addCacheFile(new URI(args[3]), conf);*/
		
		/*// 5. Set reduce-related properties (not needed for this map-only join)
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);*/
		// Delete the output directory if it already exists
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(new Path(args[1]))){
			fs.delete(new Path(args[1]), true);
		}
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		// 6. Submit the job and wait for it to finish
		int isok = job.waitForCompletion(true) ? 0 : 1;
		return isok;
	}
	
	/**
	 * Main entry point of the job
	 * @param args
	 */
	public static void main(String[] args) {
		try {
			// Parse generic Hadoop options out of the command-line arguments
			String [] argss = new GenericOptionsParser(new Configuration(), args).getRemainingArgs();
			System.exit(ToolRunner.run(new MultipleTableJoin(), argss));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}
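
How the job is invoked: args[0] is the login input path, args[1] the output path, and args[2]/args[3] the URIs of the two cached tables. Because setup() matches on the cached file's name, those URIs must point to files literally named sex and user. A hypothetical invocation (all paths are placeholders) could look like:

hadoop jar multitablejoin.jar MultipleTableJoin /join/login /join/out hdfs://hadoop01:9000/join/sex hdfs://hadoop01:9000/join/user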