1. 程式人生 > >map端join的實現 ,用來解決小表中資料的讀取

map端join的實現 ,用來解決小表中資料的讀取

通過閱讀父類Mapper的原始碼,發現 setup方法是在maptask處理資料之前呼叫一次 可以用來做一些初始化工作

1、需求:

訂單資料表t_order:

id

date

pid

amount

1001

20150710

P0001

2

1002

20150710

P0001

3

1002

20150710

P0002

3

商品資訊表t_product

id

pname

category_id

price

P0001

小米5

1000

2

P0002

錘子T1

1000

3



在訂單後面拼接出商品價格

思路:把商品資訊資料放到map裡,然後通過id 進行查詢讀取然後拼接


mapper類中的setup 在map方法接受資料之前,對資料先進行出力

package cn.itcast.bigdata.mr.mapsidejoin;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MapSideJoin {

	public static class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
		// 用一個hashmap來載入儲存產品資訊表
		Map<String, String> pdInfoMap = new HashMap<String, String>();

		Text k = new Text();

		/**
		 * 通過閱讀父類Mapper的原始碼,發現 setup方法是在maptask處理資料之前呼叫一次 可以用來做一些初始化工作
		 */
		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pdts.txt")));
			String line;
			while (StringUtils.isNotEmpty(line = br.readLine())) {
				String[] fields = line.split(",");
				pdInfoMap.put(fields[0], fields[1]);
			}
			br.close();
		}

		// 由於已經持有完整的產品資訊表,所以在map方法中就能實現join邏輯了
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String orderLine = value.toString();
			String[] fields = orderLine.split("\t");
			String pdName = pdInfoMap.get(fields[1]);
			k.set(orderLine + "\t" + pdName);
			context.write(k, NullWritable.get());
		}

	}

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();

		Job job = Job.getInstance(conf);

		job.setJarByClass(MapSideJoin.class);

		job.setMapperClass(MapSideJoinMapper.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		FileInputFormat.setInputPaths(job, new Path("D:/srcdata/mapjoininput"));
		FileOutputFormat.setOutputPath(job, new Path("D:/temp/output"));

		// 指定需要快取一個檔案到所有的maptask執行節點工作目錄
		/* job.addArchiveToClassPath(archive); */// 快取jar包到task執行節點的classpath中
		/* job.addFileToClassPath(file); */// 快取普通檔案到task執行節點的classpath中
		/* job.addCacheArchive(uri); */// 快取壓縮包檔案到task執行節點的工作目錄
		/* job.addCacheFile(uri) */// 快取普通檔案到task執行節點的工作目錄

		// 將產品表文件快取到task工作節點的工作目錄中去
		job.addCacheFile(new URI("file:/D:/srcdata/mapjoincache/pdts.txt"));

		//map端join的邏輯不需要reduce階段,設定reducetask數量為0
		job.setNumReduceTasks(0);
		
		boolean res = job.waitForCompletion(true);
		System.exit(res ? 0 : 1);

	}

}