1. 程式人生 > >Mapreduce之自定義OutputFormat應用-日誌增強

Mapreduce之自定義OutputFormat應用-日誌增強

主要介紹的是自定義OutputFormat的使用,給出的需求很簡單
對現有的日誌檔案內容進行增強。
1、從原始日誌檔案中讀取資料
2、根據日誌中的一個URL欄位到外部知識庫中獲取資訊增強到原始日誌
3、如果成功增強,則輸出到增強結果目錄;如果增強失敗,則抽取原始資料中URL欄位輸出到待爬清單目錄
流程圖
這裡寫圖片描述
程式實現

/**
 * 從資料庫(知識庫)中載入資訊,為傳進來的map初始化
 * @author 12706
 *
 */
public class MapLoaderUtils {
    /**
     * 知識庫中載入資訊,初始化map
     * @param map
     */
public static void mapInit(Map<String,String> map){ Connection conn = null; Statement state = null; ResultSet rs = null; try { Class.forName("com.mysql.jdbc.Driver"); conn = DriverManager.getConnection("jdbc:mysql://192.168.191.2:3306/test"
, "root", "123456"); System.out.println("資料庫連線成功"); String sql = "SELECT * FROM url_rule"; state = conn.createStatement(); rs = state.executeQuery(sql); while(rs.next()){ //初始化map <url,content> //System.out.println(rs.getString(1));
//System.out.println(rs.getString(2)); map.put(rs.getString(1), rs.getString(2)); //System.out.println(map); } } catch (Exception e) { e.printStackTrace(); }finally { if(conn != null){ try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } if(state != null){ try { state.close(); } catch (SQLException e) { e.printStackTrace(); } } if(rs != null){ try { rs.close(); } catch (SQLException e) { e.printStackTrace(); } } } } public static void main(String[] args) { Map<String, String> map = new HashMap<String, String>(); mapInit(map); System.out.println(map); } }
/**
 * 自定義的OutoutFormat
 * 拿到maptask或者reducetask傳來的kv進行處理
 * 將不同內容輸入到不同的檔案
 *  maptask或者reduceTask在最終輸出時,先呼叫OutPutFormat的getRecordWrite方法得到
 * 一個RecordWrite然後再呼叫RecordWrite的write(k,v)方法將資料寫出
 * @author 12706
 *
 */
public class LogEnhanceOutoutFormat extends FileOutputFormat<Text, NullWritable>{

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        FileSystem fs = FileSystem.get(job.getConfiguration());
        //獲取增強日誌對應的輸出流 
        FSDataOutputStream enOs = fs.create(new Path("/logenhance/enhance.data"));
        //獲取待爬資訊對應的輸出流
        FSDataOutputStream toCrawOs = fs.create(new Path("/logenhance/tocraw.data"));
        //傳入兩個檔案對應的輸出流
        LogEnhanceRecordWriter recordWriter = new LogEnhanceRecordWriter(enOs, toCrawOs);
        return recordWriter;
    }
    static class LogEnhanceRecordWriter extends RecordWriter<Text,NullWritable>{

        private FSDataOutputStream enOs = null;
        private FSDataOutputStream toCrawOs = null;

        public LogEnhanceRecordWriter(FSDataOutputStream enOs, FSDataOutputStream toCrawOs) {
            this.enOs = enOs;
            this.toCrawOs = toCrawOs;
        }

        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            //拿到的資訊要麼www.abc.com    abcdefg或者www.abc.com    tocraw
            //根據key內容來判斷將key輸出到哪個檔案
            String info = key.toString();
            if(info.contains("tocraw")){
                //輸出到待待爬檔案
                toCrawOs.write(info.getBytes());
            }else {
                //輸出待增強日誌的檔案
                enOs.write(info.getBytes());
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            //關閉流
            if(enOs != null){
                enOs.close();
            }
            if(toCrawOs != null){
                toCrawOs.close();
            }
        }

    }
}
/**
 * 主程式:讀取日誌內容,根據url去知識庫看能否查到內容資訊,如果能的話,
 * 將原來那行日誌資訊增強寫入到檔案a.txt,如果沒有查到,那麼將資訊寫到到待爬檔案b.txt
 * 至於怎麼使得不同內容輸入到不同檔案則使用自定義的OutputFormat,但主程式中要指定。
 * @author 12706
 *
 */
public class MyLogEnhance {
    static class MyLogEnhanceMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        Map<String, String> logMap = new HashMap<String, String>();
        Text k = new Text();
        //
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            //將資訊載入到map快取中
            MapLoaderUtils.mapInit(logMap);
        }
        //讀取日誌檔案一行文字資訊,切割後第27個為url
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            //計數器,用來記錄欄位不符合>=27的行
            Counter counter = context.getCounter("mylog", "illegal_line");
            String line = value.toString();
            String[] fields = line.split("\t");
            try {
                //有的一行不一定有27個欄位
                String url = fields[26];
                //根據url去快取中查詢是否有內容資訊
                String content = logMap.get(url);
                if(content == null){
                    //沒有內容,如果為空則只輸出url到待爬清單
                    k.set(url+"\t"+"tocraw"+"\n");
                }else {
                    //知識庫(資料庫)中有資訊,增強日誌資訊
                    k.set(line+"\t"+context+"\n");
                }
                context.write(k, NullWritable.get());

            } catch (Exception e) {
                counter.increment(1);//一次加1
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(MyLogEnhance.class);
        job.setMapperClass(MyLogEnhanceMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        //使用自定義OutputFormat
        job.setOutputFormatClass(LogEnhanceOutoutFormat.class);
        //指明不需要使用reduce
        job.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //儘管我們用的自定義Outputformat,但是它是繼承自FileOutputFormat
        //在FileOutputFormat中,必須輸出一個_success檔案,所以在此還需要輸出path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean exit = job.waitForCompletion(true);
        System.exit(exit?0:1);

    }
}

測試程式
工程打包上傳到linux
將日誌檔案上傳到linux
建立資料夾/logenhance/input
將日誌檔案上傳到/logenhance/input
執行程式,檢視

[[email protected] ~]# hadoop fs -mkdir -p /logenhance/input
[[email protected] ~]# hadoop fs -put 2013072404-http-combinedBy-1373892200521-log-1 /logenhance/input
[[email protected] ~]# hadoop jar logen.jar com.scu.hadoop.t.logenhanceoutput.MyLogEnhance /logenhance/input /logenhance/output
...
 mylog
                illegal_line=1
        File Input Format Counters 
                Bytes Read=61826249
        File Output Format Counters 
                Bytes Written=68329573(這些是比以前多出來的)
                [[email protected] ~]# hadoop fs -ls /logenhance
Found 4 items
-rw-r--r--   2 root supergroup   68329573 2017-10-17 07:32 /logenhance/enhance.data
drwxr-xr-x   - root supergroup          0 2017-10-17 07:00 /logenhance/input
drwxr-xr-x   - root supergroup          0 2017-10-17 07:32 /logenhance/output
-rw-r--r--   2 root supergroup          0 2017-10-17 07:32 /logenhance/tocraw.data

相關推薦

Mapreduce定義OutputFormat應用-日誌增強

主要介紹的是自定義OutputFormat的使用,給出的需求很簡單 對現有的日誌檔案內容進行增強。 1、從原始日誌檔案中讀取資料 2、根據日誌中的一個URL欄位到外部知識庫中獲取資訊增強到原始日誌 3、如果成功增強,則輸出到增強結果目錄;如果增強失敗,則

MapReduce定義OutputFormat

@[toc] ## OutputFormat介面實現類 `OutputFormat`是`MapReduce`輸出的基類,所有實現`MapReduce`輸出都實現了`OutputFormat`介面。下面介紹幾種常見的OutputFormat實現類。 - 文字輸出`TextoutputFormat` 預設的輸出

MapReduce定義WordCount案例

在一堆給定的文字檔案中統計輸出每一個單詞出現的總次數。 1.分析 mapper階段: 將mapstack 傳給我們的文字資訊內容先轉換成string。 根據空格將一行切分成單詞。 將單詞輸出為<單詞,1>的格式。 reducer階段

MapReduce ---定義全域性計數器,將資訊輸出到控制檯

package jishuqi; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.ha

MapReduce定義partitioner

partitioner定義: partitioner的作用是將mapper(如果使用了combiner的話就是combiner)輸出的key/value拆分為分片(shard),每個reducer對應一個分片。預設情況下,partitioner先計算key的雜湊值(通常

MapReduce定義InputFormat

>在企業開發中,Hadoop框架自帶的`InputFormat`型別不能滿足所有應用場景,需要==自定義==InputFormat來解決實際問題。 自定義InputFormat步驟如下: - (1)自定義一個類繼承`FilelnputFormat`。 - (2)自定義一個類繼承`RecordReader`,實

MapReduce定義分割槽器Partitioner

@[toc] ## 問題引出 >要求將統計結果按照條件輸出到不同檔案中(分割槽)。 比如:將統計結果按照**手機歸屬地不同省份**輸出到不同檔案中(分割槽) ## 預設Partitioner分割槽 ```java public class HashPartitioner extends Partitio

每天一個JS 小demo定義滾動條。主要知識點:事件應用

prevent 數據 滾動 sca listener 視頻 希望 特效 poi <!DOCTYPE html><html lang="en"><head><meta charset="UTF-8"><title>D

安卓編程定義字體控件導致應用閃退

有效 face ets type ima 效果 運行時 界面跳轉 手機   這坑踩的是結結實實,近來做項目,需要用到自定義字體,一個項目中近十種字體,果斷選擇了使用自定義控件來實現。   可是,大功告成之後,在性能較差的手機上去運行時,反復切換頁面,應用閃退了,log沒有有

HBase定義HBase-Mapreduce案例一

1.需求場景 將HBase中的ys表中的一部分資料通過Mapreduce遷移到ys_mr表中 2.程式碼編寫   1)構建ReadysMapreduce類,用於讀取ys表中的資料 package cn.ysjh; import java.io.IOException;

MapReduce系列定義Partitioner

partitioner定義:分割槽器 partitioner的作用是將mapper(如果使用了combiner的話就是combiner)輸出的key/value拆分為分片(shard),每個reducer對應一個分片。預設情況下,partitioner先計算key的雜湊值(通常為md5值)。然後

asp.net core mcroservices 架構 分散式日誌(二)定義日誌開發

一   netcore日誌原理                                    &nbs

logback系列七:繼承RollingFileAppender,儲存定義檔名的日誌

繼承類:package com.hk3t.air.system.log; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos

【本人禿頂程式設計師】Nginx學習定義訪問日誌

←←←←←←←←←←←← 快,點關注! 寫在開始 日誌的重要性不言而喻,一般來說我們開發過程中會為每個專案定義自己的日誌格式以及儲存路徑。 就我們普通的JAVAWEB專案來說,重要的日誌一般輸出並存放在Tomcat的log目錄下,並區分日誌輸出級別。用於區分,查閱並統計相關日誌資

反射應用進階篇定義反射工具類在springmvc中的應用

本篇使用自定義工具類進行批量處理物件 ---將批量源物件的屬性值注入到實際需要的目標類物件(屬性名相同,型別不同)中 專案使用maven構建war工程:  spring+spring MVC+Mybatis 回顧知識點: 事務:--->為什麼在使用AOP時需要使

js基礎定義屬性索引的應用

<!DOCTYPE HTML> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8"> <title>無標題文件</title>

基於.NetCore3.1系列 —— 日誌記錄定義日誌元件

# 一、前言 回顧:[日誌記錄之日誌核心要素揭祕](https://www.cnblogs.com/i3yuan/p/13442509.html) 在上一篇中,我們通過學習瞭解在.net core 中內建的日誌記錄中的幾大核心要素,在日誌工廠記錄器(`ILoggerFactory`)中實現將日誌記錄提供器

python定義異步IO客戶端

class close sel 封裝 [0 urn 簡單 pytho syn #!/usr/bin/env python # -*- coding: utf8 -*- # __Author: "Skiler Hao" # date: 2017/5/16 15:04

Hadoop實戰-Flume定義Sink(十九)

current ioe back urn oop print out java try import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream;

C#定義特性

創建 tip comm 字段 運算符 包含 自動 名稱 程序   在前面介紹的代碼中有使用特性,這些特性都是Microsoft定義好的,作為.NET Framework類庫的一部分,許多特性都得到了C#編譯器的支持。   .NET Frmework也允許定義自己的特性。自