1. 程式人生 > >企業安全建設之搭建開源SIEM平臺

企業安全建設之搭建開源SIEM平臺

流式處理 one 形式 blob 庫服務器 foreach 1.0 \n 安裝

https://www.freebuf.com/special/127172.html

https://www.freebuf.com/special/127264.html

https://www.freebuf.com/articles/network/127988.html

前言

SIEM(security information and event management),顧名思義就是針對安全信息和事件的管理系統,針對大多數企業是不便宜的安全系統,本文結合作者的經驗介紹下如何使用開源軟件搭建企業的SIEM系統,數據深度分析在下篇。

SIEM的發展

對比Gartner2009年和2016年的全球SIEM廠商排名,可以清楚看出,基於大數據架構的廠商Splunk迅速崛起,傳統四強依托完整的安全產品線和成熟市場渠道,依然占據領導者象限,其他較小的廠商逐漸離開領導者象限。最重要的存儲架構也由盤櫃(可選)+商業數據庫逐漸轉變為可橫向擴展的大數據架構,支持雲環境也成為趨勢。

技術分享圖片

技術分享圖片

開源SIEM領域,比較典型的就是ossim和Opensoc,ossim存儲架構是mysql,支持多種日誌格式,包括鼎鼎大名的Snort、Nmap、 Nessus以及Ntop等,對於數據規模不大的情況是個不錯的選擇,新版界面很酷炫。

技術分享圖片

完整的SIEM至少會包括以下功能:

  • 漏洞管理
  • 資產發現
  • 入侵檢測
  • 行為分析
  • 日誌存儲、檢索
  • 報警管理
  • 酷炫報表

其中最核心的我認為是入侵檢測、行為分析和日誌存儲檢索,本文重點集中討論支撐上面三個功能的技術架構。

Opensoc簡介

Opensoc是思科2014年在BroCon大會上公布的開源項目,但是沒有真正開源其源代碼,只是發布了其技術框架。我們參考了Opensoc發布的架構,結合公司實際落地了一套方案。Opensoc完全基於開源的大數據框架kafka、storm、spark和es等,天生具有強大的橫向擴展能力,本文重點講解的也是基於Opensoc的siem搭建。

技術分享圖片

上圖是Opensoc給出的框架,初次看非常費解,我們以數據存儲與數據處理兩個緯度來細化,以常見的linux服務器ssh登錄日誌搜集為例。

數據搜集緯度

數據搜集緯度需求是搜集原始數據,存儲,提供用戶交互式檢索的UI接口,典型場景就是出現安全事件後,通過檢索日誌回溯攻擊行為,定損。

技術分享圖片

logtash其實可以直接把數據寫es,但是考慮到storm也要數據處理,所以把數據切分放到logstash,切分後的數據發送kafka,提供給storm處理和logstash寫入es。數據檢索可以直接使用kibana,非常方便。數據切分也可以在storm裏面完成。這個就是大名鼎鼎的ELK架構。es比較適合存儲較短時間的熱數據的實時檢索查詢,對於需要長期存儲,並且希望使用hadoop或者spark進行大時間跨度的離線分析時,還需要存儲到hdfs上,所以比較常見的數據流程圖為:

技術分享圖片

數據處理緯度

這裏以數據實時流式處理為例,storm從kafka中訂閱切分過的ssh登錄日誌,匹配檢測規則,檢測結果的寫入mysql或者es。

技術分享圖片

在這個例子中,孤立看一條登錄日誌難以識別安全問題,最多識別非跳板機登錄,真正運行還需要參考知識庫中的常見登錄IP、時間、IP情報等以及臨時存儲處理狀態的狀態庫中最近該IP的登錄成功與失敗情況。比較接近實際運行情況的流程如下:

技術分享圖片

具體判斷邏輯舉例如下,實際中使用大量代理IP同時暴力破解,打一槍換一個地方那種無法覆蓋,這裏只是個舉例:

技術分享圖片

擴展數據源

生產環境中,處理安全事件,分析入侵行為,只有ssh登錄日誌肯定是不夠,我們需要盡可能多的搜集數據源,以下作為參考:

  • linux/window系統安全日誌/操作日誌
  • web服務器訪問日誌
  • 數據庫SQL日誌
  • 網絡流量日誌

簡化後的系統架構如下,報警也存es主要是查看報警也可以通過kibana,人力不足界面都不用開發了:

技術分享圖片

storm拓撲

storm拓撲支持python開發,以處理SQL日誌為例子:

假設SQL日誌的格式

"Feb 16 06:32:50 "	"127.0.0.1" "root@localhost" "select * from user where id=1"

一般storm的拓撲結構

技術分享圖片

簡化後spout是通用的從kafka讀取數據的,就一個bolt處理SQL日誌,匹配規則,命中策略即輸出”alert”:”原始SQL日誌”

核心bolt代碼doSQLCheckBolt偽碼

import storm

class doSQLCheckBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
         sql = word[3]
        if re.match(規則,sql):
            storm.emit(["sqli",tup.values[0]])

doSQLCheckBolt().run()
TopologyBuilder builder = new TopologyBuilder();        
builder.setSpout("sqlLog", new kafkaSpout(), 10);        
builder.setBolt("sqliAlert", new doSQLCheckBolt(), 3)
        .shuffleGrouping("sqlLog");

拓撲提交示例

Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("doSQL", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("doSQL");
cluster.shutdown();

logstash

在本文環節中,logstash的配置量甚至超過了storm的拓撲腳本開發量,下面講下比較重點的幾個點,切割日誌與檢索需求有關系,非常個性化,這裏就不展開了。

從文件讀取

input
    file {
        path => ["/var/log/*.log", "/var/log/message"]
        type => "system"
        start_position => "beginning"
    }
}

從kafka中訂閱

input {
    kafka {
        zk_connect => "localhost:2181"
        group_id => "logstash"
        topic_id => "test"
        reset_beginning => false # boolean (optional), default: false
        consumer_threads => 5  # number (optional), default: 1
        decorate_events => true # boolean (optional), default: false
        }
    }

寫kafka

output {
    kafka {
        broker_list => "localhost:9092"
        topic_id => "test"
        compression_codec => "snappy" # string (optional), one of ["none", "gzip", "snappy"], default: "none"
    }
}

寫hdfs

output {
    hadoop_webhdfs {
        workers => 2
        server => "localhost:14000"
        user => "flume"
        path => "/user/flume/logstash/dt=%{+Y}-%{+M}-%{+d}/logstash-%{+H}.log"
        flush_size => 500
        compress => "snappy"
        idle_flush_time => 10
        retry_interval => 0.5
    }
}

寫es

output {
    elasticsearch {
        host => "localhost"
        protocol => "http"
        index => "logstash-%{type}-%{+YYYY.MM.dd}"
        index_type => "%{type}"
        workers => 5
        template_overwrite => true
    }
}

前言

SIEM(security information and event management),顧名思義就是針對安全信息和事件的管理系統,針對大多數企業是不便宜的安全系統,本文結合作者的經驗介紹如何使用開源軟件離線分析數據,使用攻擊建模的方式識別攻擊行為。

回顧系統架構

技術分享圖片

以數據庫為例,通過logstash搜集mysql的查詢日誌,近實時備份到hdfs集群上,通過hadoop腳本離線分析攻擊行為。

數據庫日誌搜集

常見的數據日誌搜集方式有三種:

鏡像方式

大多數數據庫審計產品都支持這種模式,通過分析數據庫流量,解碼數據庫協議,識別SQL預計,抽取出SQL日誌

技術分享圖片

代理方式

比較典型的就是db-proxy方式,目前百度、搜狐、美團、京東等都有相關開源產品,前端通過db-proxy訪問後端的真實數據庫服務器。SQL日誌可以直接在db-proxy上搜集。

技術分享圖片

客戶端方式

通過在數據庫服務器安裝客戶端搜集SQL日誌,比較典型的方式就是通過logstash來搜集,本文以客戶端方式進行講解,其余方式本質上也是類似的。

logstash配置

安裝

下載logstash https://www.elastic.co/downloads/logstash 目前最新版本5.2.1版

開啟mysql查詢日誌

技術分享圖片

mysql查詢日誌

技術分享圖片

配置logstash

input {

file {

type => "mysql_sql_file"

path => "/var/log/mysql/mysql.log"

start_position => "beginning"

sincedb_path => "/dev/null"

}

}

output {

kafka { broker_list => "localhost:9092" topic_id => "test" compression_codec => "snappy" # string (optional), one of ["none", "gzip", "snappy"], default: "none" }

}

運行logstash

bin/logstash -f mysql.conf

日誌舉例

2017-02-16T23:29:00.813Z localhost 170216 19:10:15 37 Connect

debian-sys-maint@localhost on

2017-02-16T23:29:00.813Z localhost 37 Quit

2017-02-16T23:29:00.813Z localhost 38 Connect debian-sys-maint@localhost on

2017-02-16T23:29:00.813Z localhost 38 Query SHOW VARIABLES LIKE ‘pid_file‘

切詞

最簡化操作是不用進行切詞,如果喜歡自動切分出數據庫名,時間等字段,請參考:

grok語法

https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns

grok語法調試

http://grokdebug.herokuapp.com/

常見攻擊特征

以常見的wavsep搭建靶場環境,請參考我的另外一篇文章《基於WAVSEP的靶場搭建指南》

使用SQL掃描鏈接

技術分享圖片

分析攻擊特征,下列列舉兩個,更多攻擊特征請大家自行總結

特征一

2017-02-16T23:29:00.993Z localhost 170216 19:19:12   46 Query SELECT username, password FROM users WHERE username=‘textvalue‘ UNION ALL SELECT NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL#‘ AND password=‘textvalue2

使用聯合查詢枚舉數據時會產生大量的NULL字段

特征二、三

枚舉數據庫結構時會使用INFORMATION_SCHEMA,另外個別掃描器會使用GROUP BY x)a)

2017-02-16T23:29:00.998Z localhost    46 Query SELECT username, password FROM users WHERE username=‘textvalue‘ AND (SELECT 7473 FROM(SELECT COUNT(*),CONCAT(0x7171716271,(SELECT (CASE WHEN (8199= 8199) THEN 1 ELSE 0 END)),0x717a627871,FLOOR(RAND(0)*2))x FROM INFORMATION_SCHEMA.PLUGINS GROUP BY x)a)-- LFpQ‘ AND password=‘textvalue2‘

hadoop離線處理

hadoop是基於map,reduce模型

技術分享圖片

簡化理解就是:

cat data.txt | ./map | ./reduce

最簡化期間,我們可以只開發map程序,在map中逐行處理日誌數據,匹配攻擊行為。

以perl腳本開發,python類似

#!/usr/bin/perl -w

my $rule="(null,){3,}|information_schema|GROUP BY x\\)a\\)";

my $line="";

while($line=<>)

{

if( $line=~/$rule/i )

{

printf($line);

}

}

在hadoop下運行即可。

生產環境

生產環境中的規則會比這復雜很多,需要你不斷補充,這裏只是舉例;

單純只編寫map會有大量的重復報警,需要開發reduce用於聚合;

應急響應時需要知道SQL註入的是那個庫,使用的是哪個賬戶,這個需要在logstash切割字段時補充;

應急響應時最好可以知道SQL註入對應的鏈接,這個需要將web的accesslog與SQL日誌關聯分析,比較成熟的方案是基於機器學習,學習出基於時間的關聯矩陣;

客戶端直接搜集SQL數據要求mysql也開啟查詢日誌,這個對服務器性能有較大影響,我知道的大型公司以db-prxoy方式接入為主,建議可以在db-proxy上搜集;

基於規則識別SQL註入存在瓶頸,雖然相對web日誌層面以及流量層面有一定進步,SQL語義成為必然之路。

後繼

前言

SIEM(security information and event management),顧名思義就是針對安全信息和事件的管理系統,針對大多數企業是不便宜的安全系統,本文結合作者的經驗介紹如何使用開源軟件離線分析數據,使用算法挖掘未知攻擊行為。上集傳送門

回顧系統架構

技術分享圖片

以WEB服務器日誌為例,通過logstash搜集WEB服務器的查詢日誌,近實時備份到hdfs集群上,通過hadoop腳本離線分析攻擊行為。

自定義日誌格式

開啟httpd自定義日誌格式,記錄User-Agen以及Referer

<IfModule logio_module>
      # You need to enable mod_logio.c to use %I and %O
      LogFormat "%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-Agent}i\" %I %O" combinedio
    </IfModule>
    CustomLog "logs/access_log" combined

日誌舉例

180.76.152.166 - - [26/Feb/2017:13:12:37 +0800] "GET /wordpress/ HTTP/1.1" 200 17443 "http://180.76.190.79:80/" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.21 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.21"
180.76.152.166 - - [26/Feb/2017:13:12:37 +0800] "GET /wordpress/wp-json/ HTTP/1.1" 200 51789 "-" "print `env`"
180.76.152.166
 - - [26/Feb/2017:13:12:38 +0800] "GET 
