
[Hadoop] 1. Analyzing logs with MapReduce and sorting the aggregated results

1. There is plenty of material online about setting up a Hadoop cluster, so I will not repeat it here. Running a Hadoop program locally does not require an HDFS cluster; the local run here is only a demo, and a real cluster is only needed when you actually have to process large volumes of data.

2. Likewise, word-count examples are everywhere; the official documentation and hundreds of articles online already cover them.

What I could not find, however, was how to globally sort the word-count results. After searching for a long time without luck, I tinkered with it myself and put together a demo. I have also decided to stop making excuses: I kept telling myself I was too busy to write blog posts, but looking back that was just an excuse, because there will never be that much spare time for leisurely study...

 

Enough chatter. The basis for sorting the results is actually quite simple: we piggyback on MapReduce's own sorting mechanism and just run MapReduce twice.

In the second MapReduce job we swap the key and the value, so the count becomes the key; because the framework sorts map output by key before it reaches the reducers, the second job's output comes out ordered by count (for a truly global order the second job must run with a single reducer, which is the default here). A minimal sketch of that second pass follows, and the full program comes after it.
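As a minimal sketch of that second pass (the real SortMap1 and SortReduce1 classes appear in the full program below), the mapper does nothing but swap the (text, count) pairs produced by the first job, so the shuffle phase sorts the records by count:

// Stripped-down illustration of the second job's mapper.
// Its input lines are the first job's output: "<exception text>\t<count>".
public static class SwapMapper extends Mapper<Object, Text, IntWritable, Text> {
    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] parts = value.toString().split("\t");
        if (parts.length == 2) {
            // The count becomes the key, so MapReduce's own key sort orders the data for us
            context.write(new IntWritable(Integer.parseInt(parts[1])), new Text(parts[0]));
        }
    }
}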

 

package cn.cutter.demo.hadoop.demo;

import cn.cutter.demo.hadoop.utils.ZipUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;

import java.io.*;
import java.net.URI;
import java.util.*;

/**
 * @program: cutter-point
 * @description: Hadoop test, with sorting of the aggregated results added
 * @author: xiaof
 * @create: 2018-11-24 20:47
 *
 * (input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
 **/
public class WorkCount4 {

    /**
     * Mapper of job 1: emits every line containing "Exception"/"exception" as a key with a count of 1.
     */
    public static class TokenizerMapper extends Mapper<Object, Text, NewKey1, IntWritable> {

        static enum CountersEnum { INPUT_WORDS }

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private boolean caseSensitive;
        private Set<String> patternsToSkip = new HashSet<String>();
        private Configuration conf;
        private BufferedReader fis;

        public void map(Object key, Text value, Mapper<Object, Text, NewKey1, IntWritable>.Context context)
                throws IOException, InterruptedException {
            String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();
            for (String pattern : patternsToSkip) {
                line = line.replaceAll(pattern, "").replace(" ", "").trim();
            }
            if (line.contains("Exception") || line.contains("exception")) {
                NewKey1 newKey1 = new NewKey1(line, 1L);
                context.write(newKey1, one);
            }
        }
    }

    /**
     * Reducer of job 1: sums the occurrences of each exception line.
     */
    public static class IntSumReducer extends Reducer<NewKey1, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(NewKey1 newKey1, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            newKey1.setSecond((long) result.get());
            context.write(new Text(newKey1.getFirst()), result);
        }
    }

    /**
     * Composite key (log line + count) that the framework can serialize and sort.
     */
    public static class NewKey1 implements WritableComparable<NewKey1> {

        private String first;
        private Long second;

        public NewKey1() {}

        public NewKey1(String first, Long second) {
            this.first = first;
            this.second = second;
        }

        public String getFirst() { return first; }

        public void setFirst(String first) { this.first = first; }

        public Long getSecond() { return second; }

        public void setSecond(Long second) { this.second = second; }

        @Override
        public int compareTo(NewKey1 o) {
            // Sort primarily by the count (the value), then by the text
            Long result = this.second - o.second;
            if (result != 0)
                return result.intValue();
            else
                return first.compareTo(o.first);
        }

        @Override
        public void write(DataOutput dataOutput) throws IOException {
            // The trailing "\n" delimits the string so readFields() can recover it with readLine()
            dataOutput.write((this.first + "\n").getBytes());
            dataOutput.writeLong(this.second);
        }

        @Override
        public void readFields(DataInput dataInput) throws IOException {
            this.first = dataInput.readLine();
            this.second = dataInput.readLong();
        }

        @Override
        public int hashCode() {
            // Must be deterministic: the hash partitioner relies on equal keys always hashing to the same value
            return this.first.hashCode() + this.second.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof NewKey1)) {
                return false;
            }
            NewKey1 newKey1 = (NewKey1) obj;
            return this.first.equals(newKey1.first) && this.second.equals(newKey1.second);
        }
    }

    public static String random(int i) {
        String sources = "0123456789";
        Random random = new Random();
        StringBuffer flag = new StringBuffer();
        for (int j = 0; j < i; ++j) {
            flag.append(sources.charAt(random.nextInt(9)));
        }
        return flag.toString();
    }

    /**
     * Mapper of job 2: swaps <text, count> into <count, text> so the shuffle sorts by count.
     */
    public static class SortMap1 extends Mapper<Object, Text, IntWritable, Text> {
        @Override
        protected void map(Object key, Text value, Mapper<Object, Text, IntWritable, Text>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String words[] = line.split("\t");
            if (words.length == 2) {
                IntWritable intWritable = new IntWritable();
                intWritable.set(Integer.valueOf(words[1]));
                context.write(intWritable, new Text(words[0]));
            }
        }
    }

    /**
     * Reducer of job 2: swaps the pair back to <text, count>, which is now emitted in sorted order.
     */
    public static class SortReduce1 extends Reducer<IntWritable, Text, Text, IntWritable> {
        @Override
        protected void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(values.iterator().next(), key);
        }
    }

    /**
     * To read the input from a remote directory you need an HDFS path; a plain local path will not work for that.
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        System.setProperty("hadoop.home.dir", "F:\\hadoop-2.7.7");
        Configuration conf = new Configuration();
        GenericOptionsParser optionsParser = new GenericOptionsParser(conf, args);
//        conf.set("fs.default.name", "hdfs://127.0.0.1:9000");

        // Job 1: count the exception lines
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WorkCount4.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setMapOutputKeyClass(NewKey1.class);
//        job.setCombinerClass(NewKey1.class);
        // Set the reducer class
        job.setReducerClass(IntSumReducer.class);
        // Specify the output <k3, v3> types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // First unzip the input files and delete the zip archives
        // H:\ideaworkspace\1-tmp\input H:\ideaworkspace\1-tmp\output
        String temp[] = {"H:\\ideaworkspace\\1-tmp\\input", "H:\\ideaworkspace\\1-tmp\\output"};
        String name = random(temp.length);
        args = temp;
        ZipUtil.unZipFilesAndDel(args[0]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1] + name));

        // Wrap job 1 in a controlled job
        ControlledJob ctrlJob1 = new ControlledJob(conf);
        ctrlJob1.setJob(job);

        // Job 2: sort the counts by swapping key and value
        Job job2 = Job.getInstance(conf, "word count2");
        job2.setJarByClass(WorkCount4.class);
        job2.setMapperClass(SortMap1.class);
        job2.setMapOutputKeyClass(IntWritable.class);
        job2.setMapOutputValueClass(Text.class);
        // Set the reducer class
        job2.setReducerClass(SortReduce1.class);
        // Specify the output <k3, v3> types
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(IntWritable.class);

        // Wrap job 2 in a controlled job; its input is job 1's output directory
        ControlledJob ctrlJob2 = new ControlledJob(conf);
        ctrlJob2.setJob(job2);
        FileInputFormat.setInputPaths(job2, new Path(args[1] + name));
        FileOutputFormat.setOutputPath(job2, new Path(args[1] + name + "-result"));

        // Declare the dependency between the jobs: job 2 depends on job 1's output
        ctrlJob2.addDependingJob(ctrlJob1);

        // Master controller that manages both jobs
        JobControl jobCtrl = new JobControl("myCtrl");

        // Register both controlled jobs with the JobControl
        jobCtrl.addJob(ctrlJob1);
        jobCtrl.addJob(ctrlJob2);

        // JobControl must be started in its own thread -- do not forget this step
        Thread thread = new Thread(jobCtrl);
        thread.start();

        // Busy-wait until both jobs have finished
        while (true) {
            if (jobCtrl.allFinished()) {
                System.out.println(jobCtrl.getSuccessfulJobList());
                jobCtrl.stop();
                break;
            }
        }

//        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
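One caveat: job 2 sorts its IntWritable keys in the framework's default ascending order, so the most frequent exceptions end up at the bottom of the result file. If you want them at the top, one option (my own sketch, not part of the program above) is to add a reversed sort comparator inside WorkCount4 and register it on job 2 before the jobs are submitted:

// A reversed comparator for IntWritable keys. Registering it on job 2 makes the shuffle
// sort the counts from largest to smallest instead of the default ascending order.
public static class DescendingIntComparator extends IntWritable.Comparator {
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return -super.compare(a, b);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return -super.compare(b1, s1, l1, b2, s2, l2);
    }
}

// In main(), after job2 is configured:
// job2.setSortComparatorClass(DescendingIntComparator.class);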

 

Helper class: the logs on the server are compressed automatically, so they have to be unzipped before they can be analyzed.

package cn.cutter.demo.hadoop.utils;

import java.io.*;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Random;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import java.util.zip.ZipInputStream;

/**
 * @ClassName ZipUtil
 * @Description TODO
 * @Author xiaof
 * @Date 2018/12/11 23:08
 * @Version 1.0
 **/
public class ZipUtil {

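    // "PK\003\004" is the magic header of a zip local file entry; "PK\005\006" marks the
    // end-of-central-directory record (the first four bytes of an empty archive)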
    private static byte[] ZIP_HEADER_1 = new byte[] { 80, 75, 3, 4 };
    private static byte[] ZIP_HEADER_2 = new byte[] { 80, 75, 5, 6 };

    /**
     * Recursively unzip every zip file under this path and delete the archives afterwards
     * @param zipPath
     */
    public static void unZipFilesAndDel(String zipPath) throws IOException {

        File file = new File(zipPath);
        if(file.isDirectory()) {
            // recurse into every file in this directory
            File files[] = file.listFiles();
            for (int i = 0; i < files.length; ++i) {
                unZipFilesAndDel(files[i].getAbsolutePath());
            }
        } else {
            if(isArchiveFile(file)) {
                String filename = file.getName();
                unZipFile(file);
                file.delete();
                System.out.println("Finished unzipping: " + filename);
            }
        }
    }

    public static String random(int i) {
        String sources = "0123456789";
        Random random = new Random();
        StringBuffer flag = new StringBuffer();
        for(int j = 0; j < i; ++j) {
            flag.append(sources.charAt(random.nextInt(9)));
        }

        return flag.toString();
    }

    private static void unZipFile(File file) throws IOException {
        ZipFile zip = new ZipFile(file, Charset.forName("UTF-8")); // UTF-8 avoids garbled Chinese entry names
        String name = zip.getName().substring(zip.getName().lastIndexOf('\\') + 1, zip.getName().lastIndexOf('.'));

        BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream(file));
        ZipInputStream zipInputStream = new ZipInputStream(bufferedInputStream);

        BufferedOutputStream bufferedOutputStream = null;

        ZipEntry zipEntry = null;
        while((zipEntry = zipInputStream.getNextEntry()) != null) {
            String entryName = zipEntry.getName();
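            // The entry name is read but not used; the random numeric suffix below keeps
            // files extracted from different entries from overwriting one another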
            bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(file.getParentFile() + "\\" + name + random(6)));
            int b = 0;
            while((b = zipInputStream.read()) != -1) {
                bufferedOutputStream.write(b);
            }
            bufferedOutputStream.flush();
            bufferedOutputStream.close();
        }
        zipInputStream.close();
        bufferedInputStream.close();
        zip.close();
    }

    /**
     * Check whether the file is a zip archive by inspecting its magic header
     *
     * @param file
     * @return
     */
    public static boolean isArchiveFile(File file) {

        if (file == null) {
            return false;
        }

        if (file.isDirectory()) {
            return false;
        }

        boolean isArchive = false;
        InputStream input = null;
        try {
            input = new FileInputStream(file);
            byte[] buffer = new byte[4];
            int length = input.read(buffer, 0, 4);
            if (length == 4) {
                isArchive = (Arrays.equals(ZIP_HEADER_1, buffer)) || (Arrays.equals(ZIP_HEADER_2, buffer));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (input != null) {
                try {
                    input.close();
                } catch (IOException e) {
                }
            }
        }

        return isArchive;
    }

    public static void main(String[] args) {
        File file = new File("H:\\ideaworkspace\\1-tmp\\input\\111 - 副本.zip");

        try {
            unZipFile(file);
            boolean res = file.delete();
            System.out.println(res);
        } catch (IOException e) {
            e.printStackTrace();
        }

        System.out.println(isArchiveFile(file));
        System.out.println(file.getAbsolutePath());

    }

}

 

A side note: I found that on a single machine, analyzing a mere 20 GB of data takes forever; after several hours it had produced nothing at all...

Embarrassing. So I settled for files of a few tens of megabytes to test the water.

From the sorted output we can see which spot reports errors most frequently, and that is the place to start digging into which SQL statements cause them. The program still has room for improvement: it only surfaces the most frequent exceptions and cannot pinpoint exactly where they occur (the messy log format does not help either). One possible next step is sketched below.
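One direction for that improvement (purely my own sketch, not part of the program above) would be to key on the exception class name instead of the whole log line, so that differently worded messages of the same exception are counted together. A hypothetical replacement for the map body of TokenizerMapper might look like this (it assumes java.util.regex.Pattern and java.util.regex.Matcher are imported, and that the class name appears literally in the log line):

// Hypothetical refinement: group by exception class name rather than by the raw line.
// The regex is an assumption about the log format and would need adjusting to your logs.
private static final Pattern EXCEPTION_NAME = Pattern.compile("([\\w.$]+(?:Exception|Error))");

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
    Matcher m = EXCEPTION_NAME.matcher(value.toString());
    if (m.find()) {
        // e.g. "java.sql.SQLSyntaxErrorException" becomes the key instead of the full log line
        context.write(new NewKey1(m.group(1), 1L), one);
    }
}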