1. 程式人生 > >spark之DataFrame分析日誌檔案

spark之DataFrame分析日誌檔案

場景:我們利用DataFrame對日誌中出現的錯誤次數進行一個統計。
一,準備日誌檔案:
我這裡是使用的hadoop的日誌檔案,因為以前配置檔案沒有配好,所有每次啟動hadoop會有錯誤的資訊,記錄在日誌檔案。

二,書寫流程:

    1,讀取日誌檔案,將檔案轉化成RDD。
    2,將日誌檔案通過map函式將資料轉化行的格式返回。
    3,建立元型別, 即建立schema,為RDD轉化為DataFrame提供格式。
    4,根據元資料型別將JavaRDD<Row>轉化成DataFrame
    5,使用過濾器篩選錯誤資訊。
    6,輸出錯誤資訊統計次數。

三,程式碼展示:

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class CountErrors2 {
    public static void main(String[] args) {
        SparkConf conf=new SparkConf();
        conf.set("spark.testing.memory", "2147480000");
        JavaSparkContext sc=new JavaSparkContext("local","CountErrors2",conf);
        System.out.println("建立連線成功:"+conf);
        //讀取日誌檔案
        JavaRDD<String> textFile=sc.textFile("hdfs://192.168.61.128:9000/spark001/hadoop.log");
        //將日誌檔案通過map函式將資料轉化行的格式返回
        JavaRDD<Row> rowRDD=textFile.map(new Function<String,Row>(){
            public Row call(String line) throws Exception {
                return RowFactory.create(line);
            }});

        //StructField 結構化檔案格式
        List<StructField> fileds=new ArrayList<StructField>();
        //建立一個結構化檔案,三個引數 分別是  name 值的型別   是否設定為表
        fileds.add(DataTypes.createStructField("line", DataTypes.StringType, true));
        // 建立元型別, 即建立schema
        StructType schema=DataTypes.createStructType(fileds);
        SQLContext sqlContext=new org.apache.spark.sql.SQLContext(sc);
        //根據元資料型別將JavaRDD<Row>轉化成DataFrame
        DataFrame df=sqlContext.createDataFrame(rowRDD, schema);
        //過濾檔案
        DataFrame errors=df.filter(df.col("line").like("%ERROR%"));
        //統計出現次數
        long result=errors.count();
        System.out.println("log檔案總共記錄有:"+result+":次出錯!");
    }
}