1. 程式人生 > >MapReduce學習筆記之資料連線(六)

MapReduce學習筆記之資料連線(六)

1. Map側連線

Map端join是指資料到達map處理函式之前進行合併的,效率要遠遠高於Reduce端join,因為Reduce端join是把所有的資料都經過Shuffle,非常消耗資源。


注意:在Map端join操作中,我們往往將較小的表新增到記憶體中,因為記憶體的資源是很寶貴的,這也說明了另外一個問題,那就是如果表的資料量都非常大則不適合使用Map端join。

1.1 基本思路

  1. 需要join的兩個檔案,一個儲存在HDFS中,一個在作業提交前,使用Job.addCacheFile(URI uri)將需要join的另外一個檔案加入到所有Map快取中;
  2. 在Mapper.setup(Context context)函式裡讀取該檔案;
  3. 在Mapper.map(KEYIN key, VALUEIN value, Context context)進行join;
  4. 將結果輸出(即沒有Reduce任務)。

1.2 示例

public class ProvinceMapJoinStatistics {
    public static class ProvinceLeftJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        private String provinceWithProduct = "";

        /**
         * 載入快取檔案
         */
@Override protected void setup(Context context) throws IOException, InterruptedException { URI[] uri = context.getCacheFiles(); if (uri == null || uri.length == 0) { return; } for (URI p : uri) { if (p.toString().endsWith("part-r-00000"
)) { // 讀快取檔案 try { provinceWithProduct = HdfsUtil.read(new Configuration(), p.toString()); } catch (Exception e) { e.printStackTrace(); } } } } public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if (!provinceWithProduct.contains(value.toString() .substring(0, 2))) { context.write(value, NullWritable.get()); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 3) { System.err.println("Usage: <in> [<in>...] <out>"); System.exit(2); } HdfsUtil.rmr(conf, otherArgs[otherArgs.length - 1]); Job job = Job.getInstance(conf, "ProvinceMapJoinStatistics"); job.setJarByClass(ProvinceMapJoinStatistics.class); // 設定快取檔案 job.addCacheFile(new Path(args[1]).toUri()); job.setMapperClass(ProvinceLeftJoinMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); if (job.waitForCompletion(true)) { HdfsUtil.cat(conf, otherArgs[2] + "/part-r-00000"); System.out.println("success"); } else { System.out.println("fail"); } } }

2. Reduce側連線

Reduce端連線比Map端連線更為普遍,因為輸入的資料不需要特定的結構,但是效率比較低,因為所有資料都必須經過Shuffle過程。

2.1 基本思路

  1. Map端讀取所有的檔案,並在輸出的內容里加上標示,代表資料是從哪個檔案裡來的。
  2. 在reduce處理函式中,按照標識對資料進行處理。
  3. 然後根據Key去join來求出結果直接輸出。

2.2 示例

public class ReduceJoinDemo {

    public static class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // 獲取輸入記錄的字串
            String line = value.toString();

            // 拋棄空記錄
            if (line == null || line.equals("")) {
                return;
            }

            // 獲取輸入檔案的全路徑和名稱
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String path = fileSplit.getPath().toString();

            //處理來自tb_a表的記錄
            if (path.contains("province.txt")) {
                context.write(new Text(line.substring(0, 2)), new Text("a#" + line));
            } else if (path.contains("part-r-00000")) {
                context.write(new Text(line.substring(0, 2)), new Text("b#"));
            }
        }
    }

    public static class ReduceJoinReducer extends Reducer<Text, Text, Text, NullWritable> {

        // province.txt存在, part-r-00000不存在的資料
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int count = 0;
            String province = "";
            for (Text value : values) {
                count++;
                String str = value.toString();
                if (str.startsWith("a#")) {
                    province = str.substring(2);
                }
            }

            if (count == 1) {
                context.write(new Text(province), NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 3) {
            System.err.println("Usage: <in> [<in>...] <out>");
            System.exit(2);
        }

        HdfsUtil.rmr(conf, otherArgs[otherArgs.length - 1]);

        Job job = Job.getInstance(conf, ReduceJoinDemo.class.getSimpleName());
        job.setJarByClass(ReduceJoinDemo.class);

        job.setMapperClass(ReduceJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(ReduceJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]), new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

        if (job.waitForCompletion(true)) {
            HdfsUtil.cat(conf, otherArgs[2] + "/part-r-00000");
            System.out.println("success");
        } else {
            System.out.println("fail");
        }
    }
}

3. SemiJoin

SemiJoin就是所謂的半連線,其實仔細一看就是reduce join的一個變種,就是在map端過濾掉一些資料,在網路中只傳輸參與連線的資料不參與連線的資料不必在網路中進行傳輸,從而減少了shuffle的網路傳輸量,使整體效率得到提高,其他思想和reduce join是一模一樣的。說得更加接地氣一點就是將小表中參與join的key單獨抽出來通過DistributedCach分發到相關節點,然後將其取出放到記憶體中(可以放到HashSet中),在map階段掃描連線表,將join key不在記憶體HashSet中的記錄過濾掉,讓那些參與join的記錄通過shuffle傳輸到reduce端進行join操作,其他的和reduce join都是一樣的。

4. 參考

《精通Hadoop》 [印] Sandeep Karanth著 劉淼等譯