
Big Data: Shuffle and MapReduce Programming Examples (Data Deduplication, Multi-Table Queries, Inverted Index, Unit Testing)



I. What is Shuffle? ----> the core of MapReduce
    1. Serialization
    2. Sorting
    3. Partitioning
    4. Combining (see the sketch below)
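These four steps happen between the output of map and the input of reduce. As a minimal, hypothetical sketch (not part of the case studies that follow; the package and class names are invented for illustration), the partitioning hook can be customized by subclassing Partitioner and registering it on the Job, while combining is enabled with setCombinerClass the same way the inverted-index example later in this article does:

package demo.shuffle;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner: routes each key to a reducer based on its first character,
// so that all keys starting with the same character end up in the same partition
// (and therefore in the same output file).
public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (key.getLength() == 0) {
            return 0;
        }
        // The modulo keeps the partition number inside [0, numPartitions)
        return Character.toLowerCase(key.toString().charAt(0)) % numPartitions;
    }
}

It would be wired into a WordCount-style job with job.setPartitionerClass(FirstLetterPartitioner.class) and job.setNumReduceTasks(n). Serialization and sorting come from the key types themselves: Text, IntWritable and the other key classes used below implement WritableComparable, which is what the shuffle uses to serialize and sort k2.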


II. MapReduce programming examples ------> goal: learn how to develop a program
    1. Data deduplication
        Review: in SQL, DISTINCT removes duplicate rows;
                it applies to all of the columns that follow it.

    Review: multi-table queries in Oracle
        Classified by the join condition:
        (1) Equijoin
        (2) Non-equijoin
        (3) Outer join
        (4) Self-join
        Notes: (1) Multi-table query or subquery: which should you prefer? (the multi-table query)
               (2) As a rule of thumb, join no more than 3 tables; beyond 5 tables, performance suffers.

    2. Multi-table query: equijoin
        Query employee information: department name and employee name
        select d.dname,e.ename from emp e,dept d where e.deptno=d.deptno;
        Question: how would you handle three tables?

    3. Multi-table query: self-join
        Using table aliases, the same table is treated as if it were several tables.
        Query employee information: the manager's name and the employee's name
        select b.ename,e.ename from emp b,emp e where b.empno=e.mgr;
        Note: the Cartesian product produced = the square of the original table ----> a self-join is not suitable for large tables.
              A better approach in Oracle is a hierarchical query (a tree) instead of a self-join.

    4. Inverted index
    5. Unit testing with MRUnit


Data deduplication

package demo.mr.distinct;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DistinctMain {

    public static void main(String[] args) throws Exception {
        // Create a job: job = map + reduce
        Job job = Job.getInstance(new Configuration());

        // Specify the entry point of the job
        job.setJarByClass(DistinctMain.class);

        // Specify the Mapper and its output types: k2, v2
        job.setMapperClass(DistinctMapper.class);
        job.setMapOutputKeyClass(Text.class);           // k2
        job.setMapOutputValueClass(NullWritable.class); // v2

        // Specify the Reducer and its output types: k4, v4
        job.setReducerClass(DistinctReducer.class);
        job.setOutputKeyClass(Text.class);           // k4
        job.setOutputValueClass(NullWritable.class); // v4

        // Specify the input path (map) and the output path (reduce)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Run the job
        job.waitForCompletion(true);
    }
}
package demo.mr.distinct;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class DistinctMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Input data: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
        String data = value1.toString();

        // Split the line into fields (comma-separated)
        String[] words = data.split(",");

        // Output the job title (words[2]) as key2; the value is NullWritable
        context.write(new Text(words[2]), NullWritable.get());
    }

}
package demo.mr.distinct;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DistinctReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text k3, Iterable<NullWritable> v3, Context context) throws IOException, InterruptedException {
        // Duplicate keys have already been merged during the shuffle; emit each distinct key once
        context.write(k3, NullWritable.get());
    }
}
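To run the deduplication job, the three classes are packaged into a jar and submitted with hadoop jar. The jar name and the HDFS paths below are placeholders (they are not part of the original post); the same pattern applies to every other job in this article:

hadoop jar distinct.jar demo.mr.distinct.DistinctMain /input/emp.csv /output/distinct

The input file is expected to contain EMP rows like the one shown in the mapper comment, and the output directory must not exist before the job runs.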

Cartesian product

Implementing the equijoin with MapReduce


Code implementation

package demo.mr.multitable;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MultiTableQueryMain {

    public static void main(String[] args) throws Exception {
        // Create a job: job = map + reduce
        Job job = Job.getInstance(new Configuration());

        // Specify the entry point of the job
        job.setJarByClass(MultiTableQueryMain.class);

        // Specify the Mapper and its output types: k2, v2
        job.setMapperClass(MultiTableQueryMapper.class);
        job.setMapOutputKeyClass(IntWritable.class); // k2
        job.setMapOutputValueClass(Text.class);      // v2

        // Specify the Reducer and its output types: k4, v4
        job.setReducerClass(MultiTableQueryReducer.class);
        job.setOutputKeyClass(Text.class);   // k4
        job.setOutputValueClass(Text.class); // v4

        // Specify the input path (map) and the output path (reduce)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Run the job
        job.waitForCompletion(true);

    }
}

package demo.mr.multitable;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MultiTableQueryMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Each input record is either a department or an employee row
        String data = value1.toString();

        // Split the line into fields
        String[] words = data.split(",");

        // Decide which table the record belongs to by its number of fields
        if(words.length == 3){
            // Department table: key = department number, value = department name tagged with "*"
            context.write(new IntWritable(Integer.parseInt(words[0])), new Text("*" + words[1]));
        }else{
            // Employee table: key = the employee's department number, value = employee name
            context.write(new IntWritable(Integer.parseInt(words[7])), new Text(words[1]));
        }
    }
}
package demo.mr.multitable;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MultiTableQueryReducer extends Reducer<IntWritable, Text, Text, Text> {

    @Override
    protected void reduce(IntWritable k3, Iterable<Text> v3, Context context)
            throws IOException, InterruptedException {
        // Variables to hold the department name and the list of employee names
        String dname = "";
        String empNameList = "";

        for(Text v:v3){
            String str = v.toString();

            // Look for the "*" tag
            int index = str.indexOf("*");
            if(index >= 0){
                // This value is the department name
                dname = str.substring(1);
            }else{
                // This value is an employee name
                empNameList = str + ";" + empNameList;
            }
        }

        // Output: department name -> list of employee names
        context.write(new Text(dname), new Text(empNameList));
    }
}


Self-join: code implementation

package demo.mr.selfjoin;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SelfJoinMain {

    public static void main(String[] args) throws Exception {
        // Create a job: job = map + reduce
        Job job = Job.getInstance(new Configuration());

        // Specify the entry point of the job
        job.setJarByClass(SelfJoinMain.class);

        // Specify the Mapper and its output types: k2, v2
        job.setMapperClass(SelfJoinMapper.class);
        job.setMapOutputKeyClass(IntWritable.class); // k2
        job.setMapOutputValueClass(Text.class);      // v2

        // Specify the Reducer and its output types: k4, v4
        job.setReducerClass(SelfJoinReducer.class);
        job.setOutputKeyClass(Text.class);   // k4
        job.setOutputValueClass(Text.class); // v4

        // Specify the input path (map) and the output path (reduce)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Run the job
        job.waitForCompletion(true);
    }

}
package demo.mr.selfjoin;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SelfJoinMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Input data: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
        String data = value1.toString();

        // Split the line into fields
        String[] words = data.split(",");

        // Output each record twice
        // 1. As a manager record: key = empno, value = name tagged with "*"
        context.write(new IntWritable(Integer.parseInt(words[0])), new Text("*"+words[1]));

        // 2. As an employee record: key = mgr (the manager's empno), value = name
        try{
            context.write(new IntWritable(Integer.parseInt(words[3])), new Text(words[1]));
        }catch(Exception ex){
            // If parsing the mgr field fails, this employee is the top boss (has no manager)
            context.write(new IntWritable(-1), new Text(words[1]));
        }
    }

}
package demo.mr.selfjoin;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SelfJoinReducer extends Reducer<IntWritable, Text, Text, Text> {

    @Override
    protected void reduce(IntWritable k3, Iterable<Text> v3, Context context)
            throws IOException, InterruptedException {
        // Variables to hold the manager name and the list of employee names
        String bossName = "";
        String empNameList = "";

        for(Text t:v3){
            String str = t.toString();

            // Look for the "*" tag
            int index = str.indexOf("*");
            if(index >= 0){
                // This value is the manager's name
                bossName = str.substring(1);
            }else{
                // This value is an employee name
                empNameList = str + ";" + empNameList;
            }
        }

        // Output only if both a manager and at least one employee were found
        if(bossName.length() > 0 && empNameList.length() > 0)
            context.write(new Text(bossName), new Text(empNameList));
    }
}

Inverted index

Code implementation

package demo.revertedindex;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class RevertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Determine which file this record comes from, e.g. /indexdata/data01.txt
        String path = ((FileSplit)context.getInputSplit()).getPath().toString();

        // Position of the last slash
        int index = path.lastIndexOf("/");
        // Extract the file name
        String fileName = path.substring(index + 1);

        // Input data: I love Beijing *******
        String data = value1.toString();
        String[] words = data.split(" ");

        // Output: key = word:fileName, value = "1"
        for(String w:words){
            context.write(new Text(w+":"+fileName), new Text("1"));
        }
    }

}
package demo.revertedindex;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RevertedIndexReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text k3, Iterable<Text> v3, Context context)
            throws IOException, InterruptedException {
        // Concatenate the values produced by the combiner (fileName:count pairs)
        String str = "";

        for(Text t:v3){
            str = "("+t.toString()+")" + str;
        }

        // Output: word -> (fileName:count)(fileName:count)...
        context.write(k3, new Text(str));
    }

}
package demo.revertedindex;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RevertedIndexCombiner extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text k21, Iterable<Text> v21, Context context)
            throws IOException, InterruptedException {
        // Sum the occurrences of a word within a single file
        int total = 0;
        for(Text v:v21){
            total = total + Integer.parseInt(v.toString());
        }

        // Split the word and the file name, e.g. k21: love:data01.txt
        String data = k21.toString();
        // Position of the ":"
        int index = data.indexOf(":");

        String word = data.substring(0, index);      // the word
        String fileName = data.substring(index+1);   // the file name

        // Output: key = word, value = fileName:count
        context.write(new Text(word), new Text(fileName+":"+total));
    }

}
package demo.revertedindex;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class RevertedIndexMain {

    public static void main(String[] args) throws Exception {
        // Create a job: job = map + reduce
        Job job = Job.getInstance(new Configuration());

        // Specify the entry point of the job
        job.setJarByClass(RevertedIndexMain.class);

        // Specify the Mapper and its output types: k2, v2
        job.setMapperClass(RevertedIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);   // k2
        job.setMapOutputValueClass(Text.class); // v2

        // Specify the Combiner, which runs on the map side during the shuffle
        job.setCombinerClass(RevertedIndexCombiner.class);

        // Specify the Reducer and its output types: k4, v4
        job.setReducerClass(RevertedIndexReducer.class);
        job.setOutputKeyClass(Text.class);   // k4
        job.setOutputValueClass(Text.class); // v4

        // Specify the input path (map) and the output path (reduce)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Run the job
        job.waitForCompletion(true);

    }

}
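To make the data flow concrete, here is a hypothetical trace (the file names and contents are invented for illustration; they mirror the sentences used in the unit tests below). Suppose data01.txt contains "I love Beijing" and data02.txt contains "I love China". For the word love, the mapper emits (love:data01.txt, 1) and (love:data02.txt, 1); the combiner sums the counts within each file and emits (love, data01.txt:1) and (love, data02.txt:1); the reducer then concatenates the values, so the final line for love looks roughly like:

love    (data02.txt:1)(data01.txt:1)

The order of the (file:count) entries within a line is not guaranteed, because it depends on the order in which the reducer receives the values.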

Unit testing

package demo.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//public class WordCountMapper extends Mapper<k1, v1, k2, v2> {
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        /*
         * context represents the Mapper's context
         * upstream: HDFS
         * downstream: the Reducer
         */

        // Get the input line, e.g.:  I love Beijing
        String data = value1.toString();

        // Split into words
        String[] words = data.split(" ");

        // Output
        for(String word:words){
            // k2 is the word, v2 counts one occurrence
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
package demo.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

//public class WordCountReducer extends Reducer<k3, v3, k4, v4> {
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text k3, Iterable<IntWritable> v3,Context context) throws IOException, InterruptedException {
        /*
         * context represents the Reducer's context
         * upstream: the Mapper
         * downstream: HDFS
         */

        // Sum up v3
        int total = 0;
        for(IntWritable v:v3){
            total += v.get();
        }

        // Output: k4 = the word, v4 = its frequency
        context.write(k3, new IntWritable(total));
    }

}
package demo.wc;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

public class MRUnitWordCount {

    @Test
    public void testMapper() throws Exception{
        // Set the hadoop.home.dir property (needed when running on Windows)
        System.setProperty("hadoop.home.dir", "D:\\temp\\hadoop-2.4.1\\hadoop-2.4.1");

        // Create the object under test
        WordCountMapper mapper = new WordCountMapper();

        // Create a MapDriver for the unit test
        MapDriver<LongWritable, Text, Text, IntWritable> driver = new MapDriver<>(mapper);

        // Specify the Map input: k1, v1
        driver.withInput(new LongWritable(1), new Text("I love Beijing"));

        // Specify the expected Map output: k2, v2
        driver.withOutput(new Text("I"), new IntWritable(1))
              .withOutput(new Text("love"), new IntWritable(1))
              .withOutput(new Text("Beijing"), new IntWritable(1));

        // Run the test: compare the expected output with the actual output
        driver.runTest();
    }

    @Test
    public void testReducer() throws Exception{
        // Set the hadoop.home.dir property (needed when running on Windows)
        System.setProperty("hadoop.home.dir", "D:\\temp\\hadoop-2.4.1\\hadoop-2.4.1");

        // Create the object under test
        WordCountReducer reducer = new WordCountReducer();

        // Create a ReduceDriver for the unit test
        ReduceDriver<Text, IntWritable, Text, IntWritable> driver = new ReduceDriver<>(reducer);

        // Build v3: a List of counts
        List<IntWritable> value3 = new ArrayList<>();
        value3.add(new IntWritable(1));
        value3.add(new IntWritable(1));
        value3.add(new IntWritable(1));

        // Specify the Reducer input
        driver.withInput(new Text("Beijing"), value3);

        // Specify the expected Reducer output
        driver.withOutput(new Text("Beijing"), new IntWritable(3));

        // Run the test
        driver.runTest();
    }

    @Test
    public void testJob() throws Exception{
        // Set the hadoop.home.dir property (needed when running on Windows)
        System.setProperty("hadoop.home.dir", "D:\\temp\\hadoop-2.4.1\\hadoop-2.4.1");

        // Create the objects under test
        WordCountMapper mapper = new WordCountMapper();
        WordCountReducer reducer = new WordCountReducer();

        // Create a driver for the whole job
        // MapReduceDriver<K1, V1, K2, V2, K4, V4>
        MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable>
                driver = new MapReduceDriver<>(mapper,reducer);

        // Specify the Map input
        driver.withInput(new LongWritable(1), new Text("I love Beijing"))
              .withInput(new LongWritable(4), new Text("I love China"))
              .withInput(new LongWritable(7), new Text("Beijing is the capital of China"));

        // Specify the expected Reducer output
//      driver.withOutput(new Text("I"), new IntWritable(2))
//            .withOutput(new Text("love"), new IntWritable(2))
//            .withOutput(new Text("Beijing"), new IntWritable(2))
//            .withOutput(new Text("China"), new IntWritable(2))
//            .withOutput(new Text("is"), new IntWritable(1))
//            .withOutput(new Text("the"), new IntWritable(1))
//            .withOutput(new Text("capital"), new IntWritable(1))
//            .withOutput(new Text("of"), new IntWritable(1));

        // The expected output must be listed in sorted key order (the shuffle sorts k3)
        driver.withOutput(new Text("Beijing"), new IntWritable(2))
              .withOutput(new Text("China"), new IntWritable(2))
              .withOutput(new Text("I"), new IntWritable(2))
              .withOutput(new Text("capital"), new IntWritable(1))
              .withOutput(new Text("is"), new IntWritable(1))
              .withOutput(new Text("love"), new IntWritable(2))
              .withOutput(new Text("of"), new IntWritable(1))
              .withOutput(new Text("the"), new IntWritable(1));

        driver.runTest();
    }
}
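The same approach carries over to the other examples in this article. As a sketch (assuming MRUnit is on the test classpath and, on Windows, the same hadoop.home.dir property as above), the DistinctMapper from the deduplication case could be tested with a MapDriver like this:

package demo.mr.distinct;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class MRUnitDistinct {

    @Test
    public void testDistinctMapper() throws Exception {
        // Create the object under test
        DistinctMapper mapper = new DistinctMapper();

        // MapDriver typed with the mapper's input and output types
        MapDriver<LongWritable, Text, Text, NullWritable> driver = new MapDriver<>(mapper);

        // k1/v1: one EMP line; the mapper should emit only the job column
        driver.withInput(new LongWritable(1),
                new Text("7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30"));

        // Expected k2/v2
        driver.withOutput(new Text("SALESMAN"), NullWritable.get());

        // Run the test: compare the expected output with the actual output
        driver.runTest();
    }
}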