1. 程式人生 > >SODBASE CEP學習(四):類SQL語言EPL與Storm或jStorm整合

SODBASE CEP學習(四):類SQL語言EPL與Storm或jStorm整合

Storm框架原本是設計用來做網際網路短文字處理和一些統計工作的,是一種分散式流式計算框架。在一些場合,特別是在已經用了Storm架構以後,發現又想用EPL語句,Storm和類SQL語言EPL結合也不失為一種方案。對線上規則修改、視窗資料可靠性要求高的地方還有用專用的CEP叢集方案、Hot-Hot HA等方案可供選擇,不過這些不是本文的重點。本文的重點就是讓Storm插上EPL的翅膀,輕鬆解決一些實際專案中的攔路虎。

1.示例操作步驟

前3步是安裝Storm(或jStorm),已經安裝了Storm(或jStorm)的讀者可以跳過,直接到第4步。

(1)下載安裝jdk

(2)下載安裝 zookeeper,啟動zookeeper

     exportZOOKEEPER_HOME="/path/to/zookeeper"
     exportPATH=$PATH:$ZOOKEEPER_HOME/bin
     cp/usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg  

     zkServer.sh start

(3)下載storm,

解壓到linux伺服器相應位置,也可以下載其它版本。

(4)EPL引擎配置目錄,

cep_home目錄拷貝到storm的目錄中,如apache-storm-0.9.4/cep_home。這個目錄也可以自定義,需修改EPLExampleTopology.java程式碼中的cep_home變數的值。

lib/sodbasecep/目錄下jar檔案拷貝到storm的lib目錄下

dist目錄下的cep-application-storm-example.jar拷貝到storm的examples目錄下

注:使用jStorm的讀者,配置方法和Storm一致,本系列文章中在Storm上執行EPL的示例可以同時在Storm和jStorm上執行。

(5)啟動storm

# bin/storm nimbus &

# bin/stormsupervisor &


(6)執行Topology

執行命令啟動topology

 bin/storm jar examples/cep-application-storm-example.jar com.sodbase.integration.storm.EPLExampleTopology

(7)輸出結果(本示例的功能是IT系統監控中的分析函式呼叫響應時間,是否超時)


2. 工作原理

Storm中使用SQL,較為簡便的方法就是使用EPLBolt。這種型別的一個Bolt就代表著一個SQL(EPL)語句,有時也能代表多個SQL語句。

2.1 EPL建模

首先,建議大家用Studio建模,建模後匯出為XML檔案,能大大提高建模速度。本例的EPL XML檔案在files目錄下的calltimeout.xml

EPL的輸入流名稱使用com.sodbase.inputadaptor.StubInputAdaptor進行宣告,從而在EPL中可以使用。

EPL的輸出流使用com.sodbase.outputadaptor.storm.SodbaseCepStormOutputAdaptor介面卡

    <outputAdaptors>
        <isOutputAsSelection>true</isOutputAsSelection>
        <outputAdaptorClassName>com.sodbase.outputadaptor.storm.SodbaseCepStormOutputAdaptor</outputAdaptorClassName>
        <isExternal>false</isExternal>
        <queryName>calltimeoutnotification</queryName>
    </outputAdaptors>

查詢響應時間的語句

CREATE QUERY callnottimeout 
SELECT T2._start_time_-T1._start_time_ AS responsetime, T1.callerid AS functionname ,'false' AS timeout 
FROM T1:callstream,T2:callstream 
PATTERN T1&T2 
WHERE T1.callerid=T2.callerid  AND T1.eventtype ='start' AND T2.eventtype ='end' 
WITHIN 1000 
查詢超時的語句
CREATE QUERY calltimeoutnotification 
SELECT '-1' AS responsetime, T1.callerid AS functionname,'true' AS timeout 
FROM T1:callstream,T2:callstream,T3:calltimeoutevent 
PATTERN T1;!T2;T3  
WHERE T3._end_time_-T1._end_time_=1000 AND T2.callerid=T3.callerid AND T1.eventtype='start' 
WITHIN 1000 
下面語句加一個延時輸出就可以形成超時timer用在上面的T3:calltiimeoutevent,
CREATE QUERY calltimeout 
SELECT 'timer' AS type,T1.time AS time,T1.callerid  AS callerid 
FROM T1:callstream 
PATTERN T1 WHERE T1.eventtype ='start' 
WITHIN 0
<outputAdaptors>
    <isOutputAsSelection>true</isOutputAsSelection>
    <outputAdaptorClassName>com.sodbase.outputadaptor.timer.FixedDelayTimerOutputAdaptor</outputAdaptorClassName>
    <adaptorParams>calltimeoutevent</adaptorParams>
    <adaptorParams>1000</adaptorParams>
    <isExternal>false</isExternal>
    <queryName>calltimeout</queryName>
