1. 程式人生 > >Esper入門及示例(CEP引擎)

Esper入門及示例(CEP引擎)

一,概述

        關係型資料用來查詢相對靜態的資料,如果執行一些複雜的查詢,要降低查詢的頻度。傳統關係型資料庫普遍將資料儲存在硬碟(也有記憶體資料庫),它的檢索效能受限於硬碟訪問效能。受制於關係型資料庫的設計,如果頻繁查詢資料來實現實時統計則需要在較短時間內構建多次查詢語句(SQL),每次檢索都耗時較長,關係型資料庫會成為系統瓶頸。總而言之,傳統關係型資料庫並不適合每秒成百上千次的查詢統計。
        Esper引擎可以使用類似於SQL的EPL語句來構建處理模型,處理每秒幾萬到幾十萬的實時資料的查詢統計。它的工作有點像倒過來的關係型資料庫,它不需要像資料庫那樣儲存資料,而是先構建查詢語句,引擎依據這些處理模型實時的輸出符合的結果。而關係型資料要查詢語句提交後才會輸出結果。
Esper除核心jar包之外還有諸如io,jdbc,jmx等jar。本文只講解核心的前幾章基礎知識。

二,例子

    先看一個Epser處理實時資料的例子:
    收集某網站的實時使用者訪問日誌(accesslog),資料欄位如下:
    ip(訪客ip)、time(訪問時間)、url(頁面地址)、httpcode(狀態碼)、agent(瀏覽器頭資訊)、sizeinbytes(資料大小)、等等。

    場景1,分析1小時(週期)產生的狀態碼數量:
    select httpcode,count(*) as hz from accesslog.win:time_batch(1 hour) group by httpcode otder by hz desc;


    場景2,分析1小時(週期)訪客對url的訪問頻率:
    select ip,url,count(*) as hz from accesslog.win:time_batch(1 hour) group by ip,url order by  hz desc;


    場景3,找到可能危險的請求(狀態碼403 404),分析1小時(週期)訪客對url的訪問頻率:
    select ip,url,count(*) as hz from accesslog(httpcode in(403,404)).win:time_batch(1 hour) group by ip,url order by  hz desc;
    注:三個例子使用了時間批量視窗,它會收集此時間間隔資料一起統計,週期內只輸出一次結果(這樣更像是關係型資料庫的輸出形式),以便於理解。


三,入門

    1,Esper能做什麼?優點有哪些?
        使用SQL的形式來處理實時資料,支援查詢、聚合、連線、過濾等,非常容易上手。
        多種形式(時間/長度)視窗統計事件資料,事件之間可以關聯分析。
        高吞吐(10萬事件/秒)低延時(毫秒級別)。
        核心只有一個jar包,易於擴充套件和釋出。經常和實時分散式框架Storm一起使用。
    2,基本概念
        事件(Event):就是要統計分析的源資料。 Esper支援多種結構(Java POJO、Map 、陣列 、XML)。 POJO的效能最高,map和陣列有利於擴充套件。
        處理模型:
        自定義類實現UpdateListener介面,用來接收epl產生的結果事件:


            public interface UpdateListener{
                public void update(EventBean[] newEvents, EventBean[] oldEvents);
            }


        EventBean類:
 



        EPL:
            首先我們看一個非常簡單的EPL語句:select * from Withdrawal;這個語句選擇所有的Withdrawal事件,每一次新的事件都會觸發updatelistener的update方法。
 
            使用長度視窗EPL: select * from Withdrawal.win:length(5);這條語句選擇最近5條Withdrawal資料。執行如下:


 
            使用過濾和where條件的EPL:
                1)select * from Withdrawal(amount>=200).win:length(5);
                2)select * from Withdrawal.win:length(5) where amount >= 200
                上面兩條EPL語句表示選擇符合amount>=200條件的最近5條資料,不同的是第一條是過濾,只有符合條件的事件才會進入引擎,第二條是where條件,不管是否               符合條件,事件都會進入引擎。仔細觀察下圖可看出區別。


   
                

