
One Implementation Approach for a Time-Series-Based Anomaly Detection System

Technology stack: Spark, Kafka, OpenTSDB, and Yahoo's EGADS.
Static (offline) model training: two algorithms are used to train the models, an exponential moving average and Holt-Winters. The models are retrained once a day, starting at 00:00; at 00:05 each morning the freshly trained models are used for anomaly detection, which covers both point prediction and point-level anomaly checks.
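For intuition only, here is a minimal Scala sketch of the exponential-moving-average half: predict the next point as the running EWMA and flag a point when it deviates by more than k standard deviations. This is not the EGADS implementation; alpha and the threshold k are illustrative values, not the project's tuned parameters.

case class EwmaModel(alpha: Double, mean: Double, variance: Double) {
  def predict: Double = mean                                   // point prediction = current EWMA
  def isAnomaly(observed: Double, k: Double = 3.0): Boolean =  // k-sigma point check
    math.abs(observed - mean) > k * math.sqrt(variance)
  def update(observed: Double): EwmaModel = {                  // fold the new point into the model
    val diff = observed - mean
    EwmaModel(alpha, mean + alpha * diff, (1 - alpha) * (variance + alpha * diff * diff))
  }
}

// usage: fold yesterday's points into an initial model, then predict/check today's points
// val trained = points.foldLeft(EwmaModel(0.3, points.head, 0.0))((m, p) => m.update(p))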
Online (real-time) model training: Holt-Winters makes predictions for three metrics. Two of them can be trained statically, but for the third the model has to be retrained online after each round of anomaly detection. Online training requires the training result of every batch to be kept as state across batches; after some investigation this is implemented with the mapWithState operator.


Training flow: for each metric listed in the configuration, read its history from the OpenTSDB time-series database through the OpenTSDB API, train the model, and save the trained model to its corresponding path.
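A minimal sketch of this flow, assuming OpenTSDB's HTTP /api/query endpoint is used to pull the last day of data and that the trained model object is Java-serializable; the object and method names here are illustrative, not the project's real helpers:

import java.io.{FileOutputStream, ObjectOutputStream, Serializable}
import java.net.{HttpURLConnection, URL}
import scala.io.Source

object OfflineTrainingSketch {

  // Pull the last 24h of one metric through OpenTSDB's HTTP /api/query endpoint.
  // Returns the raw JSON ([{"metric": ..., "dps": {"<ts>": <value>, ...}}]); parsing it
  // into (timestamp, value) pairs for the trainer is left out here.
  def fetchMetric(tsdbUrl: String, metric: String): String = {
    val url = new URL(s"$tsdbUrl/api/query?start=1d-ago&m=avg:$metric")
    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("GET")
    try Source.fromInputStream(conn.getInputStream).mkString
    finally conn.disconnect()
  }

  // Persist a trained model to its configured path (local FS today, HDFS later),
  // assuming the model object is Java-serializable.
  def saveModel(model: Serializable, path: String): Unit = {
    val out = new ObjectOutputStream(new FileOutputStream(path))
    try out.writeObject(model) finally out.close()
  }
}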
Prediction flow: for each metric to be predicted, data is read from Kafka in real time via Spark Streaming; every incoming point is first predicted against and then checked for anomalies. When a model does not need online training, it is broadcast to the executors as an optimization; when a model does need online training, it is simply wrapped into an RDD, as the ModelUpdateOnline class below shows.
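A minimal sketch of the broadcast (static-model) path, assuming a keyed DStream of observed values; loadModel, predict, and the model path are placeholders rather than the project's real API:

import org.apache.spark.SparkContext
import org.apache.spark.streaming.dstream.DStream

object BroadcastPredictSketch {
  // Static-model path: the models are loaded once on the driver, broadcast to the
  // executors, and every batch predicts against the same broadcast copy.
  def predictWithBroadcastModels[M](
      sc: SparkContext,
      stream: DStream[(String, Double)],          // (applicationId_instanceId, observed value)
      loadModel: String => M,
      predict: (M, Double) => Double): DStream[(String, Double, Double)] = {

    val models = Map("127082_2897" -> loadModel("/path/to/model/127082_2897"))
    val modelsBC = sc.broadcast(models)           // shipped to the executors once, reused every batch

    stream.mapPartitions { part =>
      part.flatMap { case (key, observed) =>
        modelsBC.value.get(key).map { model =>
          val expected = predict(model, observed) // point prediction
          (key, observed, expected)               // downstream: point anomaly check
        }
      }
    }
  }
}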

Design considerations: for online model training, Spark's operators and EGADS do not fit together cleanly, so the obvious approach is to collect each batch's data to the driver inside DStream.foreachRDD after prediction, retrain the model there, and then broadcast the updated model back to the executors. That means a broadcast on every batch, and with many models the network overhead becomes very large (sketched below). After investigation, mapWithState was adopted to hold the incrementally updated state. The advantage is that the incrementally updated model does not have to be written to Redis after every batch and read back at the start of the next one, which would likewise incur heavy network overhead.
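For comparison, a sketch of the rejected collect-and-rebroadcast design; retrain stands in for the actual EGADS update call:

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

object RebroadcastPerBatchSketch {
  // Every batch is collected to the driver, the model is retrained there,
  // and a fresh broadcast replaces the old one.
  def run[M: scala.reflect.ClassTag](
      ssc: StreamingContext,
      stream: DStream[(String, Double)],
      initial: M,
      retrain: (M, Array[(String, Double)]) => M): Unit = {

    var modelBC: Broadcast[M] = ssc.sparkContext.broadcast(initial)

    stream.foreachRDD { rdd =>
      val batch = rdd.collect()                      // pull the whole batch onto the driver
      val updated = retrain(modelBC.value, batch)    // retrain on the driver
      modelBC.unpersist()                            // drop the stale executor copies
      modelBC = ssc.sparkContext.broadcast(updated)  // re-broadcast: network cost on every batch
    }
  }
}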


Planned optimizations: 1) The configuration files and trained models currently sit on the server's local filesystem; they should be moved to HDFS so that the Spark job can still reach them at prediction time regardless of which node the driver runs on (e.g. in cluster deploy mode).

2) When there are many models, training them on a single thread becomes a performance bottleneck (a parallel-training sketch follows).
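One possible way to relieve the bottleneck is to train each metric's model in its own Future; trainOne is a placeholder for the real per-metric routine (fetch history from OpenTSDB, fit the model, save it), and the 2-hour bound is illustrative:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object ParallelTrainingSketch {
  // Kick off one Future per metric and wait for all of them to finish.
  def trainAll(metricKeys: Seq[String], trainOne: String => Unit): Unit = {
    val jobs = metricKeys.map(key => Future(trainOne(key)))
    Await.result(Future.sequence(jobs), 2.hours)
  }
}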

Problems encountered: 1) there are many models, and several of them must be trained at the same time; 2) when training a model dynamically, its updated state must be preserved across batches (after investigation, the mapWithState operator was adopted).

package com.tingyun.mlpredict.done




import com.networkbench.avro.cache.ZookeeperAvroSchemaPersister
import com.networkbench.avro.serialize.AvroMessageDecoder
import com.networkbench.newlens.datacollector.backend.aggregate.wrappedmessage.own.MonitorWrappedMessage
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming._
import com.yahoo.egads.control.ModelAdapter
import com.yahoo.egads.data.TimeSeries


object ModelUpdateOnline{




  def main(args: Array[String]) {


    val sparkConf = new SparkConf().setAppName("StreamingAnomalyDetector")
    val ssc = new StreamingContext(sparkConf, Minutes(1))   // 1-minute micro-batches
    val sc = ssc.sparkContext
    ssc.checkpoint("E:\\tmp")   // checkpoint directory required by mapWithState (local path for testing)


    //val mAdapter = Egads.loadModel("E:\\andy_ty\\work\\ml_egads\\anomolydetection\\src\\main\\resources\\mem\\2017-08-30_127082_2897_TripleExponentialSmoothingModel")
    //val initialRDD = ssc.sparkContext.parallelize(List[(String, ModelAdapter)](("127287_-1",TestModel()),("127287_3272",mAdapter),("127116_-1",mAdapter),("126887_2552",mAdapter),("127082_2897",mAdapter)))
    // initial per-key state: one empty TestModel per applicationId_instanceId key
    val initialRDD = List[(String,TestModel)](("127287_-1",TestModel(Seq[MonitorWrappedMessage]())),("127287_3272",TestModel(Seq[MonitorWrappedMessage]())),("127116_-1",TestModel(Seq[MonitorWrappedMessage]())),("126887_2552",TestModel(Seq[MonitorWrappedMessage]())))
    val initialRddBC = sc.broadcast(initialRDD)   // broadcast copy of the initial models (static-model path); not used by the mapWithState flow below
    val numThreads = "2"
    val topics = "alarm-detect-streaming"
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val kafkaParams = Map[String, String]("zookeeper.connect" -> "10.194.1.2:2181,10.194.1.12:2181,10.194.1.13:2181", "group.id" -> "group01","zookeeper.connection.timeout.ms" -> "10000")
    val monitorWrappedMessage1 =  KafkaUtils.createStream[String,  Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2).mapPartitions( partitions => {
      val zookeeperAvroSchemaPersister = new ZookeeperAvroSchemaPersister
      zookeeperAvroSchemaPersister.setServers("10.194.1.2:2181")
      zookeeperAvroSchemaPersister.setConnectionTimeout(10000)
      zookeeperAvroSchemaPersister.init()
      val avroMessageDecoder = new AvroMessageDecoder
      avroMessageDecoder.setAvroMessageEntityPackageToScan("com.networkbench.newlens.datacollector.backend.aggregate.wrappedmessage.own")
      avroMessageDecoder.setAvroSchemaPersister(zookeeperAvroSchemaPersister)
      // materialise the partition so that decoding finishes before the ZooKeeper connection is closed
      val mWMessage = partitions.map(line => avroMessageDecoder.decode(line._2).asInstanceOf[MonitorWrappedMessage]).toList
      zookeeperAvroSchemaPersister.destroy()  // close the ZooKeeper connection
      mWMessage.toIterator
    })
    monitorWrappedMessage1.print(100)
   val monitorWrappedMessage = monitorWrappedMessage1.map(mmm => (mmm.getApplicationId + "_" + mmm.getApplicationInstanceId,mmm))   // key each message by applicationId_instanceId to match the model keys
   /* val params = Map("bootstrap.servers" -> "master:9092", "group.id" -> "scala-stream-group")
    val topic = Set("test")
    val initialRDD = ssc.sparkContext.parallelize(List[(String, Int)]())
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, params, topic)
    val word = messages.flatMap(_._2.split(" ")).map { x => (x, 1) }*/
    // demo mappingFunction (adapted from a word-count example, not used below): keeps a ModelAdapter as the per-key state and updates it
   def mappingFuncDemo(word: String, monitorWrappedMessage: Option[MonitorWrappedMessage], state: State[ModelAdapter]):Option[(String,ModelAdapter)] =  {
      /*state.get.update(new TimeSeries.DataSequence(monitorWrappedMessage.get.getTimestamp,1f))
      state.update(state.get())*/
     /* val preMA = state.getOption().getOrElse(new ModelAdapter())
      preMA.update(new TimeSeries.DataSequence(monitorWrappedMessage.get.getApplicationId,1f))
      state.update(preMA)
      val output = (word, preMA)
      Some(output)*/
     val preMA = state.getOption()
      var ma = new ModelAdapter()
      preMA match{
        case Some(modelAdapter) =>{ println(modelAdapter.firstTimeStamp + "==111=" + monitorWrappedMessage.get.getApplicationId);
          ma = preMA.get
                 }
        case _ =>{
          println( "=222==" + monitorWrappedMessage.get.getApplicationId);
        }
      }
      /*preMA.update(new TimeSeries.DataSequence(monitorWrappedMessage.get.getApplicationId,1f))
      state.update(preMA)*/
      val output = (word, ma)
      ma.update(new TimeSeries.DataSequence(monitorWrappedMessage.get.getApplicationId,1f))
      Some(output)
    }
    // `word` comes from the DStream's key and `monitorWrappedMessage` from its value; the `state` parameter comes from the RDD supplied via initialState,
    // or, when no initial state is given, from a freshly created empty default (i.e. val existingEvents: Seq[MonitorWrappedMessage] = state.getOption().map(_.monitorWrappedMessages).getOrElse(Seq[MonitorWrappedMessage]())).
    // The RDD supplied via initialState acts as the previous-state RDD and the current batch as the current RDD; without an initialState RDD, the previous state starts out as a newly created empty object.
    def mappingFunc(word: String, monitorWrappedMessage: Option[MonitorWrappedMessage], state: State[TestModel]): Option[(String,TestModel)] = {
      // on a state timeout the value is None and the state may no longer be updated
      if (monitorWrappedMessage.isEmpty) return None
      val preMA = state.getOption()
      preMA match {
        case Some(testModel) =>
          // existing state for this key: prepend the new point and store the updated state back
          val testModelnew = TestModel(monitorWrappedMessage.get +: testModel.monitorWrappedMessages)
          state.update(testModelnew)
          Some((word, testModelnew))
        case _ =>
          // no prior state (the key was not in the initialState RDD): create the default state for this key
          val testModelnew = TestModel(Seq(monitorWrappedMessage.get))
          state.update(testModelnew)
          Some((word, testModelnew))
      }
    }
    // With initialState set, the first batch looks up each key (the `word`, i.e. the key of the upstream DStream) in the initialised RDD and runs the business logic in mappingFunc against that value;
    // without initialState, the default state has to be created on first use (handled in the case _ branch above).
    // use mapWithState to manage the streaming state
    val stateDstream = monitorWrappedMessage.mapWithState(
      StateSpec.function(mappingFunc _).initialState(sc.parallelize(initialRDD)).timeout(Minutes(5)))
      .map {
        case Some((key, model)) => key + "===33333==" + model.monitorWrappedMessages
        case _ => "======NODATA======="
      }
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }


}
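
The TestModel class used throughout the listing is not shown above; a minimal definition consistent with how it is used here (an assumption, not the original source) would be:

// Per-key state held by mapWithState: the messages accumulated so far for one
// applicationId_instanceId key (assumed definition; the original class is not shown).
case class TestModel(monitorWrappedMessages: Seq[MonitorWrappedMessage])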