Python第七周 學習筆記(1)
阿新 • • 發佈:2018-05-06
學習筆記日誌分析
-
業務中會生成大量的系統日誌、應用程序日誌、安全日誌等,通過對日誌的分析可以了解服務器的負載、健康狀況,可以分析客戶的分布情況、客戶的行為,甚至基於這些分析可以做出預測
-
一般采集流程
-
日誌產出 -> 采集(Logstash、Flume、Scribe)-> 存儲 -> 分析 -> 存儲(數據庫、NoSQL)-> 可視化
- 開源實時日誌分析ELK平臺
- Logstash收集日誌,並存放到ElasticSearch集群中,Kibana則從ES集群中查詢數據生成圖表,返回瀏覽器端
-
分析的前提
半結構化數據
- 日誌是半結構化數據,是有組織的,有格式的數據。可以分割成行和列,就可以當作表理解和處理,分析裏面的數據
文本分析
- 日誌是文本文件,需要依賴文件IO、字符串操作、正則表達式等技術
-
通過這些技術就能夠把日誌中需要的數據提取出來
-
目標數據形如:
123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)" 124.```
- nginx、tomcat等WEB Server都會產生這樣的日誌
提取數據
一、分割
import datetime line = ‘‘‘ 123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)" ‘‘‘ CHARS = set(" \t") def makekey(line: str): start = 0 skip = False for i, c in enumerate(line): if not skip and c in ‘"[‘: start = i + 1 skip = True elif skip and c in ‘"]‘: skip = False yield line[start:i] start = i + 1 continue if skip: continue if c in CHARS: if start == i: start = i + 1 continue yield line[start:i] start = i + 1 else: if start < len(line): yield line[start:] names = (‘remote‘, ‘‘, ‘‘, ‘datetime‘, ‘request‘, ‘status‘, ‘length‘, ‘‘, ‘useragent‘) ops = (None, None, None, lambda timestr: datetime.datetime.strptime(timestr, ‘%d/%b/%Y:%H:%M:%S %z‘), lambda request: dict(zip([‘method‘, ‘url‘, ‘protocol‘], request.split())), int, int, None, None) def extract(line: str): return dict(map(lambda item: (item[0], item[2](item[1]) if item[2] is not None else item[1]), zip(names, makekey(line), ops))) print(extract(line))
二、正則表達式分割
PATTERN = r‘‘‘(?P<ip>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s"(?P<method>[^"\s]+)\s(?P<url>[^"\s]+)\s(?P<protocol>[^"\s]+)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"(?:.+)"\s"(?P<useragent>[^"]+)"‘‘‘ pattern = re.compile(PATTERN) ops = {‘datetime‘: (lambda x: datetime.datetime.strptime(x, ‘%d/%b/%Y:%H:%M:%S %z‘)), ‘status‘: int, ‘size‘: int} def extract(text): mat = pattern.match(text) return {k: ops.get(k, lambda x: x)(v) for k, v in mat.groupdict().items()}
異常處理
- 日誌中不免會出現一些不匹配的行,需要處理
- 這裏使用re.match方法,有可能匹配不上。所以要增加一個判斷
- 采用拋出異常的方式,讓調用者獲得異常並自行處理
PATTERN = r‘‘‘(?P<ip>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s"(?P<method>[^"\s]+)\s(?P<url>[^"\s]+)\s(?P<protocol>[^"\s]+)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"(?:.+)"\s"(?P<useragent>[^"]+)"‘‘‘
pattern = re.compile(PATTERN)
ops = {‘datetime‘: (lambda x: datetime.datetime.strptime(x, ‘%d/%b/%Y:%H:%M:%S %z‘)), ‘status‘: int, ‘size‘: int}
def extract(text) -> dict:
mat = pattern.match(text)
if mat:
return {k: ops.get(k, lambda x: x)(v) for k, v in mat.groupdict().items()}
else:
raise Exception(‘No match‘)
- 或者返回一個特殊值
def extract(text) -> dict:
mat = pattern.match(text)
if mat:
return {k: ops.get(k, lambda x: x)(v) for k, v in mat.groupdict().items()}
else:
return None
滑動窗口
數據載入
def load(path):
with open(path) as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue
時間窗口分析
概念
- 很多數據,例如日誌,都是和時間相關的,都是按照時間順序產生的
-
產生的數據分析的時候,要按照時間求值
- interval表示每一次求值的時間間隔
- width時間窗接口寬度,指一次求值的時間窗口寬度
當width > interval
- 數據求值時會有重疊
當width = interval
- 數據求值沒有重疊
時序數據
- 運維環境中,日誌、監控等產生的數據都是與時間相關的數據,按照時間先後產生並記錄下來的數據,所以一般按照時間對數據進行分析
數據分析基本程序結構
- 無限的生成隨機數函數,產生時間相關的數據,返回時間和隨機數字典
import random
import datetime
import time
def source():
while True:
yield {‘value‘: random.randint(1, 100), ‘datetime‘: datetime.datetime.now()}
time.sleep(1)
s = source()
items = [next(s) for _ in range(3)]
def handler(iterable):
return sum(map(lambda item: item[‘value‘], iterable)) / len(iterable)
print(items)
print("{:.2f}".format(handler(items)))
窗口函數實現
- 將上面的程序擴展為window函數
import random
import datetime
import time
def source(second=1):
while True:
yield {‘value‘: random.randint(1, 100),
‘datetime‘: datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8)))}
time.sleep(second)
def window(iterator, handler, width: int, interval: int):
start = datetime.datetime.strptime(‘20170101 000000 +0800‘, ‘%Y%m%d %H%M%S %z‘)
current = datetime.datetime.strptime(‘20170101 010000 +0800‘, ‘%Y%m%d %H%M%S %z‘)
buffer = []
delta = datetime.timedelta(seconds=width - interval)
while True:
data = next(iterator)
if data:
buffer.append(data)
current = data[‘datetime‘]
if (current - start).total_seconds() >= interval:
ret = handler(buffer)
print(‘{:.2f}‘.format(ret))
start = current
buffer = [x for x in buffer if x[‘datetime‘] > current - delta]
def handler(iterable):
return sum(map(lambda item: item[‘value‘], iterable)) / len(iterable)
window(source(), handler, 10, 5)
分發
生產者消費者模型
- 對於一個監控系統,需要處理很多數據,包括日誌。對其中已有數據的采集、分析。
- 被監控對象就是數據的生產者producer,數據的處理程序就是數據的消費者consumer
- 生產者消費者傳統模型
- 傳統的生產者消費者模型,生產者生產,消費者消費。但這種模型有問題
-
開發的代碼耦合高,如果生成規模擴大,不易擴展,生產和消費的速度很難匹配等。
- 解決辦法:隊列
- 作用:解耦、緩沖
- 日誌生產者往往會部署好幾個程序,日誌產生的也很多,而消費者也會有很多個程序,去提取日誌分析處理
- 數據生產是不穩定的。會造成段時間數據的潮湧,需要緩沖
- 消費者消費能力不一樣,有快有慢,消費者可以自己決定消費緩沖區中的數據
- 單機可以使用queue內建的模塊構建進程內的隊列,滿足多個線程間的生產消費需要
- 大型系統可以使用第三方消息中間件:RabbitMQ、RocketMQ、Kafka
queue模塊——隊列
-
queue.Queue(maxsize=0)
- 創建FIFO隊列,返回Queue對象
- maxsize小於等於0,隊列長度沒有限制
-
Queue.get(block=True,timeout=None)
- 從隊列中移除元素並返回這個元素
- block 阻塞,timeout 超時
- 如果block為True,是阻塞,timeout為None就是一直阻塞
- 如果block為True但是timeout有值,就阻塞到一定秒數拋出異常
- block為False,是非阻塞,timeout將被忽略,要麽成功返回一個元素,要麽拋出empty異常
-
Queue.get_nowait()
- 等價於get(False)
-
Queue.put(item,block=True,timeout=None)
- 把一個元素加入到隊列中去
- block=True,timeout=None,一直阻塞直至有空位放元素
- block=True,timeout=5,阻塞5秒就拋出Full異常
- block=True,timeout失效,立刻返回,,一直阻塞直至有空位放元素
- Queue.put_nowait(item)
- 等價於put(item,False)
分發器實現
-
生產者(數據源)生產數據,緩沖到消息隊列中
-
數據處理流程:
數據加載 -> 提取 -> 分析(滑動窗口函數)
- 處理大量數據的時候,可能需要多個消費者處理
- 需要一個分發器(調度器),把數據分發給不同的消費者處理
- 每一個消費者拿到數據後,有自己的處理函數。所以要有一種註冊機制
數據加載 -> 提取 -> 分發 -> 分析函數1&分析函數2
-
分析1和分析2可以是不同的handler、窗口寬度、間隔時間
-
暫時采用輪詢策略,一對多的副本發送,一個數據通過分發器、發送到多個消費者
-
消息隊列
- 在生產者和消費者之間使用消息隊列,那麽所有消費者可以共有一個消息隊列,或各自擁有一個消息隊列
- 公用一個消息隊列需要解決爭搶問題。每個消費者擁有一個隊列較易實現
-
註冊
- 在調度器內部記錄消費者,每一個消費者擁有自己的隊列
- 線程
- 由於一條數據會被多個不同的註冊過的handler處理,所以最好的方式就是多線程
分發器代碼實現
def dispatcher(src):
reg_handler = []
queues = []
def reg(handler, width, interval):
q = Queue()
queues.append(q)
thrd = threading.Thread(target=window, args=(q, handler, width, interval))
reg_handler.append(thrd)
def run():
for i in reg_handler:
i.start()
for item in src:
for q in queues:
q.put(item)
return reg, run
reg, run = dispatcher(load(‘test.log‘))
reg(handler, 10, 5)
run()
整合代碼
- load函數就是從日誌中提取合格的數據生成函數
- 它可以作為dispatcher函數的數據源
import re
from pathlib import Path
import datetime
import time
import threading
from queue import Queue
from user_agents import parse
PATTERN = r‘‘‘(?P<ip>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s"(?P<method>[^"\s]+)\s(?P<url>[^"\s]+)\s(?P<protocol>[^"\s]+)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"(?:.+)"\s"(?P<useragent>[^"]+)"‘‘‘
pattern = re.compile(PATTERN)
def extract(text):
ops = {‘datetime‘: (lambda x: datetime.datetime.strptime(x, ‘%d/%b/%Y:%H:%M:%S %z‘)), ‘status‘: int, ‘size‘: int,
‘useragent‘: lambda x: parse(x)}
mat = pattern.match(text)
return {k: ops.get(k, lambda x: x)(v) for k, v in mat.groupdict().items()}
def openfile(filename):
with open(filename) as f:
for text in f:
fields = extract(text)
time.sleep(2)
if fields:
yield fields
else:
continue
# producer
def load(*pathnames):
for path in pathnames:
pathname = Path(path)
if not pathname.exists():
continue
if pathname.is_file():
yield from openfile(pathname)
elif pathname.is_dir():
for filename in pathname.iterdir():
if filename.is_file():
yield from openfile(filename)
def sum_size_handler(iterable):
return sum(map(lambda x: x[‘size‘], iterable))
def status_handler(iterable):
status = {}
for dic in iterable:
key = dic[‘status‘]
status[key] = status.get(key, 0) + 1
return {k: v / len(iterable) for k, v in status.items()}
d = {}
def ua_handler(iterable):
ua_family = {}
for item in iterable:
val = item[‘useragent‘]
key = (val.browser.family, val.browser.version_string)
ua_family[key] = ua_family.get(key, 0) + 1
d[key] = d.get(key, 0) + 1
return ua_family, d
# consumer
def window(q: Queue, handler, width, interval):
st_time = datetime.datetime.strptime(‘19700101 000000 +0800‘, ‘%Y%m%d %H%M%S %z‘)
cur_time = datetime.datetime.strptime(‘19700101 010000 +0800‘, ‘%Y%m%d %H%M%S %z‘)
buffer = []
while True:
# src = next(iterable)
src = q.get()
print(src)
buffer.append(src)
cur_time = src[‘datetime‘]
if (cur_time - st_time).total_seconds() > interval:
val = handler(buffer)
st_time = cur_time
b, d = val
d = sorted(d.items(), key=lambda x: x[1], reverse=True)
print(val)
print(d)
buffer = [x for x in buffer if x[‘datetime‘] > (cur_time - datetime.timedelta(seconds=width - interval))]
def dispatcher(src):
reg_handler = []
queues = []
def reg(handler, width, interval):
q = Queue()
queues.append(q)
thrd = threading.Thread(target=window, args=(q, handler, width, interval))
reg_handler.append(thrd)
def run():
for i in reg_handler:
i.start()
for item in src:
for q in queues:
q.put(item)
return reg, run
if __name__ == ‘__main__‘:
import sys
# path=sys.argv[1]
path = ‘test.log‘
reg, run = dispatcher(load(‘test.log‘))
# reg(sum_size_handler, 20, 5)
# reg(status_handler, 20, 5)
reg(ua_handler, 20, 5)
run()
完成分析功能
- 分析日誌很重要,通過海量數據分析就能夠知道是否遭受了攻擊,是否被爬取及爬取高峰期,是否有盜鏈等
狀態碼分析
- 狀態碼中包含了很多信息。例如
- 304,服務器收到客戶端提交的請求參數,發現資源未變化,要求瀏覽器使用靜態資源的緩存
- 404,服務器找不到大請求的資源
- 304占比大,說明靜態緩存效果明顯。404占比大,說明網站出現了錯誤鏈接,或者嘗試嗅探網站資源
- 如果400、500占比突然增大,網站一定出了問題。
def status_handler(iterable):
status = {}
for dic in iterable:
key = dic[‘status‘]
status[key] = status.get(key, 0) + 1
return {k: v / len(iterable) for k, v in status.items()}
瀏覽器分析
useragent
- 這裏指的是,軟件按照一定的格式向遠端的服務器提供一個表示自己的字符串
- 在HTTP協議中,使用useragent字段傳送這個字符串
瀏覽器選項中可以修改此設置
信息提取
安裝
pip install pyyaml ua-parser user-agents
數據分析
d = {}
def ua_handler(iterable):
ua_family = {}
for item in iterable:
val = item[‘useragent‘]
key = (val.browser.family, val.browser.version_string)
ua_family[key] = ua_family.get(key, 0) + 1
d[key] = d.get(key, 0) + 1
return ua_family, d
Python第七周 學習筆記(1)