前言
SIEM(security information and event management),顧名思義就是針對安全資訊和事件的管理系統,針對大多數企業是不便宜的安全系統,本文結合作者的經驗介紹下如何使用開源軟件搭建企業的SIEM系統,數據深度分析在下篇。
SIEM的發展
對比Gartner2009年和2016年的全球SIEM廠商排名,可以清楚看出,基於大數據架構的廠商Splunk迅速崛起,傳統四强依託完整的安全產品線和成熟市場通路,依然佔據領導者象限,其他較小的廠商逐漸離開領導者象限。最重要的存儲架構也由盤櫃(可選)+商業資料庫逐漸轉變為可橫向擴展的大數據架構,支持雲環境也成為趨勢。
開源SIEM領域,比較典型的就是ossim和Opensoc,ossim存儲架構是mysql,支持多種日誌格式,包括鼎鼎大名的Snort、Nmap、Nessus以及Ntop等,對於數據規模不大的情况是個不錯的選擇,新版介面很酷炫。
完整的SIEM至少會包括以下功能:
- 漏洞管理
漏洞管理
- 資產發現
資產發現
- 入侵偵測
入侵偵測
- 行為分析
行為分析
- 日誌存儲、檢索
日誌存儲、檢索
- 報警管理
報警管理
- 酷炫報表
酷炫報表
其中最覈心的我認為是入侵偵測、行為分析和日誌存儲檢索,本文重點集中討論支撐上面三個功能的科技架構。
Opensoc簡介
Opensoc是思科2014年在BroCon大會上公佈的開源項目,但是沒有真正開源其原始程式碼,只是發佈了其科技框架。我們參攷了Opensoc發佈的架構,結合公司實際落地了一套方案。Opensoc完全基於開源的大數據框架kafka、storm、spark和es等,天生具有强大的橫向擴展能力,本文重點講解的也是基於Opensoc的siem搭建。
上圖是Opensoc給出的框架,初次看非常費解,我們以資料存儲與資料處理兩個緯度來細化,以常見的linux服務器ssh登入日誌蒐集為例。
數據蒐集緯度
數據蒐集緯度需求是蒐集原始數據,存儲,提供用戶互動式檢索的UI介面,典型場景就是出現安全事件後,通過檢索日誌回溯攻擊行為,定損。
logtash其實可以直接把數據寫es,但是考慮到storm也要資料處理,所以把數據切分放到logstash,切分後的數據發送kafka,提供給storm處理和logstash寫入es。數據檢索可以直接使用kibana,非常方便。數據切分也可以在storm裡面完成。這個就是大名鼎鼎的ELK架構。es比較適合存儲較短時間的熱數據的實时檢索査詢,對於需要長期存儲,並且希望使用hadoop或者spark進行大時間跨度的離線分析時,還需要存儲到hdfs上,所以比較常見的資料流程圖為:
資料處理緯度
這裡以數據實时流式處理為例,storm從kafka中訂閱切分過的ssh登入日誌,匹配檢測規則,檢測結果的寫入mysql或者es。
在這個例子中,孤立看一條登入日誌難以識別安全問題,最多識別非跳板機登入,真正運行還需要參攷知識庫中的常見登入IP、時間、IP情報等以及臨時存儲處理狀態的狀態庫中最近該IP的登入成功與失敗情况。比較接近實際運行情况的流程如下:
具體判斷邏輯舉例如下,實際中使用大量代理IP同時暴力破解,打一槍換一個地方那種無法覆蓋,這裡只是個舉例:
storm拓撲
storm拓撲支持python開發,以處理SQL日誌為例子:
假設SQL日誌的格式為
“Feb 16 06:32:50”“127.0.0.1”“[email protected]”“select * from user where id=1”
一般storm的拓撲結構為:
簡化後spout是通用的從kafka讀取數據的,就一個bolt處理SQL日誌,匹配規則,命中策略即輸出“alert”:“原始SQL日誌”
覈心bolt程式碼doSQLCheckBolt偽碼為:
import stormclass
SplitSentenceBolt(storm.BasicBolt):
def process(self,tup):
words = tup.values[0].split(“”)
sql = word[3]
if re.match(規則,sql):
storm.emit([“sqli”,tup.values[0]])
SplitSentenceBolt().run()
logstash
在本文環節中,logstash的配寘量甚至超過了storm的拓撲腳本開發量,這裡講下比較重點的幾個點:
從檔案讀取
input file {
path => [“/var/log/*.log”,“/var/log/message”] type =>“system”
start_position =>“beginning” }}
從kafka中訂閱
input { kafka {
zk_connect =>“localhost:2181”
group_id =>“logstash”
topic_id =>“test”
reset_beginning => false # boolean(optional),default: false consumer_threads => 5 # number(optional),default: 1 decorate_events => true # boolean(optional),default: false } }
寫kafka
output { kafka {
broker_list =>“localhost:9092”
topic_id =>“test”
compression_codec =>“snappy”# string(optional),
one of [“none”,“gzip”,“snappy”],default:“none” }}
寫hdfs
output { hadoop_webhdfs {
workers => 2
server =>“your.nameno.de:14000”
user =>“flume”
path =>“/user/flume/logstash/dt=%{+Y}-%{+M}-%{+d}/logstash-%{+H}.log”
flush_size => 500
compress =>“snappy”
idle_flush_time => 10
retry_interval => 0.5 }}
寫es
output {
elasticsearch {
host =>“192.168.0.2”
protocol =>“http”
index =>“logstash-%{type}-%{+YYYY.MM.dd}” index_type =>“%{type}”
workers => 5
template_overwrite => true }}
擴展資料來源
生產環境中,處理安全事件,分析入侵行為,只有ssh登入日誌肯定是不够,我們需要盡可能多的蒐集資料來源,以下作為參攷:
- linux/window系統安全日誌/操作日誌
linux/window系統安全日誌/操作日誌
- web服務器訪問日誌
web服務器訪問日誌
- 資料庫SQL日誌
資料庫SQL日誌
- 網路流量日誌
網路流量日誌
簡化後的系統架構如下:
後記
如何在離線數據中,通過行為分析和攻擊建模識別出深入的入侵行為呢?請看下篇。