/wordpress/wp-admin/load-styles.php?c=0&dir=ltr&load[]=dashicons,buttons,forms,l10n,login&ver=Li4vLi4vLi4vLi4vLi4vLi4vLi4vLi4vLi4vLi4vZXRjL3Bhc3N3ZAAucG5n
 HTTP/1.1" 200 35841 "http://180.76.190.79:80/" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.21 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.21"
180.76.152.166 - - [26/Feb/2017:13:12:38 +0800] "GET /wordpress/ HTTP/1.1" 200 17442 "http://180.76.190.79:80/" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.21 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.21"

測試環境

在wordpress目錄下添加測試代碼1.php,內容為phpinfo

技術分享圖片

針對1.php的訪問日誌

[root@instance-8lp4smgv logs]# cat access_log | grep ‘wp-admin/1.php‘
125.33.206.140
 - - [26/Feb/2017:13:09:47 +0800] "GET /wordpress/wp-admin/1.php 
HTTP/1.1" 200 17 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) 
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 
Safari/537.36"
125.33.206.140 - - [26/Feb/2017:13:11:19 +0800] 
"GET /wordpress/wp-admin/1.php HTTP/1.1" 200 17 "-" "Mozilla/5.0 
(Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like 
Gecko) Chrome/50.0.2661.102 Safari/537.36"
125.33.206.140 - - 
[26/Feb/2017:13:13:44 +0800] "GET /wordpress/wp-admin/1.php HTTP/1.1" 
200 17 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) 
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 
Safari/537.36"
127.0.0.1 - - [26/Feb/2017:13:14:19 +0800] "GET 
/wordpress/wp-admin/1.php HTTP/1.1" 200 17 "-" "curl/7.19.7 
(x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 
libidn/1.18 libssh2/1.4.2"
127.0.0.1 - - [26/Feb/2017:13:16:04 
+0800] "GET /wordpress/wp-admin/1.php HTTP/1.1" 200 107519 "-" 
"curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 
zlib/1.2.3 libidn/1.18 libssh2/1.4.2"
125.33.206.140 - - 
[26/Feb/2017:13:16:12 +0800] "GET /wordpress/wp-admin/1.php HTTP/1.1" 
200 27499 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) 
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 
Safari/537.36"
[root@instance-8lp4smgv logs]#

hadoop離線處理

hadoop是基於map,reduce模型

map腳本

localhost:work maidou$ cat mapper-graph.pl 
#!/usr/bin/perl -w
#180.76.152.166 - - [26/Feb/2017:13:12:37 +0800] "GET /wordpress/ HTTP/1.1" 200 17443 "http://180.76.190.79:80/" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.21 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.21"
my $line="";
while($line=<>)
{
if( $line=~/"GET (\S+) HTTP\/1.[01]" 2\d+ \d+ "(\S+)"/ )
{
my $path=$1;
my $ref=$2;
if( $path=~/(\S+)\?(\S+)/  )
{
$path=$1;
}
if( $ref=~/(\S+)\?(\S+)/  )
                {
                        $ref=$1;
                }
if( ($ref=~/^http:\/\/180/)||( "-" eq $ref  )  )
{
my $line=$ref."::".$path."\n";
#printf("$ref::$path\n");
print($line);
}
}
}

reducer腳本

localhost:work maidou$ cat reducer-graph.pl 
#!/usr/bin/perl -w
my %result;
my $line="";
while($line=<>)
{
if( $line=~/(\S+)\:\:(\S+)/ )
{
unless( exists($result{$line})  )
{
$result{$line}=1;
}
}
}
foreach $key (sort keys %result)   
{   
if( $key=~/(\S+)\:\:(\S+)/ )
        {
                my $ref=$1;
                my $path=$2;#這裏是舉例,過濾你關註的webshell文件後綴,常見的有php、jsp,白名單形式過濾存在漏報風險;也可以以黑名單形式過濾你忽略的文件類型
                if( $path=~/(\.php)$/  )
                {
                        my $output=$ref." -> ".$path."\n";
                        print($output);
                }
        }       
 
}

生成結果示例為:

