map端join的實現 ,用來解決小表中資料的讀取
阿新 • • 發佈:2019-01-24
通過閱讀父類Mapper的原始碼,發現 setup方法是在maptask處理資料之前呼叫一次 可以用來做一些初始化工作
1、需求:
訂單資料表t_order:
id |
date |
pid |
amount |
1001 |
20150710 |
P0001 |
2 |
1002 |
20150710 |
P0001 |
3 |
1002 |
20150710 |
P0002 |
3 |
商品資訊表t_product
id |
pname |
category_id |
price |
P0001 |
小米5 |
1000 |
2 |
P0002 |
錘子T1 |
1000 |
3 |
在訂單後面拼接出商品價格
思路:把商品資訊資料放到map裡,然後通過id 進行查詢讀取然後拼接
mapper類中的setup 在map方法接受資料之前,對資料先進行出力
package cn.itcast.bigdata.mr.mapsidejoin; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MapSideJoin { public static class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> { // 用一個hashmap來載入儲存產品資訊表 Map<String, String> pdInfoMap = new HashMap<String, String>(); Text k = new Text(); /** * 通過閱讀父類Mapper的原始碼,發現 setup方法是在maptask處理資料之前呼叫一次 可以用來做一些初始化工作 */ @Override protected void setup(Context context) throws IOException, InterruptedException { BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pdts.txt"))); String line; while (StringUtils.isNotEmpty(line = br.readLine())) { String[] fields = line.split(","); pdInfoMap.put(fields[0], fields[1]); } br.close(); } // 由於已經持有完整的產品資訊表,所以在map方法中就能實現join邏輯了 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String orderLine = value.toString(); String[] fields = orderLine.split("\t"); String pdName = pdInfoMap.get(fields[1]); k.set(orderLine + "\t" + pdName); context.write(k, NullWritable.get()); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MapSideJoin.class); job.setMapperClass(MapSideJoinMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path("D:/srcdata/mapjoininput")); FileOutputFormat.setOutputPath(job, new Path("D:/temp/output")); // 指定需要快取一個檔案到所有的maptask執行節點工作目錄 /* job.addArchiveToClassPath(archive); */// 快取jar包到task執行節點的classpath中 /* job.addFileToClassPath(file); */// 快取普通檔案到task執行節點的classpath中 /* job.addCacheArchive(uri); */// 快取壓縮包檔案到task執行節點的工作目錄 /* job.addCacheFile(uri) */// 快取普通檔案到task執行節點的工作目錄 // 將產品表文件快取到task工作節點的工作目錄中去 job.addCacheFile(new URI("file:/D:/srcdata/mapjoincache/pdts.txt")); //map端join的邏輯不需要reduce階段,設定reducetask數量為0 job.setNumReduceTasks(0); boolean res = job.waitForCompletion(true); System.exit(res ? 0 : 1); } }