1. 程式人生 > >K-臨近演算法介紹和實踐

K-臨近演算法介紹和實踐

K-臨近演算法概述

KNN演算法(K-Nearest Neighbors)演算法是機器學習領域廣泛使用的分類演算法之一,所謂KNN,說的就是每個樣本的分類都可以用它最接近的K個鄰居來代表。

KNN演算法的基本思路是:

1、給定一個訓練集資料,每個訓練集資料都是已經分好類的。

2、設定一個初始的測試資料a,計算a到訓練集所有資料的歐幾里得距離,並排序。

3、選出訓練集中離a距離最近的K個訓練集資料。

4、比較k個訓練集資料,選出裡面出現最多的分類型別,此分類型別即為最終測試資料a的分類。

Python中的KNN相關API

在python中,我們可以很容易使用sklearn中的neighbors module來建立我們的KNN模型。

API文件中,neighbors模組中的方法如下:

接下來我們用一個簡單的例子只管闡述一下KNN演算法的實現。

K臨近演算法的hello world

首先我們要建立二維空間上的一些點用於我們測試:

from sklearn.neighbors import NearestNeighbors
import numpy as np
X = np.array([[-1, -1], [-2, -1], [-3, -2], [1, 1], [2, 1], [3, 2]])

由以上程式碼可以看到,我們在二維空間中建立了6個測試點用於我們的實驗,接下來我們便是要使用KNN演算法,訓練出一個合適我們這六個點的一個分類模型。

#NearestNeighbors用到的引數解釋 
#n_neighbors=5,預設值為5,表示查詢k個最近鄰的數目 
#algorithm='auto',指定用於計算最近鄰的演算法,auto表示試圖採用最適合的演算法計算最近鄰 
#fit(X)表示用X來訓練演算法 
nbrs = NearestNeighbors(n_neighbors=2, algorithm="ball_tree").fit(X) 

這樣,我們最簡單的一個KNN的分類器就完成了,接下來看看效果。

#返回的indices是距離該點較近的k個點的下標,distance則是距離
distances, indices = nbrs.kneighbors(X)

結果如下圖所示:

第一個矩陣是distances矩陣,第二個則表示k個距離該點較近的點的座標。

還可以輸入以下程式碼視覺化結果:

#輸出的是求解n個最近鄰點後的矩陣圖,1表示是最近點,0表示不是最近點  
print nbrs.kneighbors_graph(X).toarray() 

使用KNN演算法用於監督學習的時候也是非常簡單的:

如下圖所示:

使用K近鄰演算法檢測異常操作

在黑客入侵Web伺服器之後,通常會使用一些系統漏洞進行提權,獲得伺服器的root許可權,在接下來的工作中,我們通過蒐集Linux伺服器的bash操作日誌,通過訓練識別出特定使用者的使用習慣,然後進一步識別出異常操作。

資料情況大概如下:

訓練資料中包含50個使用者的操作日誌,每個日誌包含了15000條操作命令,其中前5000條為正常操作,後面10000條隨機包含有異常操作。為了方便訓練,我們將100個命令視為一個操作序列,在label.txt中是對應每個使用者的後100個操作序列中是否異常的標籤。正常為0,異常為1。

原始碼如下:

# -*- coding:utf-8 -*-
import sys
import urllib
import re
from hmmlearn import hmm
import numpy as np
from sklearn.externals import joblib
import nltk
import matplotlib.pyplot as plt
from nltk.probability import FreqDist
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import classification_report
from sklearn import metrics
#測試樣本數
N=90
def load_user_cmd(filename):
    cmd_list=[]
    dist_max=[]
    dist_min=[]
    dist=[]
    with open(filename) as f:
        i=0
        x=[]
        for line in f:
            line=line.strip('\n')
            x.append(line)
            dist.append(line)
            i+=1
            #每100條命令組成一個操作序列
            if i == 100:
                cmd_list.append(x)
                x=[]
                i=0
    #統計最頻繁使用的前50個命令和最少使用的50個命令
    fdist =list(FreqDist(dist).keys())
    dist_max=set(fdist[0:50])
    dist_min = set(fdist[-50:])
    return cmd_list,dist_max,dist_min