- -> http://180.76.190.79/wordpress/wp-admin/1.php
- -> http://180.76.190.79/wordpress/wp-admin/admin-ajax.php
- -> http://180.76.190.79/wordpress/wp-admin/customize.php
http://180.76.190.79/wordpress/ -> http://180.76.190.79/wordpress/wp-admin/edit-comments.php
http://180.76.190.79/wordpress/ -> http://180.76.190.79/wordpress/wp-admin/profile.php
http://180.76.190.79/wordpress/ -> http://180.76.190.79/wordpress/wp-login.php
http://180.76.190.79/wordpress/ -> http://180.76.190.79/wordpress/xmlrpc.php

圖算法

講生成數據導入圖數據庫neo4j,滿足webshell特征的為:

入度出度均為0

入度出度均為1且自己指向自己

neo4j

neo4j是一個高性能的,NOSQL圖形數據庫,它將結構化數據存儲在網絡上而不是表中,因其嵌入式、高性能、輕量級等優勢,越來越受到關註。

技術分享圖片

neo4j安裝

https://neo4j.com/ 上下載安裝包安裝,默認配置即可

ne04j啟動

以我的mac為例子,通過gui啟動即可,默認密碼為ne04j/ne04j,第一次登錄會要求更改密碼

技術分享圖片

GUI管理界面

技術分享圖片

python api庫安裝

sudo pip install neo4j-driver

下載JPype

https://pypi.python.org/pypi/JPype1

安裝JPype

tar -zxvf JPype1-0.6.2.tar.gz 
cd JPype1-0.6.2
sudo python setup.py install

將數據導入圖數據庫代碼如下:

B0000000B60544:freebuf liu.yan$ cat load-graph.py 
import re
from neo4j.v1 import GraphDatabase, basic_auth
nodes={}
index=1
driver = GraphDatabase.driver("bolt://localhost:7687",auth=basic_auth("neo4j","maidou"))
session = driver.session()
file_object = open(‘r-graph.txt‘, ‘r‘)
try:
for line in file_object:
matchObj = re.match( r‘(\S+) -> (\S+)‘, line, re.M|re.I)
if matchObj:
path = matchObj.group(1);
ref = matchObj.group(2);
if path in nodes.keys():
path_node = nodes[path]
else:
path_node = "Page%d" % index
nodes[path]=path_node
sql = "create (%s:Page {url:\"%s\" , id:\"%d\",in:0,out:0})" %(path_node,path,index)
index=index+1
                        session.run(sql)
#print sql
if ref in nodes.keys():
                                ref_node = nodes[ref]
                        else:
                                ref_node = "Page%d" % index
                                nodes[ref]=ref_node
sql = "create (%s:Page {url:\"%s\",id:\"%d\",in:0,out:0})" %(ref_node,ref,index)
                                index=index+1
                                session.run(sql)
#print sql
sql = "create (%s)-[:IN]->(%s)" %(path_node,ref_node)
session.run(sql)
#print sql
sql = "match (n:Page {url:\"%s\"}) SET n.out=n.out+1" % path
                        session.run(sql)
                        #print sql
                        sql = "match (n:Page {url:\"%s\"}) SET n.in=n.in+1" % ref
                        session.run(sql)
                        #print sql
finally:
     file_object.close( )
session.close()

生成有向圖如下

技術分享圖片

技術分享圖片

查詢入度為1出度均為0的結點或者查詢入度出度均為1且指向自己的結點,由於把ref為空的情況也識別為”-”結點,所以入度為1出度均為0。

技術分享圖片

優化點

生產環境實際使用中,我們遇到誤報分為以下幾種:

主頁,各種index頁面(第一個誤報就是這種)

phpmyadmin、zabbix等運維管理後臺

hadoop、elk等開源軟件的控制臺

API接口

這些通過短期加白可以有效解決,比較麻煩的是掃描器對結果的影響(第二個誤報就是這種),這部分需要通過掃描器指紋或者使用高大上的人機算法來去掉幹擾。

後記

使用算法來挖掘未知攻擊行為是目前非常流行的一個研究方向,本文只是介紹了其中比較好理解和實現的一種算法,該算法並非我首創,不少安全公司也都或多或少有過實踐。篇幅有限,我將陸續在企業安全建設專題其他文章中由淺入深介紹其他算法。算法或者說機器學習本質是科學規律在大數據集集合上趨勢體現,所以很難做到精準報警,目前階段還是需要通過各種規則和模型來輔助,不過對於挖掘未知攻擊行為確實是一支奇兵。

企業安全建設之搭建開源SIEM平臺