1. 程式人生 > >《web安全之機器學習入門》第5章K近鄰演算法讀書筆記【上】

《web安全之機器學習入門》第5章K近鄰演算法讀書筆記【上】

K近鄰演算法的思路:如果一個樣本在空間上最近的K鄰居大多數都屬於M類,則該樣本屬於M類。在本章中,使用K近鄰演算法識別使用者操作序列中的異常命令。

分析資料集url:http://www.schonlau.net/

資料集說明:

50個使用者的linux操作日誌

以User開頭的檔案為使用者命令,總共有50個使用者,每個檔案記錄了使用者的15000條命令;其中前5000條是正常操作,而後10000條則包含部分異常操作

label.txt是一個100行,50列的檔案,每一列代表一個使用者,而每一行則代表了對於每100條命令的標註(異常命令只會出現在5001~15000共10000條命令內,100條命令內只要出現一次異常操作,則認為異常)

方法一:

#對於每100個操作序列,選取以下特徵
#特徵1:不重複命令個數
#特徵2:操作最頻繁的前10個命令,與資料集內操作最頻繁的前50個命令,計算重合度

#特徵3:操作最不頻繁的前10個命令,與資料集內操作最不頻繁的前50個命令,計算重合度

程式碼如下:

#coding:utf-8
import os
import numpy as np
from sklearn.neighbors import KNeighborsClassifier

#對於每100個操作序列,選取以下特徵
#特徵1:不重複命令個數
#特徵2:操作最頻繁的前10個命令,與資料集內操作最頻繁的前50個命令,計算重合度
#特徵3:操作最不頻繁的前10個命令,與資料集內操作最不頻繁的前50個命令,計算重合度

DATAPATH = os.path.normpath(os.path.dirname(os.path.abspath(__file__)) + "/data") 

def parse_data():
    """Read the User1..User50 command logs.

    Returns a list with one entry per user: a list of (command, count)
    pairs sorted by count in descending order.
    """
    per_user = list()
    for uid in range(1, 51):
        path = DATAPATH + "/User" + str(uid)
        counts = dict()
        with open(path, "r") as fh:
            for raw in fh.readlines():
                cmd = raw.strip()
                counts[cmd] = counts.get(cmd, 0) + 1
        ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
        per_user.append(ranked)
    return per_user

def parse_all_data(ls):
    """Aggregate per-user (command, count) lists into one global ranking.

    ls: output of parse_data() — one list of (command, count) pairs per user.
    Returns all (command, total_count) pairs sorted by total_count descending.
    """
    totals = dict()
    for user_counts in ls:
        for cmd, cnt in user_counts:
            # BUG FIX: the original initialised a missing command to 1 and
            # then added cnt, inflating every total by the number of users
            # that ran the command. Start from 0 instead.
            totals[cmd] = totals.get(cmd, 0) + cnt
    return sorted(totals.items(), key=lambda x: x[1], reverse=True)

def parse_user_data():
    """Load the raw command sequences.

    Returns one list per user (User1..User50), each holding the user's
    15000 whitespace-stripped command strings in file order.
    """
    sequences = list()
    for uid in range(1, 51):
        path = DATAPATH + "/User" + str(uid)
        with open(path, "r") as fh:
            sequences.append([raw.strip() for raw in fh.readlines()])
    return sequences

def parse_label_data():
    """Parse label.txt (100 rows x 50 columns) into per-user labels.

    Returns 50 lists; labels[u][w] is the 0/1 label of user u's w-th
    100-command window (within commands 5001..15000).
    """
    labels = [list() for _ in range(50)]
    path = DATAPATH + "/label.txt"
    with open(path, "r") as fh:
        for row in fh.readlines():
            cols = row.split()
            for u in range(50):
                labels[u].append(int(cols[u]))
    return labels

