SODBASE CEP學習進階篇(七)續:SODBASE CEP與Spark streaming整合-低延遲規則管理
許多大資料平臺專案採用流式計算來處理實時資料,會涉及到一個環節:處理規則管理。因為使用者經常有自己配置資料處理規則或策略的需求。同時,維護人員來也有也有將規則提取出來的需求,方便變更和維護的需求。我們知道Spark streaming作為資料歸檔備份時吞吐量高,與Hadoop整合相對方便。但是Spark streaming也存在高延時,框架過重帶來策略規則修改複雜的問題。本文介紹Spark streaming加SODBASE SQL來實現規則管理的示例。
1.示例
1.1 示例簡介
本示例的資料來源是Kafka,從採集裝置到Kafka的過程沒有畫出來。許多時候這種資料是做了二進位制壓縮的,本例中就是這樣。資料解析規則採用了SODBASE SQL語句來表達規則,比如資料的第0位到第7位轉化為整形,作為裝置轉速值。規則處理過的資料可以通過socket傳給Spark streaming,也可以再通過Kafka傳給Spark streaming,從而解決Spark streaming的實時性較弱和定製化能力弱的問題。通過socket的方式前面文章有介紹,本文示例是再通過Kafka傳給Spark
streaming。
1.2 操作步驟
1.2.1 安裝Kafka,建立兩個topic(test,test2)
找一臺linux機器,從官方網站下載Kafka,解壓,啟動
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
建立test topic, bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
1.2.2 下載SODBASE規則示例模型和軟體
(1)SODBASE CEP Server 2.0.23(sp3)以上版本,在LInux下解壓,啟動
$chmod 777 catalina.sh
$./catalina.sh run
初始化狀態伺服器
$curl http://localhost:16111/sodbase-cep-server-webservice-1.0.1/install
(2)SODBASE CEP Admin用於安裝規則模型,在Linux下解壓
(3)SODBASE Studio示例模型中的britork-actuator.soddata2規則模型檔案到本地
1.2.3 使用SODBASE CEP Admin安裝啟動britork-actuator.soddata2
$ cd SODBASE-CEP-Server-Admin-2.0-u24/bin
$ ./installmodel.sh -h localhost -P 16111 -f "../example/britork-actuator.soddata2" -u admin -p cep
$ ./startmodel.sh -h localhost -P 16111 -m britork-actuator -v 1.0 -u admin -p cep
1.2.4執行Spark Streaming
編譯執行一下Spark程式碼,就可以接收資料,接收到的資料這裡作了螢幕列印。Spark streaming本身不需要經常修改,通過配置上文SODSQL語句就可以改變資料處理的邏輯。在實際專案中也可以落地到Hdfs,或者Hbase等儲存中。
package example.streaming
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
object Kafka {
def main(args:Array[String])
{
val sparkConf = new SparkConf().setAppName("KafkaTest")
val ssc = new StreamingContext(sparkConf, Seconds(60))
ssc.checkpoint("checkpoint")
val lines =KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, Map[String, String]("group.id" -> "archive","metadata.broker.list" -> "localhost:9092"), Set("test"))
lines.foreachRDD { (rdd, time) =>{
// println(time)
rdd.foreach(x=>println(x._1+"----------"+x._2))
}}
ssc.start()
ssc.awaitTermination()
}
}
1.3 工作原理
1.3 模型檔案
使用SODBASE Studio可以將.soddata2檔案轉為xml檔案,其中的規則為
SELECT JAVASTATIC:udf.Bytes:bitsToInt(T1.message,'0','7') AS torque
FROM T1:DCSstream
完整XML規則檔案如下<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<GraphModelData>
<CEPSoftwareVersion>2</CEPSoftwareVersion>
<inputAdaptors>
<inputAdaptorClassName>com.sodbase.inputadaptor.kafka.KafkaInputAdaptor</inputAdaptorClassName>
<adaptorParams>DCSstream</adaptorParams>
<adaptorParams>test</adaptorParams>
<adaptorParams>localhost:2181</adaptorParams>
<adaptorParams>message</adaptorParams>
<isExternal>false</isExternal>
</inputAdaptors>
<SODSQLs>CREATE QUERY britork-actuator SELECT JAVASTATIC:udf.Bytes:bitsToInt(T1.message,'0','7') AS torque FROM T1:DCSstream PATTERN T1 WHERE WITHIN 0 </SODSQLs>
<outputAdaptors>
<isOutputAsSelection>true</isOutputAsSelection>
<outputAdaptorClassName>com.sodbase.outputadaptor.PrintEventOutputAdaptor</outputAdaptorClassName>
<adaptorParams>false</adaptorParams>
<adaptorParams>true</adaptorParams>
<isExternal>false</isExternal>
<queryName>britork-actuator</queryName>
</outputAdaptors>
<outputAdaptors>
<isOutputAsSelection>true</isOutputAsSelection>
<outputAdaptorClassName>com.sodbase.outputadaptor.kafka.KafkaOutputAdaptor</outputAdaptorClassName>
<adaptorParams>test2</adaptorParams>
<adaptorParams>localhost:9092</adaptorParams>
<isExternal>false</isExternal>
<queryName>britork-actuator</queryName>
</outputAdaptors>
<modelName>britork-actuator</modelName>
<modelVersion>1.0</modelVersion>
<modelDescription></modelDescription>
</GraphModelData>
參考:
SODBASE CEP學習進階篇(七):SODBASE CEP與Spark streaming整合
SODBASE CEP用於輕鬆、高效實施資料監測、監控類、實時交易類專案快取擴充套件參見。