#特徵化使用者使用習慣
def get_user_cmd_feature(user_cmd_list,dist_max,dist_min):
    user_cmd_feature=[]
    for cmd_block in user_cmd_list:
        #以100個命令為統計單元,作為一個操作序列,去重後的操作命令個數作為特徵
        #將list轉為set去重
        f1=len(set(cmd_block))
        #FreqDist轉為統計字典轉化為命令:出現次數的形式
        fdist = list(FreqDist(cmd_block).keys())
        #最頻繁使用的10個命令
        f2=fdist[0:10]
        #最少使用的10個命令
        f3 = fdist[-10:]
        f2 = len(set(f2) & set(dist_max))
        f3 = len(set(f3) & set(dist_min))
        #返回該統計單元中和總的統計的最頻繁使用的前50個命令和最不常使用的50個命令的重合程度
        #f1:統計單元中出現的命令型別數量
        #f2:統計單元中最常使用的10個命令和總的最常使用的命令的重合程度
        #f3:統計單元中最不常使用的10個命令和總的最不常使用的命令的重合程度
        x=[f1,f2,f3]
        user_cmd_feature.append(x)
    return user_cmd_feature

def get_label(filename,index=0):
    x=[]
    with open(filename) as f:
        for line in f:
            line=line.strip('\n')
            x.append( int(line.split()[index]))
    return x

if __name__ == '__main__':    
    user_cmd_list,user_cmd_dist_max,user_cmd_dist_min=load_user_cmd("../data/MasqueradeDat/User3")
    user_cmd_feature=get_user_cmd_feature(user_cmd_list,user_cmd_dist_max,user_cmd_dist_min)
    #index=2 即為User3對應的label
    labels=get_label("../data/MasqueradeDat/label.txt",2)
    #前5000個記錄為正常操作 即前50個序列為正常操作
    y=[0]*50+labels
    x_train=user_cmd_feature[0:N]
    y_train=y[0:N]
    x_test=user_cmd_feature[N:150]
    y_test=y[N:150]
    neigh = KNeighborsClassifier(n_neighbors=3)
    neigh.fit(x_train, y_train)
    y_predict=neigh.predict(x_test)
    score=np.mean(y_test==y_predict)*100
    print(y_test)
    print(y_predict)
    print(score)
    print(classification_report(y_test, y_predict))
    print(metrics.confusion_matrix(y_test, y_predict))

執行一下,可以看到準確率約為83.3%,結果不是很理想。

如果我們想提高我們的驗證效果,我們需要更加全盤考慮異常操作的出現。因此接下來,我們使用全量比較,來重新對使用者的是否異常進行訓練。

在資料蒐集和清洗的時候,我們使用dict資料結構,將全部命令去重之後形成一個大型的向量空間,每個命令代表一個特徵,首先通過遍歷全部命令生成對應的詞集。因此我們重寫load_user_cmd方法,如下:

def load_user_cmd_new(filename):
    cmd_list=[]
    dist=[]
    with open(filename) as f:
        i=0
        x=[]
        for line in f:
            line=line.strip('\n')
            x.append(line)
            dist.append(line)
            i+=1
            if i == 100:
                cmd_list.append(x)
                x=[]
                i=0
    fdist = list(FreqDist(dist).keys())
    return cmd_list,fdist

得到詞集之後,將命令向量化,方便模型的訓練。因此再重寫get_user_feature方法,重新獲取使用者特徵。

def get_user_cmd_feature_new(user_cmd_list,dist):
    user_cmd_feature=[]
    for cmd_list in user_cmd_list:
        v=[0]*len(dist)
        for i in range(0,len(dist)):
            if dist[i] in cmd_list:
                v[i]+=1
        user_cmd_feature.append(v)
    return user_cmd_feature

訓練新模型和之前的方法一樣。

效果驗證的時候,使用交叉驗證,通過十次隨機取樣和驗證,提高驗證的可信度:

score=cross_val_score(neigh,user_cmd_feature,y,n_jobs = -1,cv=10)

最後可以得到一個準確率約在93%左右的較好的模型。