1. 程式人生 > >hadoop之mapreduce程式設計例項(系統日誌初步清洗過濾處理)

hadoop之mapreduce程式設計例項(系統日誌初步清洗過濾處理)

剛剛開始接觸hadoop的時候,總覺得必須要先安裝hadoop叢集才能開始學習MR程式設計,其實並不用這樣,當然如果你有條件有機器那最好是自己安裝配置一個hadoop叢集,這樣你會更容易理解其工作原理。我們今天就是要給大家演示如何不用安裝hadoop直接除錯程式設計MapReduce函式。

開始之前我們先來理解一下mapreduce的工作原理:

hadoop叢集是由DataNode和NameNode兩種節點構成,DataNode負責儲存資料本身,而NameNode負責儲存資料的元資料資訊。在啟動mapreduce任務時,資料首先是通過inputformat模組從叢集的檔案庫中讀出,然後按照設定的Splitsize進行Split(預設是一個block大小128MB),通過RecordReader(RR)將每個split的資料塊按行進行輪詢訪問,結果給到map函式,由map函式按照程式設計的程式碼邏輯進行處理,輸出key和value。由map到reduce的處理過程中包含三件事情:Combiner(map端的預先處理,相當於map端的reduce)、Partitioner(負責將map輸出資料均衡的分配給reduce)、Shuffling&&sort(根據map輸出的key進行洗牌和排序,將結果根據partitioner的分配情況傳輸給指定的reduce),最後reduce按照程式碼邏輯處理輸出結果(也是key,value格式)。

注意:

map階段的key-value對的格式是由輸入的格式所決定的,如果是預設的TextInputFormat,則每行作為一個記錄進行處理,其中key為此行的開頭相對於檔案的起始位置,value就是此行的字元文字
map階段的輸出的key-value對的格式必須同reduce階段的輸入key-value對的格式相對應

下面是wordcount的處理過程大家來理解一下:

現在我們開始我們的本地MR程式設計吧

首先我們得去官網下載一個hadoop安裝包(本文用的hadoop2.6.0版本,不用安裝,我們只要包中jars)

下載連結:https://archive.apache.org/dist/hadoop/common/(下載最多的那個就可以了,版本自己選個)

下面就上MR的程式碼吧:

package loganalysis;
import java.io.IOException;
import java.util.StringTokenizer;
import java.lang.*;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

 
public class WordCount {
 
  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
 
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    private String imei = new String();
    private String areacode  = new String();
    private String responsedata = new String();
    private String requesttime = new String();
    private String requestip = new String();

//    map階段的key-value對的格式是由輸入的格式所決定的,如果是預設的TextInputFormat,則每行作為一個記錄程序處理,其中key為此行的開頭相對於檔案的起始位置,value就是此行的字元文字
//    map階段的輸出的key-value對的格式必須同reduce階段的輸入key-value對的格式相對應
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      //StringTokenizer itr = new StringTokenizer(value.toString());
      
      int areai = value.toString().indexOf("areacode", 21);
      int imeii = value.toString().indexOf("imei", 21);
      int redatai = value.toString().indexOf("responsedata", 21);
      int retimei = value.toString().indexOf("requesttime", 21);
      int reipi = value.toString().indexOf("requestip", 21);
      
      if (areai==-1)
      { areacode=""; }
      else
      {
      areacode=value.toString().substring(areai+11);
      int len2=areacode.indexOf("\"");
      if(len2 <= 1)
        {
    	  areacode="";
        }
      else 
        {
    	  areacode=areacode.substring(0,len2);
        }
      
      }
      if (imeii==-1)
      { imei=""; }
      else
      {
    	  imei=value.toString().substring(imeii+9);
      int len2=imei.indexOf("\\");
      if(len2 <= 1)
        {
    	  imei="";
        }
      else 
        {
    	  imei=imei.substring(0,len2);
        }
      
      }
     
      
      if (redatai==-1)
      { responsedata=""; }
      else
      {
    	  responsedata=value.toString().substring(redatai+15);
      int len2=responsedata.indexOf("\"");
      if(len2 <= 1)
        {
    	  responsedata="";
        }
      else 
        {
    	  responsedata=responsedata.substring(0,len2);
        }
      
      }
      
      
      if (retimei==-1)
      { requesttime=""; }
      else
      {
    	  requesttime=value.toString().substring(retimei+14);
      int len2=requesttime.indexOf("\"");
      if(len2 <= 1)
        {
    	  requesttime="";
        }
      else 
        {
    	  requesttime=requesttime.substring(0,len2);
        }
      
      }
      
      if (reipi==-1)
      { requestip=""; }
      else
      {
    	  requestip=value.toString().substring(reipi+12);
      int len2=requestip.indexOf("\"");
      if(len2 <= 1)
        {
    	  requestip="";
        }
      else 
        {
    	  requestip=requestip.substring(0,len2);
        }
      
      }
     /* while (itr.hasMoreTokens()) {
    	  string tim;
    	  
        word.set(itr.nextToken());
        context.write(word, one);
      }*/
     if(imei!=""&&areacode!=""&&responsedata!=""&&requesttime!=""&&requestip!="")
     {
       String wd=new String();
       wd=imei+"\t"+areacode+"\t"+responsedata+"\t"+requesttime+"\t"+requestip;
       //wd="areacode|"+areacode +"|imei|"+ imei +"|responsedata|"+ responsedata +"|requesttime|"+ requesttime +"|requestip|"+ requestip;
       word.set(wd);
       context.write(word, one);
     }

    }
  }
 
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();
 
    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
 
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
  //  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    String[] otherArgs=new String[]{"/Users/mac/tmp/inputmr","/Users/mac/tmp/output1"};
    
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    
	//Job job = new Job(conf, "word count");
    Job job = Job.getInstance(conf);
    
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
注意:以上除了jdk1.7,其他的jar包都來自hadoop安裝包中的share目錄下面

如果你不知道哪些包是需要的,那就將share\hadoop\下面的所有jar包都新增到專案中


注意:我的電腦是mac pro如果你的是Windows機器相關的路徑需要修改一下,前面加上“file:///”( file:///D:\tmp\input file:///D:\tmp\output)

 String[] otherArgs=new String[]{"file:///D:\\tmp\\input","file:///D:\\tmp\\output"};
這個程式核心程式碼都是在map中,主要做了系統日誌中相關核心欄位的提取並拼接以key形式返回給reduce,value都是設定為1,是為了方便以後的統計。因為是例項所以簡單的弄了幾個欄位,實際可不止這些。

下面給下測試的系統日誌:

2016-04-18 16:00:00 {"areacode":"浙江省麗水市","countAll":0,"countCorrect":0,"datatime":"4134362","logid":"201604181600001184409476","requestinfo":"{\"sign\":\"4\",\"timestamp\":\"1460966390499\",\"remark\":\"4\",\"subjectPro\":\"123456\",\"interfaceUserName\":\"12345678900987654321\",\"channelno\":\"100\",\"imei\":\"12345678900987654321\",\"subjectNum\":\"13989589062\",\"imsi\":\"12345678900987654321\",\"queryNum\":\"13989589062\"}","requestip":"36.16.128.234","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"無查詢結果"}
2016-04-18 16:00:00 {"areacode":"寧夏銀川市","countAll":0,"countCorrect":0,"datatime":"4715990","logid":"201604181600001858043208","requestinfo":"{\"sign\":\"4\",\"timestamp\":\"1460966400120\",\"remark\":\"4\",\"subjectPro\":\"123456\",\"interfaceUserName\":\"12345678900987654321\",\"channelno\":\"1210\",\"imei\":\"A0000044ABFD25\",\"subjectNum\":\"15379681917\",\"imsi\":\"460036951451601\",\"queryNum\":\"\"}","requestip":"115.168.93.87","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"無查詢結果","userAgent":"ZTE-Me/Mobile"}
2016-04-18 16:00:00 {"areacode":"黑龍江省哈爾濱市","countAll":0,"countCorrect":0,"datatime":"5369561","logid":"201604181600001068429609","requestinfo":"{\"interfaceUserName\":\"12345678900987654321\",\"queryNum\":\"\",\"timestamp\":\"1460966400139\",\"sign\":\"4\",\"imsi\":\"460030301212545\",\"imei\":\"35460207765269\",\"subjectNum\":\"55588237\",\"subjectPro\":\"123456\",\"remark\":\"4\",\"channelno\":\"2100\"}","requestip":"42.184.41.180","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"無查詢結果"}
2016-04-18 16:00:00 {"areacode":"浙江省麗水市","countAll":0,"countCorrect":0,"datatime":"4003096","logid":"201604181600001648238807","requestinfo":"{\"sign\":\"4\",\"timestamp\":\"1460966391025\",\"remark\":\"4\",\"subjectPro\":\"123456\",\"interfaceUserName\":\"12345678900987654321\",\"channelno\":\"100\",\"imei\":\"12345678900987654321\",\"subjectNum\":\"13989589062\",\"imsi\":\"12345678900987654321\",\"queryNum\":\"13989589062\"}","requestip":"36.16.128.234","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"無查詢結果"}
2016-04-18 16:00:00 {"areacode":"廣西南寧市","countAll":0,"countCorrect":0,"datatime":"4047993","logid":"201604181600001570024205","requestinfo":"{\"sign\":\"4\",\"timestamp\":\"1460966382871\",\"remark\":\"4\",\"subjectPro\":\"123456\",\"interfaceUserName\":\"12345678900987654321\",\"channelno\":\"1006\",\"imei\":\"A000004853168C\",\"subjectNum\":\"07765232589\",\"imsi\":\"460031210400007\",\"queryNum\":\"13317810717\"}","requestip":"219.159.72.3","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"無查詢結果"}
2016-04-18 16:00:00 {"areacode":"海南省五指山市","countAll":0,"countCorrect":0,"datatime":"5164117","logid":"201604181600001227842048","requestinfo":"{\"sign\":\"4\",\"timestamp\":\"1460966399159\",\"remark\":\"4\",\"subjectPro\":\"123456\",\"interfaceUserName\":\"12345678900987654321\",\"channelno\":\"1017\",\"imei\":\"A000005543AFB7\",\"subjectNum\":\"089836329061\",\"imsi\":\"460036380954376\",\"queryNum\":\"13389875751\"}","requestip":"140.240.171.71","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"無查詢結果"}
2016-04-18 16:00:00 {"areacode":"山西省","countAll":0,"countCorrect":0,"datatime":"14075772","logid":"201604181600001284030648","requestinfo":"{\"sign\":\"4\",\"timestamp\":\"1460966400332\",\"remark\":\"4\",\"subjectPro\":\"123456\",\"interfaceUserName\":\"12345678900987654321\",\"channelno\":\"1006\",\"imei\":\"A000004FE0218A\",\"subjectNum\":\"03514043633\",\"imsi\":\"460037471517070\",\"queryNum\":\"\"}","requestip":"1.68.5.227","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"無查詢結果"}
2016-04-18 16:00:00 {"areacode":"四川省","countAll":0,"countCorrect":0,"datatime":"6270982","logid":"201604181600001173504863","requestinfo":"{\"sign\":\"4\",\"timestamp\":\"1460966398896\",\"remark\":\"4\",\"subjectPro\":\"123456\",\"interfaceUserName\":\"12345678900987654321\",\"channelno\":\"100\",\"imei\":\"12345678900987654321\",\"subjectNum\":\"13666231300\",\"imsi\":\"12345678900987654321\",\"queryNum\":\"13666231300\"}","requestip":"182.144.66.97","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"無查詢結果"}
2016-04-18 16:00:00 {"areacode":"浙江省","countAll":0,"countCorrect":0,"datatime":"4198522","logid":"201604181600001390637240","requestinfo":"{\"sign\":\"4\",\"timestamp\":\"1460966399464\",\"remark\":\"4\",\"subjectPro\":\"123456\",\"interfaceUserName\":\"12345678900987654321\",\"channelno\":\"100\",\"imei\":\"12345678900987654321\",\"subjectNum\":\"05533876327\",\"imsi\":\"12345678900987654321\",\"queryNum\":\"05533876327\"}","requestip":"36.23.9.49","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"000000","responsedata":"操作成功"}
2016-04-18 16:00:00 {"areacode":"江蘇省連雲港市","countAll":0,"countCorrect":0,"datatime":"4408097","logid":"201604181600001249944032","requestinfo":"{\"sign\":\"4\",\"timestamp\":\"1460966395908\",\"remark\":\"4\",\"subjectPro\":\"123456\",\"interfaceUserName\":\"12345678900987654321\",\"channelno\":\"100\",\"imei\":\"12345678900987654321\",\"subjectNum\":\"18361451463\",\"imsi\":\"12345678900987654321\",\"queryNum\":\"18361451463\"}","requestip":"58.223.4.210","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"無查詢結果"}

最後給出執行結果截圖: