1. 程式人生 > >大資料(十三):MapJoin(DistributedCache分散式快取)、資料清理例項與計數器應用

大資料(十三):MapJoin(DistributedCache分散式快取)、資料清理例項與計數器應用

一、在map端表合併(DistributedCache分散式快取)

1.適用場景

        適合用於關聯表中有小表的情形。

        可以將小表分發到所有的map節點,這樣,map節點就可以在本地對自己所讀到的大表資料進行合併並輸出最終結果,可以大大提高合併操作的併發速度,加快處理速度。

2.程式分析

  1. 載入快取資料:job.addCacheFile(new URI("file://d:/pd.txt"));

  2. map端join的邏輯不需要reduce階段,設定reducetask數量為0:job.setNumReduceTask(0),言外之意MapReduce可以不需要reduce階段,但是必須有Map階段

  3. 在setup()中讀取快取檔案,一行一行的讀取檔案,並快取到集合中

  4. 在map()讀取需要合併的表,與緩衝拼接。

3.編寫Mapper程式碼

public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    Map<String, String> pdMap = new HashMap<>();
    Text k = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
//讀取快取檔案
        BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt"), "UTF-8"));
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            //切割資料
            String[] fields = line.split("\t");
            //將快取插入到集合中
            pdMap.put(fields[0], fields[1]);
        }
        //關閉流
        reader.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //獲取一行資料
        String line = value.toString();
        //切割資料
        String[] fields = line.split("\t");
        //獲取pid
        String pid = fields[1];
        //根據pid獲取pname
        String pName = pdMap.get(pid);
        //拼接
        line = line + "\t" + pName;
        k.set(line);
        //輸出
        context.write(k, NullWritable.get());
    }
}

4.編寫Driver程式碼

public class DistributedCacheDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        //獲取job資訊
        Configuration entries = new Configuration();
        Job job = Job.getInstance(entries);
        //獲取驅動jar包
        job.setJarByClass(DistributedCacheDriver.class);
        //設定使用的Mapper
        job.setMapperClass(DistributedCacheMapper.class);
        //設定Reducer的個數為0
        job.setNumReduceTasks(0);
        //設定最終輸出型別
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        //設定載入快取檔案
        job.addCacheFile(new URI("file://d:/pd.txt"));
        //設定輸入輸出路徑
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        //執行
        boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}

 

二、ETL資料清洗

1.概述

        在執行核心業務MapReduce程式之前,往往要對資料進行清洗,清理掉不符合使用者要求的資料。清理的過程往往只需要執行mapper程式,不需要執行Reducer程式。

 

三、資料清洗例項

1.需求

  1. 對web訪問日誌的個欄位識別切分

  2. 取出日誌中不合法的記錄

  3. 根據統計需求,生成各類訪問請求過濾資料

2.編寫bean程式碼

public class LogBean{
    /**
    * 客戶端ip地址
    */
    private String remoteAddr;
    /**
    * 使用者名稱稱
    */
    private String remoteUser;
    /**
    * 訪問時間和訪問時區
    */
    private String timeLocal;
    /**
    * 請求的url和http協議
    */
    private String request;
    /**
    * 請求狀態
    */
    private String status;
    /**
    * 傳送給客戶端檔案主題內容大小
    */
    private String bodyBytesSent;
    /**
    * 從哪個頁面連結訪問過來的
    */
    private String httpReferer;
    /**
    * 客戶瀏覽器的相關資訊
    */
    private String httpUserAgent;
    /**
    * 判斷資料是否合法
    */
    private boolean valid = true;

    public String getRemoteAddr() {
        return remoteAddr;
    }

    public void setRemoteAddr(String remoteAddr) {
        this.remoteAddr = remoteAddr;
    }

    public String getRemoteUser() {
        return remoteUser;
    }

    public void setRemoteUser(String remoteUser) {
        this.remoteUser = remoteUser;
    }

    public String getTimeLocal() {
        return timeLocal;
    }