if __name__ == '__main__':
    data = parse_data()
    all_data = parse_all_data(data)
    # 50 most / least frequent commands across the whole data set.
    top_cmd_50 = [item[0] for item in all_data[0:50]]
    last_cmd_50 = [item[0] for item in all_data[-50:]]

    all_user_data = parse_user_data()
    all_label_data = parse_label_data()

    # Train and evaluate one KNN model per user (all 50 users, not just
    # user 3 as the original comment claimed).
    for m in range(0, 50):
        testdata = all_user_data[m]
        train_data = list()
        # Slice the 15000 commands into 150 windows of 100 commands and
        # extract three features per window.
        for i in range(0,15000,100):
            cmds = testdata[i:i+100]
            # Feature 1: number of distinct commands in the window.
            feature1 = len(set(cmds))
            cmd_dict = dict()
            for cmd in cmds:
                if cmd not in cmd_dict.keys():
                    cmd_dict[cmd] = 1
                else:
                    cmd_dict[cmd] = cmd_dict[cmd] + 1
            cmd_list = sorted(cmd_dict.items(), key=lambda x:x[1], reverse=True)
            top_cmd_10 = [item[0] for item in cmd_list[0:10]]
            last_cmd_10 = [item[0] for item in cmd_list[-10:]]
            # Feature 2: overlap of the window's 10 most frequent commands
            # with the global top 50; feature 3: same for least frequent.
            feature2 = len(set(top_cmd_10)&set(top_cmd_50))
            feature3 = len(set(last_cmd_10)&set(last_cmd_50))
            train_data.append([feature1, feature2, feature3])

        # Labels: the first 50 windows (commands 1..5000) are known normal.
        label_data = [0]*50 + all_label_data[m]

        # Train a KNN model on the first 120 windows, validate on the last 30.
        model = KNeighborsClassifier(n_neighbors = 3)
        model.fit(train_data[0:120], label_data[0:120])
        test_result = model.predict(train_data[-30:])
        # NOTE(review): this is accuracy over the 30 held-out windows,
        # not precision in the information-retrieval sense.
        print "user:", m+1, "precision:", np.mean(test_result==label_data[-30:])*100
        print "predict", test_result
        print "correct result", label_data[-30:]

執行效果如下:


方法二:

在50個命令檔案中,取出所有命令(不重複)組成詞集。然後對於每100個操作序列,根據它們在詞集向量空間上的分佈情況得到特徵。

程式碼如下:

#coding:utf-8
import os
import numpy as np
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import cross_val_score

DATAPATH = os.path.normpath(os.path.dirname(os.path.abspath(__file__)) + "/data")

#得到命令詞集
# Build the command vocabulary.
def parse_word_dict():
    """Return every distinct command seen in any of the 50 user logs,
    as a list (element order is the arbitrary set iteration order)."""
    vocab = set()
    for uid in range(1, 51):
        path = DATAPATH + "/User" + str(uid)
        with open(path, "r") as fh:
            for raw in fh.readlines():
                vocab.add(raw.strip())
    return list(vocab)

def parse_all_data(words):
    """Vectorise every user's log into bag-of-words feature rows.

    words: the command vocabulary from parse_word_dict().
    Returns 50 users x 150 windows = 7500 rows; each row is a 0/1 list of
    length len(words) marking which vocabulary commands appear in that
    100-command window.
    """
    ls = list()
    words_len = len(words)
    # Map command -> vocabulary index once. The original scanned the whole
    # vocabulary linearly for every single command (O(window * |vocab|)
    # per window); a dict lookup gives the same vector in O(window).
    # Every command is guaranteed to be in the vocabulary because the
    # vocabulary was built from these same files.
    word_index = {w: n for n, w in enumerate(words)}
    for i in range(1, 51):
        FULLPATH = DATAPATH + "/User" + str(i)
        with open(FULLPATH, "r") as f:
            cmds = [line.strip() for line in f.readlines()]

        for start in range(0, 15000, 100):
            # Presence vector for one 100-command window.
            clist = [0]*words_len
            for cmd in cmds[start:start + 100]:
                clist[word_index[cmd]] = 1
            ls.append(clist)

    return ls

def parse_label_data():
    """Build the flat 7500-element label vector matching parse_all_data().

    label.txt has 100 rows x 50 columns; column u, row w labels user u's
    w-th 100-command window among commands 5001..15000. The first 50
    windows (commands 1..5000) of every user are known-normal, so each
    user contributes [0]*50 followed by his 100 labels.
    """
    per_user = [list() for _ in range(50)]
    FULLPATH = DATAPATH + "/label.txt"
    with open(FULLPATH, "r") as f:
        for line in f.readlines():
            cols = line.split()
            for u in range(50):
                # BUG FIX: the original appended the raw string ('0'/'1'),
                # which mixed with the int 0 padding below and made the
                # classifier see three distinct classes (0, '0', '1').
                per_user[u].append(int(cols[u]))
    rtnls = list()
    for labels in per_user:
        rtnls.extend([0]*50 + labels)

    return rtnls

if __name__ == '__main__':
    words = parse_word_dict()
    # 7500 bag-of-words rows (50 users x 150 windows) and their labels.
    test_data = parse_all_data(words)
    label_data = parse_label_data()

    neigh = KNeighborsClassifier(n_neighbors = 3)
    # 10-fold cross-validation over all users' windows.
    scores = cross_val_score(neigh, test_data, label_data, cv=10)
    print scores
    # NOTE(review): this reports mean cross-validation accuracy, not
    # precision in the information-retrieval sense.
    print "precision:",np.mean(scores)*100

10輪交叉驗證的準確度如下: