
Analyzing Hadoop Log Records with MapReduce

1. Introduction

MapReduce is a high-throughput, distributed batch-processing framework for analyzing and processing massive data sets in parallel. Compared with traditional approaches it leans toward brute force, attacking huge volumes of data in a simple, blunt, but effective way. By reading the input, splitting it, and combining the intermediate results (the core of the model), it spreads the work across many worker nodes, which makes large-scale data processing both more robust and easier to scale.
The core of MapReduce is map + shuffle + reduce. The input files are read and split into pieces; the map phase turns each piece into key-value pairs, which become the reducer's input. Before they reach the reducer, the map output keys go through a shuffle, that is, they are sorted and grouped, and the sorted key-value pairs are then fed to the reduce phase. A MapReduce job does not have to include a reduce step at all; it can consist of a map alone.
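As a concrete sketch of that flow, take the word-count job from the next section: for an input split containing the line "a b a", the mapper emits (a,1), (b,1), (a,1); the shuffle sorts and groups those pairs into (a,[1,1]) and (b,[1]); and the reducer sums each group, producing (a,2) and (b,1).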
MapReduce has largely been superseded by Spark these days, but it is still worth understanding when learning big data. Below are the examples I read and wrote while studying it.

2. Hadoop's built-in WordCount

2.1. Create a Maven project.

2.2. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>sun</groupId>
    <artifactId>hadoop-MapReduce</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <hadoopVersion>2.6.0</hadoopVersion>
    </properties>

    <dependencies>
        <!-- Hadoop start -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoopVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-it</artifactId>
            <version>1.1.2</version>
        </dependency>
        <!-- Hadoop -->
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>org.jetbrains</groupId>
            <artifactId>annotations-java5</artifactId>
            <version>RELEASE</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
</project>
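
With this pom in place, a plain Maven build is enough to produce the job jar used later in section 4. Note that this pom does not build a fat jar, so the HBase client jars are assumed to already be available on the cluster's classpath (for example via HADOOP_CLASSPATH):

mvn clean package    # produces target/hadoop-MapReduce-1.0-SNAPSHOT.jar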

2.3. WordCount.java

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount
{
    public static void main(String[] args)
            throws Exception
    {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < otherArgs.length - 1; i++) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            this.result.set(sum);
            context.write(key, this.result);
        }
    }

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
    {
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException
        {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                this.word.set(itr.nextToken());
                context.write(this.word, one);
            }
        }
    }
}
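
To give a feel for the output: for an input that contains the line "hello world hello", the job writes tab-separated counts such as

hello	2
world	1

into part-r-00000 under the output directory (one part file per reducer task).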

3. My examples

3.1. Uploading the WordCount results to HBase

WordCountUpLoadToHBase.java:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;

public class WordCountUpLoadToHBase extends Configured {

    public static class WCHBaseMapper extends Mapper<Object, Text, ImmutableBytesWritable, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key,Text value,Context context) throws IOException, InterruptedException{
            StringTokenizer strs = new StringTokenizer(value.toString());
            while(strs.hasMoreTokens()){
                word.set(strs.nextToken());
                context.write(new ImmutableBytesWritable(Bytes.toBytes(word.toString())), one);
            }
        }

    }

    public static class WCHBaseReducer extends TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable>{

        public void reduce(ImmutableBytesWritable key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{
            int sum = 0;
            for(IntWritable val:values){
                sum += val.get();
            }
            Put put = new Put(key.get());
            put.add(Bytes.toBytes("content"),Bytes.toBytes("count"),Bytes.toBytes(sum+""));
            context.write(key, put);
        }
    }


    @SuppressWarnings("all")
    public static void main(String[] args) throws MasterNotRunningException, ZooKeeperConnectionException, IOException, ClassNotFoundException, InterruptedException {
        String tableName = "wordcount";
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum","hadoop");
        conf.set("hbase.zookeeper.property.clientPort","2181");

        HBaseAdmin admin = new HBaseAdmin(conf);
        // drop the table if it already exists
        if(admin.tableExists(tableName)){
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
        }
        HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
        HColumnDescriptor columnDescriptor =new HColumnDescriptor("content");
        tableDescriptor.addFamily(columnDescriptor);
        admin.createTable(tableDescriptor);

        Job job = new Job(conf,"upload to hbase");
        job.setJarByClass(WordCountUpLoadToHBase.class);
        job.setMapperClass(WCHBaseMapper.class);
        TableMapReduceUtil.initTableReducerJob(tableName, WCHBaseReducer.class, job,null,null,null,null,false);

        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);
        FileInputFormat.addInputPaths(job, "hdfs://hadoop:9000/agentlog/*");
        System.exit(job.waitForCompletion(true)?0:1);

    }

}
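
Assuming the HBase shell is available on the cluster, a quick way to check the upload is to scan the table the job creates; every word should show up as a row key, with its count stored as a string under the content:count column:

scan 'wordcount'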

3.2. Reading data from HBase

MRReadFromHbase.java:

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MRReadFromHbase extends Configured {

    public static class WCHBaseMapper extends TableMapper<Text, Text>{

        @Override
        public void map(ImmutableBytesWritable key, Result values, Context context) throws IOException, InterruptedException {
            // concatenate every value stored in the "content" family, then emit one record per row
            StringBuffer sb = new StringBuffer("");
            for (Map.Entry<byte[], byte[]> value : values.getFamilyMap("content".getBytes()).entrySet()) {
                String str = new String(value.getValue());
                if (str != null) {
                    sb.append(str);
                }
            }
            context.write(new Text(key.get()), new Text(sb.toString()));
        }
    }

    public static class WCHBaseReducer extends Reducer<Text, Text, Text, Text>{
        private Text result =new Text();
        public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
            for(Text val:values){
                result.set(val);
                context.write(key,result);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String tableName = "wordcount";
        Configuration conf =HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        Job job =new Job(conf,"read from hbase to hdfs");
        job.setJarByClass(MRReadFromHbase.class);
        job.setReducerClass(WCHBaseReducer.class);
        TableMapReduceUtil.initTableMapperJob(tableName, new Scan(), WCHBaseMapper.class, Text.class, Text.class, job);
        FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop1:9000/user/sun/hbase"));
        System.exit(job.waitForCompletion(true)?0:1);
    }

}
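
Since the reducer simply passes each row through, the exported key-value pairs can be inspected directly once the job finishes; with the default single reducer the output lands in one part file under the hard-coded output directory:

hadoop fs -cat hdfs://hadoop1:9000/user/sun/hbase/part-r-00000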

3.3. Processing my custom-format log data

Each log record is stored in the form [format:1][user:AAA][interface:/bt/btCourse/get][date:2018/10/10]. For records that begin with [format:1], the job sorts the records and aggregates statistics per user.
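
Before looking at the full job, here is a minimal standalone sketch of the substring-based extraction that the getParameter helper in UserLog.java performs (LogFieldDemo and getField are illustrative names, not part of the job; note that the mapper below actually reads the user, time and html fields of each record):

public class LogFieldDemo {

    // Return the value of a "[name:value]" field inside the record, or "" if the field is missing.
    static String getField(String record, String name) {
        int start = record.indexOf(name);
        if (start < 0) {
            return "";
        }
        start += name.length() + 1;            // skip over "name:"
        int end = record.indexOf("]", start);  // the value runs up to the closing bracket
        return end < 0 ? "" : record.substring(start, end);
    }

    public static void main(String[] args) {
        String record = "[format:1][user:AAA][interface:/bt/btCourse/get][date:2018/10/10]";
        System.out.println(getField(record, "user"));      // AAA
        System.out.println(getField(record, "interface")); // /bt/btCourse/get
        System.out.println(getField(record, "date"));      // 2018/10/10
    }
}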

UserLog.java:

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;

public class UserLog
{
    public static void main(String[] args)
            throws Exception
    {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(UserLog.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        for (int i = 0; i < otherArgs.length - 1; i++) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    // The mapper parses each qualifying log line and emits a composite "user|time|html" key with an empty value
    public static class TokenizerMapper extends Mapper<Object, Text, Text, Text> {
        private static Text line = new Text(); // one line of input (currently unused)

        // implementation of the map function
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            if (value.toString().startsWith("[format:1]")) {
                context.write(new Text(getParameter(value, "user") + "|" + getParameter(value, "time") + "|" + getParameter(value, "html")), new Text(""));
            }
        }
    }

/*    // An earlier, simpler reducer that copies each input key straight to the output
    public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {
        // implementation of the reduce function
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(key, val);
            }
        }
    }*/

    public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {

        // A TreeMap holds the intermediate keys. TreeMap keeps its keys sorted; a Comparator is supplied explicitly so the order can be changed if needed (this one keeps the natural ascending order).
        private TreeMap<String, String> treeMap = new TreeMap<String, String>(new Comparator<String>() {
            //@Override
            public int compare(String x, String y) {
                return x.compareTo(y);
            }
        });

        // A second TreeMap accumulates the final per-user, per-page totals, again with an explicit (ascending) Comparator.
        private TreeMap<String, Long> treeMapResult = new TreeMap<String, Long>(new Comparator<String>() {
            //@Override
            public int compare(String x, String y) {
                return x.compareTo(y);
            }
        });

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // store the reduced key in the TreeMap instead of writing it to the context; the real output happens in cleanup()
            treeMap.put(key.toString(), key.toString());
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {

            if (StringUtils.isBlank(context.getCurrentValue().toString())) {
                Iterator it = treeMap.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry entry = (Map.Entry) it.next();
                    context.write(new Text(entry.getKey().toString()), new Text("0"));
                }
            }else{
                String key = "";
                String value = "";

                // walk the sorted keys and, per user and page, accumulate the seconds between consecutive records
                Iterator it = treeMap.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry entry = (Map.Entry) it.next();
                    String[] sp = entry.getKey().toString().split("\\|");

                    if (key.equals(sp[0])){
                        long time = getSecondDiff(value, sp[1]);
                        if (!treeMapResult.containsKey(sp[0] + "|" + sp[2])) {
                            treeMapResult.put(sp[0] + "|" + sp[2], time);
                        } else {
                            treeMapResult.put(sp[0] + "|" + sp[2], Long.parseLong(treeMapResult.get(sp[0] + "|" + sp[2]).toString()) + time);
                        }
                    }else{
                        treeMapResult.put(sp[0] + "|" + sp[2], Long.parseLong("0"));
                    }

                    key = sp[0];
                    value = sp[1];
                }

                // emit the aggregated results
                Iterator iter = treeMapResult.entrySet().iterator();
                while (iter.hasNext()) {
                    Map.Entry entry = (Map.Entry) iter.next();
                    context.write(new Text(entry.getKey().toString()), new Text(entry.getValue().toString()));
                }
            }
        }
    }

    private static String getParameter(Text value, String param){
        try {
            return value.toString().substring(value.toString().indexOf(param) + param.length() + 1, value.toString().indexOf("]", value.toString().indexOf(param) + param.length() + 1));
        }catch(Exception e){
            return "";
        }
    }

    private static long getSecondDiff(String s1, String s2){

        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        Date d1 = null;
        Date d2 = null;

        try {
            d1 = format.parse(s1);
            d2 = format.parse(s2);

            // difference in milliseconds
            long diff = d2.getTime() - d1.getTime();

            long diffSeconds = diff / 1000;

            return diffSeconds;
        }catch(Exception e){
            return 0;
        }
    }
}
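
As a small worked example of the time arithmetic: getSecondDiff parses both arguments with the pattern yyyy-MM-dd HH:mm:ss, so getSecondDiff("2018-10-10 10:00:00", "2018-10-10 10:00:30") returns 30, while any value that does not match the pattern falls back to 0 through the catch block.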

4. Verifying the results

4.1. Package the project as a jar and copy it to /home/hadoop/Downloads on the CentOS machine.

4.2. Run the job:

hadoop jar /home/hadoop/Downloads/hadoop-MapReduce-1.0-SNAPSHOT.jar WordCount /agentlog/ /user/sun/MapReduce/wordCountX

4.3. View the results:

hadoop fs -cat /user/sun/MapReduce/wordCountX/part-r-00000