1. 程式人生 > >SODBASE CEP學習進階篇(七)續:SODBASE CEP與Spark streaming整合-低延遲規則管理

SODBASE CEP學習進階篇(七)續:SODBASE CEP與Spark streaming整合-低延遲規則管理

許多大資料平臺專案採用流式計算來處理實時資料,會涉及到一個環節:處理規則管理。因為使用者經常有自己配置資料處理規則或策略的需求。同時,維護人員來也有也有將規則提取出來的需求,方便變更和維護的需求。我們知道Spark streaming作為資料歸檔備份時吞吐量高,與Hadoop整合相對方便。但是Spark streaming也存在高延時,框架過重帶來策略規則修改複雜的問題。本文介紹Spark streaming加SODBASE SQL來實現規則管理的示例。

1.示例

1.1 示例簡介

本示例的資料來源是Kafka,從採集裝置到Kafka的過程沒有畫出來。許多時候這種資料是做了二進位制壓縮的,本例中就是這樣。資料解析規則採用了SODBASE SQL語句來表達規則,比如資料的第0位到第7位轉化為整形,作為裝置轉速值。規則處理過的資料可以通過socket傳給Spark streaming,也可以再通過Kafka傳給Spark streaming,從而解決Spark streaming的實時性較弱和定製化能力弱的問題。通過socket的方式前面文章有介紹,本文示例是再通過Kafka傳給Spark streaming。


1.2 操作步驟

1.2.1 安裝Kafka,建立兩個topic(test,test2)

找一臺linux機器,從官方網站下載Kafka,解壓,啟動

bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

建立test topic, bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

1.2.2 下載SODBASE規則示例模型和軟體

(1)SODBASE CEP Server 2.0.23(sp3)以上版本,在LInux下解壓,啟動

$chmod 777 catalina.sh

$./catalina.sh run

初始化狀態伺服器

$curl http://localhost:16111/sodbase-cep-server-webservice-1.0.1/install

(2)SODBASE CEP Admin用於安裝規則模型,在Linux下解壓

(3)SODBASE Studio示例模型中的britork-actuator.soddata2規則模型檔案到本地

1.2.3 使用SODBASE CEP Admin安裝啟動britork-actuator.soddata2

$ cd SODBASE-CEP-Server-Admin-2.0-u24/bin

$ ./installmodel.sh -h localhost -P 16111 -f "../example/britork-actuator.soddata2" -u admin -p cep
$ ./startmodel.sh -h localhost -P 16111 -m britork-actuator -v 1.0 -u admin -p cep


1.2.4執行Spark Streaming

編譯執行一下Spark程式碼,就可以接收資料,接收到的資料這裡作了螢幕列印。Spark streaming本身不需要經常修改,通過配置上文SODSQL語句就可以改變資料處理的邏輯。在實際專案中也可以落地到Hdfs,或者Hbase等儲存中。

package example.streaming

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder

object Kafka {
  def main(args:Array[String])
  {
    
    val sparkConf = new SparkConf().setAppName("KafkaTest")
    val ssc =  new StreamingContext(sparkConf, Seconds(60))
    ssc.checkpoint("checkpoint")
  val lines =KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, Map[String, String]("group.id" -> "archive","metadata.broker.list" -> "localhost:9092"), Set("test")) 
      
    
    lines.foreachRDD { (rdd, time) =>{
     // println(time)
     rdd.foreach(x=>println(x._1+"----------"+x._2))
    }}
    ssc.start()
    ssc.awaitTermination()
  }
}

1.3 工作原理

1.3 模型檔案

使用SODBASE Studio可以將.soddata2檔案轉為xml檔案,其中的規則為

SELECT JAVASTATIC:udf.Bytes:bitsToInt(T1.message,'0','7') AS torque 
FROM T1:DCSstream
完整XML規則檔案如下
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<GraphModelData>
    <CEPSoftwareVersion>2</CEPSoftwareVersion>
    <inputAdaptors>
        <inputAdaptorClassName>com.sodbase.inputadaptor.kafka.KafkaInputAdaptor</inputAdaptorClassName>
        <adaptorParams>DCSstream</adaptorParams>
        <adaptorParams>test</adaptorParams>
        <adaptorParams>localhost:2181</adaptorParams>
        <adaptorParams>message</adaptorParams>
        <isExternal>false</isExternal>
    </inputAdaptors>
    <SODSQLs>CREATE QUERY britork-actuator SELECT JAVASTATIC:udf.Bytes:bitsToInt(T1.message,'0','7') AS torque FROM T1:DCSstream PATTERN T1 WHERE   WITHIN 0 </SODSQLs>
    <outputAdaptors>
        <isOutputAsSelection>true</isOutputAsSelection>
        <outputAdaptorClassName>com.sodbase.outputadaptor.PrintEventOutputAdaptor</outputAdaptorClassName>
        <adaptorParams>false</adaptorParams>
        <adaptorParams>true</adaptorParams>
        <isExternal>false</isExternal>
        <queryName>britork-actuator</queryName>
    </outputAdaptors>
    <outputAdaptors>
        <isOutputAsSelection>true</isOutputAsSelection>
        <outputAdaptorClassName>com.sodbase.outputadaptor.kafka.KafkaOutputAdaptor</outputAdaptorClassName>
        <adaptorParams>test2</adaptorParams>
        <adaptorParams>localhost:9092</adaptorParams>
        <isExternal>false</isExternal>
        <queryName>britork-actuator</queryName>
    </outputAdaptors>
    <modelName>britork-actuator</modelName>
    <modelVersion>1.0</modelVersion>
    <modelDescription></modelDescription>
</GraphModelData>

參考:
SODBASE CEP學習進階篇(七):SODBASE CEP與Spark streaming整合

SODBASE CEP用於輕鬆、高效實施資料監測、監控類、實時交易類專案微笑快取擴充套件參見