</outputAdaptors>


將EPL引擎的處理結果傳到EPLBolt的下一個bolt中。

2.在storm中使用EPL模型

(1)將EPL需要的包拷貝到storm的lib目錄下,再啟動nimbus和supervisor

sodbase-cep-engine.jar  sodbase-dataadaptor-storm.jar  sodbase-dataadaptor-timer.jar  sodbase-studio.jar  xalan-2.7.1.jar  xercesImpl-2.9.1.jar  xml-apis-1.3.04.jar

(2)在storm目錄下建一個目錄cep_home,存放EPL引擎的配置檔案和日誌。把configuration資料夾、logging.properties放到cep_home下面。cep_home的路徑可以自定義,和下面EPLExampleTopology 程式碼中一致即可。

(3)EPL XML模型檔案放到放到cep_home/files目錄下,位置也可自定義,要和下面EPLExampleTopology 程式碼中一致。

(4)編寫Topology應用EPLExampleTopology, 程式碼如下

public class EPLExampleTopology {

  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("event", new RandomEventSpout(), 1);
    //TODO: to specify stream name that EPL uses, and the output fields
    Fields outputFields = new Fields("functionname","responsetime","timeout");
    String cep_home="/user/apache-storm-0.9.4/cep_home";
    String cepmodelfile="/user/apache-storm-0.9.4/cep_home/files/calltimeout.xml";
    builder.setBolt("EPL", new EPLBolt("callstream",outputFields,cep_home,cepmodelfile), 1).shuffleGrouping("event");
    builder.setBolt("print1", new PrintBolt(), 1).shuffleGrouping("EPL");

    Config conf = new Config();
    conf.setDebug(false);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);

      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    else {

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("test", conf, builder.createTopology());
      Utils.sleep(10000);
      cluster.killTopology("test");
      cluster.shutdown();
    }
  }
}

builder.setBolt("EPL", new EPLBolt("callstream",outputFields,cep_home,cepmodelfile), 1).shuffleGrouping("event");

表示將"event" Spout的輸出,接到EPL引擎的streamname "callstream",輸出outputFieds。用了EPLBolt專門處理EPL語句,建構函式有4個引數

streamname:與EPL模型中StubInputAdaptor的名稱對應

outputFields:Bolt的輸出欄位,應與EPL模型中SELECT語句一致

cep_home:EPL引擎配置檔案和日誌檔案目錄

cepmodelfile:EPL XML模型檔案的路徑


(5)EPLExampleTopology的RandomEventSpout和PrintBolt

RandomEventSpout提供呼叫開始結束事件模擬資料的spout

public class RandomEventSpout extends BaseRichSpout {
  SpoutOutputCollector _collector;
  Random _rand;
  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
    _rand = new Random();
  }

  @Override
  public void nextTuple() {
    
    int functionid=_rand.nextInt(10000);
    String a = "function-"+functionid;
    String b = "start";    
    _collector.emit(new Values(a,b));
    Utils.sleep(500);
    if(functionid%2==0)
    {    	
        b = "end";    
        _collector.emit(new Values(a,b));
    }
  }

  @Override
  public void ack(Object id) {
  }

  @Override
  public void fail(Object id) {
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("callerid","eventtype"));
  }

}
PrintBolt作用是螢幕列印輸出
public  class PrintBolt extends BaseRichBolt {
    OutputCollector _collector;  
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }
    public void execute(Tuple tuple) {
    	Fields fields = tuple.getFields();
    	for(String field:fields)
    		System.out.print(field+": "+tuple.getValueByField(field));
    	System.out.println();	      
    	_collector.ack(tuple);
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
  }


(6)將Topology打包成jar,在storm中測試執行。

本文示例的功能是監控系統中監測服務呼叫或系統呼叫的響應時間,是否超時。

參考:

SODBASE CEP用於輕鬆、高效實施資料監測、監控類、交易類、日誌分析類專案微笑。EPL語法見SODSQL寫法與示例。圖形化建模請使用SODBASE Studio

相關推薦

SODBASE CEP學習SQL語言EPLStormjStorm整合

Storm框架原本是設計用來做網際網路短文字處理和一些統計工作的,是一種分散式流式計算框架。在一些場合,特別是在已經用了Storm架構以後,發現又想用EPL語句,Storm和類SQL語言EPL結合也不失為一種方案。對線上規則修改、視窗資料可靠性要求高的地方還有用專用的CE

SODBASE CEP學習線上動態修改EPL規則

在實際專案中,常常會需要輸入資料不斷的條件下,線上修改規則。例如:交易策略中,以一定時間視窗的價格平均值和閾值比較決定交易行為。當時間視窗滑動過程中,需要修改閾值。如果單純的停掉這條EPL,修改,再啟動,之前的時間視窗的資料就會丟失,即使不丟失,因為閾值變了,之前的中間結

SODBASE CEP學習流式計算中的SQL語言EPL

(本文中類SQL語句建模、單元測試建議使用SODBASE Studio,參考示例見視訊教程。 SODBASE CEP中,類SQL語言EPL(事件處理語言)也叫做SODSQL。其基本寫法為 CREATE QUERY 查詢名稱 SELECT 查詢欄位 FROM 流 PAT

SODBASE CEP學習流式計算中的儲存和ETL

許多流式計算應用離不開儲存,也就是把資料存在硬碟上,例如歷史資料的儲存。畢竟硬碟適合長期地儲存大量資料。在介紹具體方法之前,先講一個實際專案裡經常要用到的原則,那就是:儲存讀寫速度要和記憶體計算速度匹配。 怎麼理解呢?例如記憶體計算速度是10萬events/s,儲存讀寫速度

ElasticStack學習ElasticSearch文件使用操作

一、文件的CRUD介紹 ElasticSearch中存在五種操作,分別如下: 1、Index 該操作表示:如果文件的ID不存在,則建立新的文件。若有相同的ID,先刪除現有文件,然後再建立新的文件,同時版本會增加。 語法格式如下: PUT index_name/_doc/100 {"field1

Hadoop程式設計學習使用FileSystem進行檔案讀寫及檢視檔案資訊

http://www.cnblogs.com/beanmoon/archive/2012/12/11/2813235.html 在這一節我們要深入瞭解Hadoop的FileSystem類——這是與與hadoop的檔案系統互動的重要介面。雖然我們只是著重於HDFS的實現,但

Petri網學習Petri網的結構性質

一、結構有界性&守恆性 1. 結構有界性 定義:設N=(P,T;F)為一個網。對N賦予任意的初始標識M0,網(N,M0)都是有界的,則稱N為結構有界網; 再回憶一下什麼是有界petri網:在PN=(P,T;F,M0)中,,庫所p都有界,則稱PN為有界petri

PE檔案格式學習匯入表

UPDATE: 在文章的末尾更新了一張圖,在網上找的,有助於理解匯入表的結構 1.概述 匯入表是逆向和病毒分析中比較重要的一個表,在分析病毒時幾乎第一時間都要看一下程式的匯入表的內容,判斷程式大概用了哪些功能。 匯入表是資料目錄表中的第2個元素,排在匯出表的

ionic學習Tab控制元件 學習

實現功能: 1.新增tabs頁面:下部新增一個新聞按鈕 2.去掉二級頁面tabs選單: 3.修改返回按鈕:上圖的左上方箭頭 步驟 1. 將news頁面放在下面   在tabs.ts和tabs.html中引入並顯示news元件     圖示在這

rabbitmq學習利用rabbitmq實現遠端rpc呼叫

一、rabbitmq實現rpc呼叫的原理 ·rabbitmq實現rpc的原理是:客戶端向一個佇列中傳送訊息,並註冊一個回撥的佇列用於接收服務端返回的訊息,該訊息需要宣告一個叫做correaltionId的屬性,該屬性將是該次請求的唯一標識。服務端在接受到訊息(在需要時可以驗證correaltionId)後,

rabbitmq學習利用rabbitmq實現遠程rpc調用

ext new urn trace cat ued 創建 exc false 一、rabbitmq實現rpc調用的原理 ·rabbitmq實現rpc的原理是:客戶端向一個隊列中發送消息,並註冊一個回調的隊列用於接收服務端返回的消息,該消息需要聲明一個叫做correaltio

python學習python變數和函式

python用下劃線作為變數字首和字尾指定特殊變數 _xxx 不能用’from module import *’匯入 __xxx__ 系統定義名字 __xxx 類中的私有變數名 核心風格:避免用下劃線作為變數名的開始。   因為下劃線對直譯器有特殊的意義,而且是內建

Spring Boot學習使用@SpringBootTest註解進行單元測試

一、簡介 專案中經常會遇到需要單元測試的情況,那麼SpringBoot如何實現這種需求,使用@SpringBootTest註解可以執行環境,測試後臺程式碼。 二、環境準備 eclipse + maven + Spring Boot 三、程式碼示例 pom.xml

webpack學習配置CleanWebpackPlugin

demo地址: https://github.com/Lkkkkkkg/webpack-demo 上次配置HtmlWebpackPlugin: https://blog.csdn.net/qq593249106/article/details/84900169 繼上次配置完 HtmlWe

Spring的學習Web中的Spring

Spring通常用來開發Web應用。 SpringMVC的執行過程: 我們可以從以下的圖來分析SpringMVC的的執行過程。 1、客戶端在傳送請求的時候,會呼叫DispatcherServlet,Dispatch是SpringMVC的入口,Dispatche

SpringBoot+Shiro學習Realm授權

上一節我們講了自定義Realm中的認證(doGetAuthenticationInfo),這節我們繼續講另一個方法doGetAuthorizationInfo授權 授權流程 流程如下: 首先呼叫Subject.isPermitted/hasRole介面,其會委託給Security

Python+OGR庫學習重投影shp檔案並另存,屬性表保持不變

程式碼關鍵點 1、首先要定義好轉換引數 2、主要操作物件是要素,需要提前建立好輸出檔案,然後遍歷所有要素,對每一個幾何物件進行座標轉換 3、輸出檔案的欄位屬性定義需要從輸入檔案讀取 程式碼思路 1、匯入相關包,切換路徑,註冊驅動 2、定義轉換關係 3、開啟輸入檔案,讀取到圖層

caffe學習py-faster-rcnn配置,執行測試程式Ubuntu

上一篇部落格中講了在Ubuntu下安裝caffe的經驗總結(各種問題,簡直懷疑人生了)。部落格連結:點我開啟 faster-rcnn有兩個版本,分別是Python的和MATLAB的。這裡介紹python版本的faster-rcnn的配置。 網上有很多相關的教程,起初我在配置

GitHub學習Phpstorm中的git使用2--拉取工程composer使用

    之前我在一臺電腦上將一份不完整的工程儲存在github上,現在我回到家中,換了一臺電腦,接下來就是要用另一臺電腦拉取github中的工程,並用composer把整個工程的依賴檔案什麼亂七八糟的檔案都下下好。     1.首先開啟phpstorm,按圖

Java虛擬機器學習10載入器(ClassLoader)

類載入器 類載入器(ClassLoader)用來載入 class位元組碼到 Java 虛擬機器中。一般來說,Java 虛擬機器使用 Java 類的方式如下:Java 原始檔在經過 Javac之後就被轉換成 Java 位元組碼檔案(.class 檔案)。類載入器負責讀