1. 程式人生 > >MapReduce之MapJoin案例

MapReduce之MapJoin案例

@[toc] ## **使用場景** Map Join 適用於一張表十分小、一張表很大的場景。 ## **優點** 思考:在Reduce 端處理過多的表,非常容易產生資料傾斜。怎麼辦? 在Map端快取多張表,提前處理業務邏輯,這樣增加Map 端業務,減少Reduce 端資料的壓力,儘可能的減少資料傾斜。 ## **具體辦法**:採用`DistributedCache` (1)在Mapper的setup階段,將檔案讀取到快取集合中。 (2)在驅動函式中載入快取。 ```java /快取普通檔案到Task執行節點。 job.addCacheFile(new URI("file://e:/cache/pd.txt"); ``` ## 案例 ![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200817131815367.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0RURlRf,size_16,color_FFFFFF,t_70#pic_center) 每個MapTask在map()中完成Join ==注意:== - 只需要將要Join的資料order.txt作為切片,讓MapTask讀取 - pd.txt不以切片形式讀入,而直接在MapTask中使用HDFS下載此檔案,下載後,使用輸入流手動讀取其中的資料 - 在map()之前通常是將大檔案以切片形式讀取,小檔案手動讀取! order.txt---->切片(orderId,pid,amount)----JoinMapper.map() pd.txt----->切片(pid,pname)----JoinMapper.map() ## 需求分析 `MapJoin`適用於關聯表中有小表的情形 ## 程式碼實現 JoinBean.java ```java public class JoinBean { private String orderId; private String pid; private String pname; private String amount; @Override public String toString() { return orderId + "\t" + pname + "\t" + amount ; } public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId = orderId; } public String getPid() { return pid; } public void setPid(String pid) { this.pid = pid; } public String getPname() { return pname; } public void setPname(String pname) { this.pname = pname; } public String getAmount() { return amount; } public void setAmount(String amount) { this.amount = amount; } } ``` MapJoinMapper.java ```java /* * 1. 在Hadoop中,hadoop為MR提供了分散式快取 * ①用來快取一些Job執行期間的需要的檔案(普通檔案,jar,歸檔檔案(har)) * ②通過在Job的Configuration中,使用uri代替要快取的檔案 * ③分散式快取會假設當前的檔案已經上傳到了HDFS,並且在叢集的任意一臺機器都可以訪問到這個URI所代表的檔案 * ④分散式快取會在每個節點的task執行之前,提前將檔案傳送到節點 * ⑤分散式快取的高效是由於每個Job只會複製一次檔案,且可以自動在從節點對歸檔檔案解歸檔 * * * * */ public class MapJoinMapper extends Mapper{ private JoinBean out_key=new JoinBean(); private Map pdDatas=new HashMap(); //在map之前手動讀取pd.txt中的內容 @Override protected void setup(Mapper.Context context) throws IOException, InterruptedException { //從分散式快取中讀取資料 URI[] files = context.getCacheFiles(); for (URI uri : files) { BufferedReader reader = new BufferedReader(new FileReader(new File(uri))); String line=""; //迴圈讀取pd.txt中的每一行 while(StringUtils.isNotBlank(line=reader.readLine())) { String[] words = line.split("\t"); pdDatas.put(words[0], words[1]); } reader.close(); } } //對切片中order.txt的資料進行join,輸出 @Override protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { String[] words = value.toString().split("\t"); out_key.setOrderId(words[0]); out_key.setPname(pdDatas.get(words[1])); out_key.setAmount(words[2]); context.write(out_key, NullWritable.get()); } } ``` MapJoinDriver.java ```java public class MapJoinDriver { public static void main(String[] args) throws Exception { Path inputPath=new Path("e:/mrinput/mapjoin"); Path outputPath=new Path("e:/mroutput/mapjoin"); //作為整個Job的配置 Configuration conf = new Configuration(); //保證輸出目錄不存在 FileSystem fs=FileSystem.get(conf); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } // ①建立Job Job job = Job.getInstance(conf); job.setJarByClass(MapJoinDriver.class); // 為Job建立一個名字 job.setJobName("wordcount"); // ②設定Job // 設定Job執行的Mapper,Reducer型別,Mapper,Reducer輸出的key-value型別 job.setMapperClass(MapJoinMapper.class); // 設定輸入目錄和輸出目錄 FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, outputPath); // 設定分散式快取 job.addCacheFile(new URI("file:///e:/pd.txt")); //取消reduce階段 job.setNumReduceTasks(0); // ③執行Job job.waitForCompletion(true); } } ```