以上示例是長度視窗,類似的還有時間視窗,批量長度視窗和批量時間視窗。
                批量視窗收集資料,在給定的時間間隔或長度滿足條件時再觸發計算和輸出。


            Aggregation and Grouping(聚合函式和分組統計):
            因為EPL與SQL很接近,所以語句語法基本和SQL一樣。
            // 統計進入的1小時accesslog事件,獨立ip(訪客)數量;
            select count(distict(ip)) from accesslog.win:time_batch(1 hour)


            // 統計進入的1小時accesslog事件,按ip分組統計ip(訪客)訪問的url(頁面)個數。 
            select ip, count(distict(url)) from accesslog.win: time_batch(1 hour) group by ip


            //統計進入的1小時accesslog事件,按ip分組統計訪客產生的流量。
            select ip,sum(sizeinbytes) from accesslog.win: time_batch(1 hour) group ip


            當然,以上EPL只能解決部分簡單場景。如果要深入使用還要了解Context、Patterns、Windows、View等知識。


四,例項

    這小節我們解決一個實際中(簡化了分析場景)遇到的問題:
    為了更好的瞭解客戶的流量情況,我們收集了客戶主機的網路訪問資料(netaccesslog);欄位如下所示:
    sip(源主機地址)、dip(目的主機地址)、packets(包數量)、octets(流量)、dcust(目的客戶)
        1)實時統計主機的訪問資料包個數。並設定一個閾值(35000pps),將超出訪問資料包頻率的主機顯示出來。
        2)實時統計客戶主機的訪問來源。以便統計訪問情況和排查1)中問題產生的原因。


    場景1,
        select dcust ,dip,sum(packets)/30 as pps 
        from netaccesslog.win:time(30 sec) 
        group by dcust ,dip 
        having sum(packets) /30 >35000 
        output snapshot every 1 sec
    使用時間視窗,統計最近30秒,客戶目的dip的被訪問情況(pps),只輸出pps大於35000的客戶及dip資訊,每1秒輸出一次結果資訊(保證時效性)。


    場景2,
        Select dcust,dip,sip,sum(packets)/30 as pps, sum(octets)/30 as bps
        From netaccesslog.win:time_batch (30 sec)
        Group by dcust,dip,sip
    使用時間批量視窗,統計30秒週期,客戶的主機被其他主機訪問情況。輸出客戶、目標地址、源地址、pps、bps資訊。結果儲存進資料庫以便查詢展現。


以下是原始碼, 引入esper 的jar包就可以執行

<span style="font-family:Microsoft YaHei;">package udf.test;

import java.text.DecimalFormat;
import java.util.concurrent.TimeUnit;

import com.espertech.esper.client.EPAdministrator;
import com.espertech.esper.client.EPRuntime;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;

/**
 * 簡化的場景分析示例.
 * 
 */
public class Test {
    static EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
    static EPAdministrator admin = epService.getEPAdministrator();

    public static void main(String[] args) throws Exception {
        // 為了更好的瞭解客戶的流量情況,我們收集了客戶主機的網路訪問資料(netaccesslog);欄位如下所示:
        // Sip(源主機地址)、Dip(目的主機地址)、Packets(包數量)、Octets(流量)、DCust(目的客戶)
        // 1) 實時統計主機的訪問資料包個數。並設定一個閾值(35000pps),將超出訪問資料包頻率的主機顯示出來。
        // 2) 實時統計客戶主機的訪問來源。以便統計訪問情況和排查1)中問題產生的原因。

        // 以下示例, 隨機生成網路訪問資料,定時發出一條有異常的資料.用來觸發展示場景1和場景2的結果.
        String netAccessLog = NetAccessEvent.class.getName();
        // 1, 編寫EPL / 註冊監聽
        String epl1Str = "select dcust ,dip,sum(packets)/30 as pps from " + netAccessLog+ ".win:time(30 sec) group by dcust ,dip having sum(packets) /30 >35000 output snapshot every 1 sec";
        String epl2Str = "select dcust,dip,sip,sum(packets)/30 as pps, sum(octets)/30 as bps,count(*) as num from " + netAccessLog+ ".win:time_batch (30 sec) group by dcust,dip,sip order by dcust,dip,sip";
        EPStatement epl1 = admin.createEPL(epl1Str);
        EPStatement epl2 = admin.createEPL(epl2Str);
        // 2,編寫updatelistener監聽結果
        epl1.addListener(new NetAccessListener_epl1());
        epl2.addListener(new NetAccessListener_epl2());
        // 3,傳送隨機的網路訪問資料 ,定時製造一條有異常的資料, 每100毫秒傳送一條事件,每512條資料傳送一條異常資料(來測試場景1)
        EPRuntime runtime = epService.getEPRuntime();
        SendEvent(runtime);
        // 等待引擎處理資料
        TimeUnit.SECONDS.sleep(600);
    }

    public static void SendEvent(final EPRuntime runtime) {
        Thread sendThread = new Thread(new Runnable() {
            @Override
            public void run() {
                String custPrefix = "客戶";
                String dstIpPrefix = "200.200.200.";
                String srcIpPrefix = "100.100.";
                int count = 1;
                while (true) {
                    try {
                        NetAccessEvent nae = new NetAccessEvent();
                        nae.setDcust(custPrefix + "_" + ((int) (Math.random() * 10)));
                        nae.setDip(dstIpPrefix + ((int) (Math.random() * 20)));
                        nae.setSip(srcIpPrefix + ((int) (Math.random() * 10)) + "." + ((int) (Math.random() * 10)));
                        nae.setOctets(((long) (Math.random() * 100)));
                        nae.setPackets(((long) (Math.random() * 100)));
                        runtime.sendEvent(nae);
                        count++;
                        if (count % 100 == 0) {
                            System.out.println("傳送了" + count + "條事件");
                        }
                        if (count % 512 == 0) {
                            NetAccessEvent naeAnomaly = new NetAccessEvent();
                            naeAnomaly.setDcust(custPrefix + "_" + ((int) (Math.random() * 10)));
                            naeAnomaly.setDip(dstIpPrefix + ((int) (Math.random() * 20)));
                            naeAnomaly.setSip(srcIpPrefix + ((int) (Math.random() * 10)) + "." + ((int) (Math.random() * 10)));
                            naeAnomaly.setOctets(35000 * 30l);
                            naeAnomaly.setPackets(35000 * 30l);
                            runtime.sendEvent(naeAnomaly);
                        }
                        TimeUnit.MILLISECONDS.sleep(100);
                    } catch (InterruptedException e) {

                    }
                }
            }
        });
        sendThread.start();
    }
}

class NetAccessListener_epl1 implements UpdateListener {
    public void update(EventBean[] newEvents, EventBean[] oldEvents) {
        if (newEvents != null) {
            for (EventBean eventbean : newEvents) {
                System.out.println("客戶:" + eventbean.get("dcust") + "的主機:" + eventbean.get("dip") + "的訪問超過了限制; " + "pps:" + eventbean.get("pps"));
            }
        }
    }
}

class NetAccessListener_epl2 implements UpdateListener {
    public void update(EventBean[] newEvents, EventBean[] oldEvents) {
        if (newEvents != null) {
            System.out.println("\t 客戶\t 目標主機\t 源主機\t pps\t bps\t 條目數\n");
            DecimalFormat df = new DecimalFormat("#0.0000");
            for (EventBean eventbean : newEvents) {
                // 沒有資料的專案不輸出
                if (eventbean.get("pps") == null) {
                    continue;
                }
                System.out.println(eventbean.get("dcust") + "\t" + eventbean.get("dip") + "\t" + eventbean.get("sip") + "\t" + df.format(eventbean.get("pps")) + "\t" + df.format(eventbean.get("bps")) + "\t" + eventbean.get("num"));
            }
        }
    }
}

/**
 * 代表網路訪問事件資料
 */
class NetAccessEvent {
    private String sip;
    private String dip;
    private Long packets;
    private Long octets;
    private String dcust;

    public String getSip() {
        return sip;
    }

    public void setSip(String sip) {
        this.sip = sip;
    }

    public String getDip() {
        return dip;
    }

    public void setDip(String dip) {
        this.dip = dip;
    }

    public Long getPackets() {
        return packets;
    }

    public void setPackets(Long packets) {
        this.packets = packets;
    }

    public Long getOctets() {
        return octets;
    }

    public void setOctets(Long octets) {
        this.octets = octets;
    }

    public String getDcust() {
        return dcust;
    }

    public void setDcust(String dcust) {
        this.dcust = dcust;
    }

}
</span>