
Spark Advanced Data Analysis: Network Traffic Anomaly Detection (Hands-On Upgrade)

My previous post covered only the K-means clustering part of this project. Today I spent a good while finishing the code and running it end to end. The listing is long: it combines what I wrote earlier with the anomaly-detection part I added on top. The rest of this post is the complete code, the run output, and a few notes.
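The detection rule at the heart of the code is small: cluster the normalized feature vectors with K-means, then flag any record whose Euclidean distance to its nearest centroid exceeds a threshold taken from the training data (here, the distance of the 100th-farthest training point). Below is a minimal sketch of just that rule, assuming Spark 1.x MLlib as in the full listing; the object name is only for this sketch.

import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

object DetectionRuleSketch {
  // Euclidean distance from a point to the centroid of its predicted cluster
  def distToCentroid(datum: Vector, model: KMeansModel): Double = {
    val center = model.clusterCenters(model.predict(datum))
    math.sqrt(datum.toArray.zip(center.toArray).map { case (x, c) => (x - c) * (x - c) }.sum)
  }

  // Threshold = distance of the 100th-farthest training point from its centroid;
  // anything farther than that is treated as an anomaly.
  def buildDetector(trainingData: RDD[Vector], model: KMeansModel): Vector => Boolean = {
    val threshold = trainingData.map(d => distToCentroid(d, model)).top(100).last
    (datum: Vector) => distToCentroid(datum, model) > threshold
  }
}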

package internet

import org.apache.spark.mllib.clustering.{KMeansModel, KMeans}
import org.apache.spark.mllib.linalg.{Vectors,Vector}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by 汪本成 on 2016/7/24.
 */
object CheckAll {

  def main(args: Array[String]) {
    // Create the entry-point objects
    val conf = new SparkConf().setAppName("CheckAll").setMaster("local")
    val sc = new SparkContext(conf)

    val HDFS_DATA_PATH = "hdfs://node1:9000/user/spark/sparkLearning/cluster/kddcup.data"
    val rawData = sc.textFile(HDFS_DATA_PATH)

    /** Count samples per label, descending **/
    // clusteringTake1(rawData)

    /** Evaluate k **/
    // clusteringTake2(rawData)
    // clusteringTake3(rawData)
    // clusteringTake4(rawData)
    // clusteringTake5(rawData)

    /** Visualization data for R **/

    /** Anomaly detection **/
    var beg = System.currentTimeMillis()
    anomalies(rawData)
    var end = System.currentTimeMillis()
    println("Elapsed: " + (end - beg) / 1000 + "s")
  }

  // clusteringTake1
  def clusteringTake1(rawData: RDD[String]) = {
    // Count samples per label and sort in descending order
    rawData.map(_.split(",").last).countByValue().toSeq.sortBy(_._2).reverse.foreach(println)

    val labelsAndData = rawData.map { line =>
      // Split the CSV line into columns and keep them in a mutable buffer
      val buffer = line.split(",").toBuffer
      // Remove the three categorical columns starting at index 1
      buffer.remove(1, 3)
      // Remove the trailing label column
      val label = buffer.remove(buffer.length - 1)
      // Convert the remaining values to Doubles and wrap them in a Vector
      val vector = Vectors.dense(buffer.map(_.toDouble).toArray)
      // Pair the label with its feature vector
      (label, vector)
    }

    /**
     * Why convert labelsAndData => data?
     * 1. K-means only uses the feature vectors; it never looks at the label column.
     * 2. We want an RDD that contains only the second element of each tuple.
     * 3. The values property of a tuple RDD gives exactly that; caching it avoids recomputation.
     */
    // Extract just the feature vectors
    val data = labelsAndData.values.cache()

    // Instantiate a KMeans object
    val kmeans = new KMeans()
    // Build the KMeansModel
    val model = kmeans.run(data)
    // Print each cluster centroid
    model.clusterCenters.foreach(println)

    val clusterLabelCount = labelsAndData.map { case (label, datum) =>
      // Predict the cluster of sample datum
      val cluster = model.predict(datum)
      // Return a (cluster, label) pair
      (cluster, label)
    }.countByValue()

    // Count each cluster-label pair and print the counts in a readable format
    clusterLabelCount.toSeq.sorted.foreach { case ((cluster, label), count) =>
      println(f"$cluster%1s$label%18s$count%8s")
    }

    data.unpersist()
  }

  /**
   * Euclidean distance:
   * a.toArray.zip(b.toArray) pairs up the corresponding elements of the two vectors,
   * map(p => p._1 - p._2) takes their differences,
   * map(d => d * d).sum is the sum of squares,
   * math.sqrt() takes the square root.
   * @param a
   * @param b
   * @return
   */
  def distance(a: Vector, b: Vector) =
    math.sqrt(a.toArray.zip(b.toArray).map(p => p._1 - p._2).map(d => d * d).sum)

  /**
   * Apply the Euclidean distance to the model.
   * KMeansModel.predict internally calls KMeans' findClosest.
   * @param datum
   * @param model
   * @return
   */
  def distToCenter(datum: Vector, model: KMeansModel) = {
    // Predict the cluster of sample datum
    val cluster = model.predict(datum)
    // Look up its centroid
    val center = model.clusterCenters(cluster)
    // Apply the distance formula
    distance(center, datum)
  }

  /**
   * Mean distance to centroid
   * @param data
   * @param k
   * @return
   */
  def clusteringScore(data: RDD[Vector], k: Int): Double = {
    val kmeans = new KMeans()
    // Set k
    kmeans.setK(k)
    // Build the KMeansModel
    val model = kmeans.run(data)
    // Mean distance to centroid for this k; mean() averages over the data set
    data.map(datum => distToCenter(datum, model)).mean()
  }

  /**
   * Mean distance to centroid, tuned version
   * @param data
   * @param k
   * @param run     number of runs
   * @param epsilon convergence threshold
   * @return
   */
  def clusteringScore2(data: RDD[Vector], k: Int, run: Int, epsilon: Double): Double = {
    val kmeans = new KMeans()
    kmeans.setK(k)
    // Set the number of runs for this k
    kmeans.setRuns(run)
    // Set the convergence threshold
    kmeans.setEpsilon(epsilon)
    val model = kmeans.run(data)
    data.map(datum => distToCenter(datum, model)).mean()
  }

  // clusteringTake2
  def clusteringTake2(rawData: RDD[String]): Unit = {
    val data = rawData.map { line =>
      val buffer = line.split(",").toBuffer
      buffer.remove(1, 3)
      buffer.remove(buffer.length - 1)
      Vectors.dense(buffer.map(_.toDouble).toArray)
    }.cache()

    val run = 10
    val epsilon = 1.0e-4

    // Score k from 5 to 30 in steps of 5
    (5 to 30 by 5).map(k => (k, clusteringScore(data, k))).foreach(println)
    // Score k from 30 to 100 in steps of 10
    (30 to 100 by 10).par.map(k => (k, clusteringScore2(data, k, run, epsilon))).foreach(println)

    data.unpersist()
  }

  /**
   * Produce the visualization data for R and store it in HDFS
   * @param rawData
   * @param k
   * @param run
   * @param epsilon
   */
  def visualizationInR(rawData: RDD[String], k: Int, run: Int, epsilon: Double): Unit = {
    val data = rawData.map { line =>
      val buffer = line.split(",").toBuffer
      buffer.remove(1, 3)
      buffer.remove(buffer.length - 1)
      Vectors.dense(buffer.map(_.toDouble).toArray)
    }.cache()

    val kmeans = new KMeans()
    kmeans.setK(k)
    kmeans.setRuns(run)
    kmeans.setEpsilon(epsilon)
    val model = kmeans.run(data)

    val sample = data.map(datum =>
      model.predict(datum) + "," + datum.toArray.mkString(",")
    ).sample(false, 0.05)  // keep a 5% sample

    sample.saveAsTextFile("hdfs://node1:9000/user/spark/R/sample")
    data.unpersist()
  }

  /**
   * Build a normalization function (zero mean, unit standard deviation per feature).
   * @param data
   * @return
   */
  def buildNormalizationFunction(data: RDD[Vector]): (Vector => Vector) = {
    // Convert each Vector to an Array
    val dataAsArray = data.map(_.toArray)
    // Number of columns, taken from the first element
    val numCols = dataAsArray.first().length
    // Number of elements in the data set
    val n = dataAsArray.count()
    // Element-wise sum over all arrays
    val sums = dataAsArray.reduce((a, b) => a.zip(b).map(t => t._1 + t._2))
    // Aggregate the element-wise sums of squares
    val sumSquares = dataAsArray.aggregate(new Array[Double](numCols))(
      (a, b) => a.zip(b).map(t => t._1 + t._2 * t._2),
      (a, b) => a.zip(b).map(t => t._1 + t._2)
    )

    /**
     * zip pairs up the elements at the same positions of its two arguments.
     * If one argument is longer, the extra elements are dropped.
     * In other words, it lets us operate element by element on two arrays.
     */
    val stdevs = sumSquares.zip(sums).map {
      case (sumSq, sum) => math.sqrt(n * sumSq - sum * sum) / n
    }
    val means = sums.map(_ / n)

    (datum: Vector) => {
      val normalizedArray = (datum.toArray, means, stdevs).zipped.map(
        (value, mean, stdev) =>
          if (stdev <= 0) (value - mean) else (value - mean) / stdev
      )
      Vectors.dense(normalizedArray)
    }
  }

  // clusteringTake3
  def clusteringTake3(rawData: RDD[String]): Unit = {
    val data = rawData.map { line =>
      val buffer = line.split(',').toBuffer
      buffer.remove(1, 3)
      buffer.remove(buffer.length - 1)
      Vectors.dense(buffer.map(_.toDouble).toArray)
    }

    val run = 10
    val epsilon = 1.0e-4

    val normalizedData = data.map(buildNormalizationFunction(data)).cache()

    (60 to 120 by 10).par.map(
      k => (k, clusteringScore2(normalizedData, k, run, epsilon))
    ).toList.foreach(println)

    normalizedData.unpersist()
  }

  /**
   * Replace the categorical variables with one-hot encodings
   * @param rawData
   * @return
   */
  def buildCategoricalAndLabelFunction(rawData: RDD[String]): (String => (String, Vector)) = {
    val splitData = rawData.map(_.split(","))
    // Build an index map for each of the three categorical features
    val protocols = splitData.map(_(1)).distinct().collect().zipWithIndex.toMap
    val services = splitData.map(_(2)).distinct().collect().zipWithIndex.toMap
    val tcpStates = splitData.map(_(3)).distinct().collect().zipWithIndex.toMap

    (line: String) => {
      val buffer = line.split(",").toBuffer
      val protocol = buffer.remove(1)
      val service = buffer.remove(1)
      val tcpState = buffer.remove(1)
      val label = buffer.remove(buffer.length - 1)
      val vector = buffer.map(_.toDouble)

      // One-hot encode each categorical value
      val newProtocolFeatures = new Array[Double](protocols.size)
      newProtocolFeatures(protocols(protocol)) = 1.0
      val newServiceFeatures = new Array[Double](services.size)
      newServiceFeatures(services(service)) = 1.0
      val newTcpStateFeatures = new Array[Double](tcpStates.size)
      newTcpStateFeatures(tcpStates(tcpState)) = 1.0

      vector.insertAll(1, newTcpStateFeatures)
      vector.insertAll(1, newServiceFeatures)
      vector.insertAll(1, newProtocolFeatures)

      (label, Vectors.dense(vector.toArray))
    }
  }

  // clusteringTake4
  def clusteringTake4(rawData: RDD[String]): Unit = {
    val parseFunction = buildCategoricalAndLabelFunction(rawData)
    val data = rawData.map(parseFunction).values
    val normalizedData = data.map(buildNormalizationFunction(data)).cache()

    val run = 10
    val epsilon = 1.0e-4

    (80 to 160 by 10).map(
      k => (k, clusteringScore2(normalizedData, k, run, epsilon))
    ).toList.foreach(println)

    normalizedData.unpersist()
  }

  // clusteringTake5
  /**
   * Entropy of the label counts within a cluster; the size-weighted average
   * over all clusters is used as the clustering score.
   * @param counts
   * @return
   */
  def entropy(counts: Iterable[Int]) = {
    val values = counts.filter(_ > 0)
    val n: Double = values.sum
    values.map { v =>
      val p = v / n
      -p * math.log(p)
    }.sum
  }

  /**
   * Weighted average of the cluster entropies
   * @param normalizedLabelsAndData
   * @param k
   * @param run
   * @param epsilon
   * @return
   */
  def clusteringScore3(normalizedLabelsAndData: RDD[(String, Vector)], k: Int, run: Int, epsilon: Double) = {
    val kmeans = new KMeans()
    kmeans.setK(k)
    kmeans.setRuns(run)
    kmeans.setEpsilon(epsilon)
    // Build the KMeansModel
    val model = kmeans.run(normalizedLabelsAndData.values)
    // Predict the cluster of every sample
    val labelAndClusters = normalizedLabelsAndData.mapValues(model.predict)
    // Swap keys and values: RDD[(label, cluster)] => RDD[(cluster, label)]
    val clustersAndLabels = labelAndClusters.map(_.swap)
    // Collect the labels that fall into each cluster
    val labelsInCluster = clustersAndLabels.groupByKey().values
    // Count how often each label occurs within a cluster
    val labelCounts = labelsInCluster.map(_.groupBy(l => l).map(_._2.size))
    // Weight each cluster's entropy by the cluster size
    val n = normalizedLabelsAndData.count()
    labelCounts.map(m => m.sum * entropy(m)).sum() / n
  }

  def clusteringTake5(rawData: RDD[String]): Unit = {
    val parseFunction = buildCategoricalAndLabelFunction(rawData)
    val labelAndData = rawData.map(parseFunction)
    val normalizedLabelsAndData =
      labelAndData.mapValues(buildNormalizationFunction(labelAndData.values)).cache()

    val run = 10
    val epsilon = 1.0e-4

    (80 to 160 by 10).map(
      k => (k, clusteringScore3(normalizedLabelsAndData, k, run, epsilon))
    ).toList.foreach(println)

    normalizedLabelsAndData.unpersist()
  }

  // Detect anomalies
  def buildAnomalyDetector(data: RDD[Vector], normalizeFunction: (Vector => Vector)): (Vector => Boolean) = {
    val normalizedData = data.map(normalizeFunction)
    normalizedData.cache()

    val kmeans = new KMeans()
    kmeans.setK(150)
    kmeans.setRuns(10)
    kmeans.setEpsilon(1.0e-6)
    val model = kmeans.run(normalizedData)

    normalizedData.unpersist()

    // Distance of every point to its nearest cluster centroid
    val distances = normalizedData.map(datum => distToCenter(datum, model))
    // Threshold = distance of the 100th-farthest known point from its centroid
    val threshold = distances.top(100).last

    // A point is anomalous if it lies farther from its centroid than the threshold
    (datum: Vector) => distToCenter(normalizeFunction(datum), model) > threshold
  }

  /**
   * Anomaly detection
   * @param rawData
   */
  def anomalies(rawData: RDD[String]) = {
    val parseFunction = buildCategoricalAndLabelFunction(rawData)
    val originalAndData = rawData.map(line => (line, parseFunction(line)._2))
    val data = originalAndData.values
    val normalizeFunction = buildNormalizationFunction(data)
    val anomalyDetector = buildAnomalyDetector(data, normalizeFunction)
    val anomalies = originalAndData.filter {
      case (original, datum) => anomalyDetector(datum)
    }.keys
    // Print 10 anomalous records
    anomalies.take(10).foreach(println)
  }
}
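For reference, here is a hypothetical usage sketch (not part of the program above) showing how the pieces of CheckAll could be reused interactively, e.g. in a spark-shell with the object on the classpath. The record being scored is copied from the run output further down; note that the one-hot maps only know protocol/service/flag values seen in the training data, so unseen categories would make the lookup fail.

// Sketch only: sc is the SparkContext provided by spark-shell.
import internet.CheckAll._

val rawData = sc.textFile("hdfs://node1:9000/user/spark/sparkLearning/cluster/kddcup.data")

val parseFunction   = buildCategoricalAndLabelFunction(rawData)
val data            = rawData.map(line => parseFunction(line)._2)
val normalizeFn     = buildNormalizationFunction(data)
val anomalyDetector = buildAnomalyDetector(data, normalizeFn)

// One of the records reported in the run output below, re-scored in isolation.
val record = "9,tcp,telnet,SF,307,2374,0,0,1,0,0,1,0,1,0,1,3,1,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,69,4,0.03,0.04,0.01,0.75,0.00,0.00,0.00,0.00,normal."
// parseFunction returns (label, vector); only the vector is scored.
println("anomalous? " + anomalyDetector(parseFunction(record)._2))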
It is a bit of a grab bag, but everything is packaged in one object and it runs without problems. Take it as a reference for learning and pay close attention to the comments.

That run wore me out. Perhaps my machine is just underpowered, but crunching this roughly 1 GB of data took a long time. Here is the output of the anomaly-detection part:
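Part of that runtime is self-inflicted: the job runs with setMaster("local"), i.e. a single worker thread, while building a k = 150 model with 10 runs over the whole data set. A hedged tweak, not in the original code, that usually helps local experiments is to use all available cores:

// Sketch of a local-run tweak, assuming the same Spark 1.x setup as above.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("CheckAll")
  .setMaster("local[*]")   // use all local cores instead of a single thread
val sc = new SparkContext(conf)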

16/07/24 22:48:18 INFO Executor: Running task 0.0 in stage 65.0 (TID 385)
16/07/24 22:48:18 INFO HadoopRDD: Input split: hdfs://node1:9000/user/spark/sparkLearning/cluster/kddcup.data:0+134217728
16/07/24 22:48:30 INFO Executor: Finished task 0.0 in stage 65.0 (TID 385). 3611 bytes result sent to driver
16/07/24 22:48:30 INFO TaskSetManager: Finished task 0.0 in stage 65.0 (TID 385) in 11049 ms on localhost (1/1)
9,tcp,telnet,SF,307,2374,0,0,1,0,0,1,0,1,0,1,3,1,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,69,4,0.03,0.04,0.01,0.75,0.00,0.00,0.00,0.00,normal.
16/07/24 22:48:30 INFO TaskSchedulerImpl: Removed TaskSet 65.0, whose tasks have all completed, from pool
16/07/24 22:48:30 INFO DAGScheduler: ResultStage 65 (take at CheckAll.scala:413) finished in 11.049 s
16/07/24 22:48:30 INFO DAGScheduler: Job 41 finished: take at CheckAll.scala:413, took 11.052917 s
0,tcp,http,S1,299,26280,0,0,0,1,0,1,0,1,0,0,0,0,0,0,0,0,15,16,0.07,0.06,0.00,0.00,1.00,0.00,0.12,231,255,1.00,0.00,0.00,0.01,0.01,0.01,0.00,0.00,normal.
0,tcp,telnet,S1,2895,14208,0,0,0,0,0,1,0,0,0,0,13,0,0,0,0,0,1,1,1.00,1.00,0.00,0.00,1.00,0.00,0.00,21,2,0.10,0.10,0.05,0.00,0.05,0.50,0.00,0.00,normal.
23,tcp,telnet,SF,104,276,0,0,0,0,5,0,0,0,0,0,0,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,1,2,1.00,0.00,1.00,1.00,0.00,0.00,0.00,0.00,guess_passwd.
13,tcp,telnet,SF,246,11938,0,0,0,0,4,1,0,0,0,0,2,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,89,2,0.02,0.04,0.01,0.00,0.00,0.00,0.00,0.00,normal.
12249,tcp,telnet,SF,3043,44466,0,0,0,1,0,1,13,1,0,0,12,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,61,8,0.13,0.05,0.02,0.00,0.00,0.00,0.00,0.00,normal.
60,tcp,telnet,S3,125,179,0,0,0,1,1,0,0,0,0,0,0,0,0,0,0,0,1,1,1.00,1.00,0.00,0.00,1.00,0.00,0.00,1,1,1.00,0.00,1.00,0.00,1.00,1.00,0.00,0.00,guess_passwd.
60,tcp,telnet,S3,126,179,0,0,0,1,1,0,0,0,0,0,0,0,0,0,0,0,2,2,0.50,0.50,0.50,0.50,1.00,0.00,0.00,23,23,1.00,0.00,0.04,0.00,0.09,0.09,0.91,0.91,guess_passwd.
583,tcp,telnet,SF,848,25323,0,0,0,1,0,1,107,1,1,100,1,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,1,1,1.00,0.00,1.00,0.00,0.00,0.00,0.00,0.00,normal.
11447,tcp,telnet,SF,3131,45415,0,0,0,1,0,1,0,1,0,0,15,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,100,10,0.09,0.72,0.01,0.20,0.01,0.10,0.69,0.20,normal.
Elapsed: 4602s

16/07/24 22:48:30 INFO SparkContext: Invoking stop() from shutdown hook
16/07/24 22:48:30 INFO SparkUI: Stopped Spark web UI at http://192.168.1.102:4040
16/07/24 22:48:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/07/24 22:48:30 INFO MemoryStore: MemoryStore cleared
16/07/24 22:48:30 INFO BlockManager: BlockManager stopped
16/07/24 22:48:30 INFO BlockManagerMaster: BlockManagerMaster stopped
16/07/24 22:48:30 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/07/24 22:48:30 INFO SparkContext: Successfully stopped SparkContext
16/07/24 22:48:30 INFO ShutdownHookManager: Shutdown hook called
16/07/24 22:48:30 INFO ShutdownHookManager: Deleting directory C:\Users\Administrator\AppData\Local\Temp\spark-1ab0ec11-672d-4778-9ae8-2050f44a5f91
16/07/24 22:48:30 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/07/24 22:48:30 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.

Process finished with exit code 0

The ten anomalous records in the output above are the ones to look at. The whole run took over an hour on my machine, alas.