    public void setTimeLocal(String timeLocal) {
        this.timeLocal = timeLocal;
    }

    public String getRequest() {
        return request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getBodyBytesSent() {
        return bodyBytesSent;
    }

    public void setBodyBytesSent(String bodyBytesSent) {
        this.bodyBytesSent = bodyBytesSent;
    }

    public String getHttpReferer() {
        return httpReferer;
    }

    public void setHttpReferer(String httpReferer) {
        this.httpReferer = httpReferer;
    }

    public String getHttpUserAgent() {
        return httpUserAgent;
    }

    public void setHttpUserAgent(String httpUserAgent) {
        this.httpUserAgent = httpUserAgent;
    }

    public boolean isValid() {
        return valid;
    }

    public void setValid(boolean valid) {
        this.valid = valid;
    }

    @Override
    public String toString() {
        return "LogBean{" +
            "remoteAddr='" + remoteAddr + '\'' +
            ", remoteUser='" + remoteUser + '\'' +
            ", timeLocal='" + timeLocal + '\'' +
            ", request='" + request + '\'' +
            ", status='" + status + '\'' +
            ", bodyBytesSent='" + bodyBytesSent + '\'' +
            ", httpReferer='" + httpReferer + '\'' +
            ", httpUserAgent='" + httpUserAgent + '\'' +
            ", valid=" + valid +
            '}';
    }
}

3.編寫Mapper程式碼

public class LogMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
    Text k = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //獲取1行
        String line = value.toString();
        //分析日誌是否合法
        LogBean bean = pressLog(line);
        if (!bean.isValid()){
            return;
        }
        k.set(bean.toString());
        //輸出
        context.write(k,NullWritable.get());
    }

    private LogBean pressLog(String line) {
        LogBean logBean = new LogBean();
        //擷取
        String[] fields = line.split(" ");
        if (fields.length>1){
            //封裝資料
            logBean.setRemoteAddr(fields[0]);
            logBean.setRemoteUser(fields[1]);
            logBean.setTimeLocal(fields[3].substring(1));
            logBean.setRequest(fields[6]);
            logBean.setStatus(fields[8]);
            logBean.setBodyBytesSent(fields[9]);
            logBean.setHttpReferer(fields[10]);
            if (fields.length>12) {
                logBean.setHttpUserAgent(fields[11]+""+fields[12]);
            }else {
                logBean.setHttpUserAgent(fields[11]);
            }
            //大於400,HTTP錯誤
            if (Integer.parseInt(logBean.getStatus())>=400){
                logBean.setValid(false);
            }
        }else {
            logBean.setValid(false);
        }
        return logBean;
    }
}

4.編寫Driver程式碼

public class LogDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //獲取job資訊
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        //載入jar包
        job.setJarByClass(LogDriver.class);
        //關聯mapper
        job.setMapperClass(LogMapper.class);
        //設定最終輸出
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        //設定輸入輸出路徑
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        //執行
        boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}

 

四、計數器應用

        Hadoop為每個作業維護若干內建計數器,以描述多項指標。列入,某些計數器記錄已處理的位元組數和記錄數,使使用者可以監控已處理的輸入資料量和已產生的輸出資料量。

1.採用列舉的方式統計計數

enum MyCounter(MALFORORMED,NORMAL)

//對列舉定義的自定義計數器加1

context.getCounter(MyCounter.MALFORORMED).increment(1);

2.採用計數器組、計數器名稱的方式統計

context.getCounter("counterGroup","countera").increment(1);

組名和計數器名稱隨意起,但最好有意義。

3.計數寄過在程式執行後的控制檯上可以檢視。

4.改寫ETL資料清理例項的map方法,新增計數器

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    //獲取1行
    String line = value.toString();
    //分析日誌是否合法
    LogBean bean = pressLog(line);
    if (!bean.isValid()){
        context.getCounter("map","false").increment(1);
        return;
    }
    context.getCounter("map","true").increment(1);
    k.set(bean.toString());
    //輸出
    context.write(k,NullWritable.get());
}