Mapreduce之自定義OutputFormat應用-日誌增強
主要介紹的是自定義OutputFormat的使用,給出的需求很簡單
對現有的日誌檔案內容進行增強。
1、從原始日誌檔案中讀取資料
2、根據日誌中的一個URL欄位到外部知識庫中獲取資訊增強到原始日誌
3、如果成功增強,則輸出到增強結果目錄;如果增強失敗,則抽取原始資料中URL欄位輸出到待爬清單目錄
流程圖
程式實現
/**
* 從資料庫(知識庫)中載入資訊,為傳進來的map初始化
* @author 12706
*
*/
public class MapLoaderUtils {
/**
* 知識庫中載入資訊,初始化map
* @param map
*/
public static void mapInit(Map<String,String> map){
Connection conn = null;
Statement state = null;
ResultSet rs = null;
try {
Class.forName("com.mysql.jdbc.Driver");
conn = DriverManager.getConnection("jdbc:mysql://192.168.191.2:3306/test" , "root", "123456");
System.out.println("資料庫連線成功");
String sql = "SELECT * FROM url_rule";
state = conn.createStatement();
rs = state.executeQuery(sql);
while(rs.next()){
//初始化map <url,content>
//System.out.println(rs.getString(1));
//System.out.println(rs.getString(2));
map.put(rs.getString(1), rs.getString(2));
//System.out.println(map);
}
} catch (Exception e) {
e.printStackTrace();
}finally {
if(conn != null){
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if(state != null){
try {
state.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if(rs != null){
try {
rs.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
Map<String, String> map = new HashMap<String, String>();
mapInit(map);
System.out.println(map);
}
}
/**
* 自定義的OutoutFormat
* 拿到maptask或者reducetask傳來的kv進行處理
* 將不同內容輸入到不同的檔案
* maptask或者reduceTask在最終輸出時,先呼叫OutPutFormat的getRecordWrite方法得到
* 一個RecordWrite然後再呼叫RecordWrite的write(k,v)方法將資料寫出
* @author 12706
*
*/
public class LogEnhanceOutoutFormat extends FileOutputFormat<Text, NullWritable>{
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(job.getConfiguration());
//獲取增強日誌對應的輸出流
FSDataOutputStream enOs = fs.create(new Path("/logenhance/enhance.data"));
//獲取待爬資訊對應的輸出流
FSDataOutputStream toCrawOs = fs.create(new Path("/logenhance/tocraw.data"));
//傳入兩個檔案對應的輸出流
LogEnhanceRecordWriter recordWriter = new LogEnhanceRecordWriter(enOs, toCrawOs);
return recordWriter;
}
static class LogEnhanceRecordWriter extends RecordWriter<Text,NullWritable>{
private FSDataOutputStream enOs = null;
private FSDataOutputStream toCrawOs = null;
public LogEnhanceRecordWriter(FSDataOutputStream enOs, FSDataOutputStream toCrawOs) {
this.enOs = enOs;
this.toCrawOs = toCrawOs;
}
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
//拿到的資訊要麼www.abc.com abcdefg或者www.abc.com tocraw
//根據key內容來判斷將key輸出到哪個檔案
String info = key.toString();
if(info.contains("tocraw")){
//輸出到待待爬檔案
toCrawOs.write(info.getBytes());
}else {
//輸出待增強日誌的檔案
enOs.write(info.getBytes());
}
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
//關閉流
if(enOs != null){
enOs.close();
}
if(toCrawOs != null){
toCrawOs.close();
}
}
}
}
/**
* 主程式:讀取日誌內容,根據url去知識庫看能否查到內容資訊,如果能的話,
* 將原來那行日誌資訊增強寫入到檔案a.txt,如果沒有查到,那麼將資訊寫到到待爬檔案b.txt
* 至於怎麼使得不同內容輸入到不同檔案則使用自定義的OutputFormat,但主程式中要指定。
* @author 12706
*
*/
public class MyLogEnhance {
static class MyLogEnhanceMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
Map<String, String> logMap = new HashMap<String, String>();
Text k = new Text();
//
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
//將資訊載入到map快取中
MapLoaderUtils.mapInit(logMap);
}
//讀取日誌檔案一行文字資訊,切割後第27個為url
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//計數器,用來記錄欄位不符合>=27的行
Counter counter = context.getCounter("mylog", "illegal_line");
String line = value.toString();
String[] fields = line.split("\t");
try {
//有的一行不一定有27個欄位
String url = fields[26];
//根據url去快取中查詢是否有內容資訊
String content = logMap.get(url);
if(content == null){
//沒有內容,如果為空則只輸出url到待爬清單
k.set(url+"\t"+"tocraw"+"\n");
}else {
//知識庫(資料庫)中有資訊,增強日誌資訊
k.set(line+"\t"+context+"\n");
}
context.write(k, NullWritable.get());
} catch (Exception e) {
counter.increment(1);//一次加1
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(MyLogEnhance.class);
job.setMapperClass(MyLogEnhanceMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//使用自定義OutputFormat
job.setOutputFormatClass(LogEnhanceOutoutFormat.class);
//指明不需要使用reduce
job.setNumReduceTasks(0);
FileInputFormat.setInputPaths(job, new Path(args[0]));
//儘管我們用的自定義Outputformat,但是它是繼承自FileOutputFormat
//在FileOutputFormat中,必須輸出一個_success檔案,所以在此還需要輸出path
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean exit = job.waitForCompletion(true);
System.exit(exit?0:1);
}
}
測試程式
工程打包上傳到linux
將日誌檔案上傳到linux
建立資料夾/logenhance/input
將日誌檔案上傳到/logenhance/input
執行程式,檢視
[[email protected] ~]# hadoop fs -mkdir -p /logenhance/input
[[email protected] ~]# hadoop fs -put 2013072404-http-combinedBy-1373892200521-log-1 /logenhance/input
[[email protected] ~]# hadoop jar logen.jar com.scu.hadoop.t.logenhanceoutput.MyLogEnhance /logenhance/input /logenhance/output
...
mylog
illegal_line=1
File Input Format Counters
Bytes Read=61826249
File Output Format Counters
Bytes Written=68329573(這些是比以前多出來的)
[[email protected] ~]# hadoop fs -ls /logenhance
Found 4 items
-rw-r--r-- 2 root supergroup 68329573 2017-10-17 07:32 /logenhance/enhance.data
drwxr-xr-x - root supergroup 0 2017-10-17 07:00 /logenhance/input
drwxr-xr-x - root supergroup 0 2017-10-17 07:32 /logenhance/output
-rw-r--r-- 2 root supergroup 0 2017-10-17 07:32 /logenhance/tocraw.data
相關推薦
Mapreduce之自定義OutputFormat應用-日誌增強
主要介紹的是自定義OutputFormat的使用,給出的需求很簡單 對現有的日誌檔案內容進行增強。 1、從原始日誌檔案中讀取資料 2、根據日誌中的一個URL欄位到外部知識庫中獲取資訊增強到原始日誌 3、如果成功增強,則輸出到增強結果目錄;如果增強失敗,則
MapReduce之自定義OutputFormat
@[toc] ## OutputFormat介面實現類 `OutputFormat`是`MapReduce`輸出的基類,所有實現`MapReduce`輸出都實現了`OutputFormat`介面。下面介紹幾種常見的OutputFormat實現類。 - 文字輸出`TextoutputFormat` 預設的輸出
MapReduce之自定義WordCount案例
在一堆給定的文字檔案中統計輸出每一個單詞出現的總次數。 1.分析 mapper階段: 將mapstack 傳給我們的文字資訊內容先轉換成string。 根據空格將一行切分成單詞。 將單詞輸出為<單詞,1>的格式。 reducer階段
MapReduce 之 ---自定義全域性計數器,將資訊輸出到控制檯
package jishuqi; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.ha
MapReduce之自定義partitioner
partitioner定義: partitioner的作用是將mapper(如果使用了combiner的話就是combiner)輸出的key/value拆分為分片(shard),每個reducer對應一個分片。預設情況下,partitioner先計算key的雜湊值(通常
MapReduce之自定義InputFormat
>在企業開發中,Hadoop框架自帶的`InputFormat`型別不能滿足所有應用場景,需要==自定義==InputFormat來解決實際問題。 自定義InputFormat步驟如下: - (1)自定義一個類繼承`FilelnputFormat`。 - (2)自定義一個類繼承`RecordReader`,實
MapReduce之自定義分割槽器Partitioner
@[toc] ## 問題引出 >要求將統計結果按照條件輸出到不同檔案中(分割槽)。 比如:將統計結果按照**手機歸屬地不同省份**輸出到不同檔案中(分割槽) ## 預設Partitioner分割槽 ```java public class HashPartitioner extends Partitio
每天一個JS 小demo之自定義滾動條。主要知識點:事件應用
prevent 數據 滾動 sca listener 視頻 希望 特效 poi <!DOCTYPE html><html lang="en"><head><meta charset="UTF-8"><title>D
安卓編程之自定義字體控件導致應用閃退
有效 face ets type ima 效果 運行時 界面跳轉 手機 這坑踩的是結結實實,近來做項目,需要用到自定義字體,一個項目中近十種字體,果斷選擇了使用自定義控件來實現。 可是,大功告成之後,在性能較差的手機上去運行時,反復切換頁面,應用閃退了,log沒有有
HBase之自定義HBase-Mapreduce案例一
1.需求場景 將HBase中的ys表中的一部分資料通過Mapreduce遷移到ys_mr表中 2.程式碼編寫 1)構建ReadysMapreduce類,用於讀取ys表中的資料 package cn.ysjh; import java.io.IOException;
MapReduce系列之自定義Partitioner
partitioner定義:分割槽器 partitioner的作用是將mapper(如果使用了combiner的話就是combiner)輸出的key/value拆分為分片(shard),每個reducer對應一個分片。預設情況下,partitioner先計算key的雜湊值(通常為md5值)。然後
asp.net core mcroservices 架構之 分散式日誌(二)之自定義日誌開發
一 netcore日誌原理 &nbs
logback系列之七:繼承RollingFileAppender,儲存自定義檔名的日誌
繼承類:package com.hk3t.air.system.log; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos
【本人禿頂程式設計師】Nginx學習之自定義訪問日誌
←←←←←←←←←←←← 快,點關注! 寫在開始 日誌的重要性不言而喻,一般來說我們開發過程中會為每個專案定義自己的日誌格式以及儲存路徑。 就我們普通的JAVAWEB專案來說,重要的日誌一般輸出並存放在Tomcat的log目錄下,並區分日誌輸出級別。用於區分,查閱並統計相關日誌資
反射應用進階篇之自定義反射工具類在springmvc中的應用
本篇使用自定義工具類進行批量處理物件 ---將批量源物件的屬性值注入到實際需要的目標類物件(屬性名相同,型別不同)中 專案使用maven構建war工程: spring+spring MVC+Mybatis 回顧知識點: 事務:--->為什麼在使用AOP時需要使
js基礎之自定義屬性索引的應用
<!DOCTYPE HTML> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8"> <title>無標題文件</title>
基於.NetCore3.1系列 —— 日誌記錄之自定義日誌元件
# 一、前言 回顧:[日誌記錄之日誌核心要素揭祕](https://www.cnblogs.com/i3yuan/p/13442509.html) 在上一篇中,我們通過學習瞭解在.net core 中內建的日誌記錄中的幾大核心要素,在日誌工廠記錄器(`ILoggerFactory`)中實現將日誌記錄提供器
python之自定義異步IO客戶端
class close sel 封裝 [0 urn 簡單 pytho syn #!/usr/bin/env python # -*- coding: utf8 -*- # __Author: "Skiler Hao" # date: 2017/5/16 15:04
Hadoop實戰-Flume之自定義Sink(十九)
current ioe back urn oop print out java try import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream;
C#之自定義特性
創建 tip comm 字段 運算符 包含 自動 名稱 程序 在前面介紹的代碼中有使用特性,這些特性都是Microsoft定義好的,作為.NET Framework類庫的一部分,許多特性都得到了C#編譯器的支持。 .NET Frmework也允許定義自己的特性。自