1. 程式人生 > >Python第七周 學習筆記(1)

Python第七周 學習筆記(1)

學習筆記

日誌分析
  • 業務中會生成大量的系統日誌、應用程序日誌、安全日誌等,通過對日誌的分析可以了解服務器的負載、健康狀況,可以分析客戶的分布情況、客戶的行為,甚至基於這些分析可以做出預測

  • 一般采集流程

    • 日誌產出 -> 采集(Logstash、Flume、Scribe)-> 存儲 -> 分析 -> 存儲(數據庫、NoSQL)-> 可視化

    • 開源實時日誌分析ELK平臺
    • Logstash收集日誌,並存放到ElasticSearch集群中,Kibana則從ES集群中查詢數據生成圖表,返回瀏覽器端

分析的前提

半結構化數據

  • 日誌是半結構化數據,是有組織的,有格式的數據。可以分割成行和列,就可以當作表理解和處理,分析裏面的數據

文本分析

  • 日誌是文本文件,需要依賴文件IO、字符串操作、正則表達式等技術
  • 通過這些技術就能夠把日誌中需要的數據提取出來

  • 目標數據形如:

    
    123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
    124.```
  • nginx、tomcat等WEB Server都會產生這樣的日誌

提取數據

一、分割


import datetime

line = ‘‘‘
123.125.71.36 - - [06/Apr/2017:18:09:25 +0800] "GET / HTTP/1.1" 200 8642 "-" "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
‘‘‘

CHARS = set(" \t")

def makekey(line: str):
    start = 0
    skip = False
    for i, c in enumerate(line):
        if not skip and c in ‘"[‘:
            start = i + 1
            skip = True
        elif skip and c in ‘"]‘:
            skip = False
            yield line[start:i]
            start = i + 1
            continue

        if skip:
            continue

        if c in CHARS:
            if start == i:
                start = i + 1
                continue
            yield line[start:i]
            start = i + 1

    else:
        if start < len(line):
            yield line[start:]

names = (‘remote‘, ‘‘, ‘‘, ‘datetime‘, ‘request‘, ‘status‘, ‘length‘, ‘‘, ‘useragent‘)

ops = (None, None, None, lambda timestr: datetime.datetime.strptime(timestr, ‘%d/%b/%Y:%H:%M:%S %z‘),
       lambda request: dict(zip([‘method‘, ‘url‘, ‘protocol‘], request.split())),
       int, int, None, None)

def extract(line: str):
    return dict(map(lambda item: (item[0], item[2](item[1]) if item[2] is not None else item[1]),
                    zip(names, makekey(line), ops)))

print(extract(line))

二、正則表達式分割

PATTERN = r‘‘‘(?P<ip>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s"(?P<method>[^"\s]+)\s(?P<url>[^"\s]+)\s(?P<protocol>[^"\s]+)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"(?:.+)"\s"(?P<useragent>[^"]+)"‘‘‘
pattern = re.compile(PATTERN)
ops = {‘datetime‘: (lambda x: datetime.datetime.strptime(x, ‘%d/%b/%Y:%H:%M:%S %z‘)), ‘status‘: int, ‘size‘: int}

def extract(text):

    mat = pattern.match(text)

    return {k: ops.get(k, lambda x: x)(v) for k, v in mat.groupdict().items()}

異常處理

  • 日誌中不免會出現一些不匹配的行,需要處理
  • 這裏使用re.match方法,有可能匹配不上。所以要增加一個判斷
  • 采用拋出異常的方式,讓調用者獲得異常並自行處理

PATTERN = r‘‘‘(?P<ip>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s"(?P<method>[^"\s]+)\s(?P<url>[^"\s]+)\s(?P<protocol>[^"\s]+)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"(?:.+)"\s"(?P<useragent>[^"]+)"‘‘‘
pattern = re.compile(PATTERN)

ops = {‘datetime‘: (lambda x: datetime.datetime.strptime(x, ‘%d/%b/%Y:%H:%M:%S %z‘)), ‘status‘: int, ‘size‘: int}

def extract(text) -> dict:

    mat = pattern.match(text)

    if mat:
        return {k: ops.get(k, lambda x: x)(v) for k, v in mat.groupdict().items()}
    else:
        raise Exception(‘No match‘)
  • 或者返回一個特殊值

def extract(text) -> dict:

    mat = pattern.match(text)

    if mat:
        return {k: ops.get(k, lambda x: x)(v) for k, v in mat.groupdict().items()}
    else:
        return None

滑動窗口

數據載入

def load(path):
    with open(path) as f:
        for line in f:
            fields = extract(line)
            if fields:
                yield fields
            else:
                continue

時間窗口分析

概念

  • 很多數據,例如日誌,都是和時間相關的,都是按照時間順序產生的
  • 產生的數據分析的時候,要按照時間求值

  • interval表示每一次求值的時間間隔
  • width時間窗接口寬度,指一次求值的時間窗口寬度

當width > interval

技術分享圖片

  • 數據求值時會有重疊

當width = interval

技術分享圖片

  • 數據求值沒有重疊

時序數據

  • 運維環境中,日誌、監控等產生的數據都是與時間相關的數據,按照時間先後產生並記錄下來的數據,所以一般按照時間對數據進行分析

數據分析基本程序結構

  • 無限的生成隨機數函數,產生時間相關的數據,返回時間和隨機數字典

import random
import datetime
import time

def source():
    while True:
        yield {‘value‘: random.randint(1, 100), ‘datetime‘: datetime.datetime.now()}
        time.sleep(1)

s = source()
items = [next(s) for _ in range(3)]

def handler(iterable):
    return sum(map(lambda item: item[‘value‘], iterable)) / len(iterable)

print(items)
print("{:.2f}".format(handler(items)))

窗口函數實現

  • 將上面的程序擴展為window函數

import random
import datetime
import time

def source(second=1):
    while True:
        yield {‘value‘: random.randint(1, 100),
               ‘datetime‘: datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8)))}
        time.sleep(second)

def window(iterator, handler, width: int, interval: int):
    start = datetime.datetime.strptime(‘20170101 000000 +0800‘, ‘%Y%m%d %H%M%S %z‘)
    current = datetime.datetime.strptime(‘20170101 010000 +0800‘, ‘%Y%m%d %H%M%S %z‘)

    buffer = []
    delta = datetime.timedelta(seconds=width - interval)

    while True:
        data = next(iterator)
        if data:
            buffer.append(data)
            current = data[‘datetime‘]

        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            print(‘{:.2f}‘.format(ret))
            start = current
            buffer = [x for x in buffer if x[‘datetime‘] > current - delta]

def handler(iterable):
    return sum(map(lambda item: item[‘value‘], iterable)) / len(iterable)

window(source(), handler, 10, 5)

技術分享圖片

分發

生產者消費者模型

  • 對於一個監控系統,需要處理很多數據,包括日誌。對其中已有數據的采集、分析。
  • 被監控對象就是數據的生產者producer,數據的處理程序就是數據的消費者consumer
  • 生產者消費者傳統模型
    技術分享圖片
  • 傳統的生產者消費者模型,生產者生產,消費者消費。但這種模型有問題
  • 開發的代碼耦合高,如果生成規模擴大,不易擴展,生產和消費的速度很難匹配等。

  • 解決辦法:隊列
  • 作用:解耦、緩沖
    技術分享圖片
  • 日誌生產者往往會部署好幾個程序,日誌產生的也很多,而消費者也會有很多個程序,去提取日誌分析處理
  • 數據生產是不穩定的。會造成段時間數據的潮湧,需要緩沖
  • 消費者消費能力不一樣,有快有慢,消費者可以自己決定消費緩沖區中的數據
  • 單機可以使用queue內建的模塊構建進程內的隊列,滿足多個線程間的生產消費需要
  • 大型系統可以使用第三方消息中間件:RabbitMQ、RocketMQ、Kafka

queue模塊——隊列

  • queue.Queue(maxsize=0)

    • 創建FIFO隊列,返回Queue對象
    • maxsize小於等於0,隊列長度沒有限制
  • Queue.get(block=True,timeout=None)

    • 從隊列中移除元素並返回這個元素
    • block 阻塞,timeout 超時
    • 如果block為True,是阻塞,timeout為None就是一直阻塞
    • 如果block為True但是timeout有值,就阻塞到一定秒數拋出異常
    • block為False,是非阻塞,timeout將被忽略,要麽成功返回一個元素,要麽拋出empty異常
  • Queue.get_nowait()

    • 等價於get(False)
  • Queue.put(item,block=True,timeout=None)

    • 把一個元素加入到隊列中去
    • block=True,timeout=None,一直阻塞直至有空位放元素
    • block=True,timeout=5,阻塞5秒就拋出Full異常
    • block=True,timeout失效,立刻返回,,一直阻塞直至有空位放元素
  • Queue.put_nowait(item)
    • 等價於put(item,False)

分發器實現

  • 生產者(數據源)生產數據,緩沖到消息隊列中

  • 數據處理流程:

    數據加載 -> 提取 -> 分析(滑動窗口函數)

  • 處理大量數據的時候,可能需要多個消費者處理
  • 需要一個分發器(調度器),把數據分發給不同的消費者處理
  • 每一個消費者拿到數據後,有自己的處理函數。所以要有一種註冊機制

數據加載 -> 提取 -> 分發 -> 分析函數1&分析函數2

  • 分析1和分析2可以是不同的handler、窗口寬度、間隔時間

  • 暫時采用輪詢策略,一對多的副本發送,一個數據通過分發器、發送到多個消費者

  • 消息隊列

    • 在生產者和消費者之間使用消息隊列,那麽所有消費者可以共有一個消息隊列,或各自擁有一個消息隊列
    • 公用一個消息隊列需要解決爭搶問題。每個消費者擁有一個隊列較易實現
  • 註冊

    • 在調度器內部記錄消費者,每一個消費者擁有自己的隊列
  • 線程
    • 由於一條數據會被多個不同的註冊過的handler處理,所以最好的方式就是多線程

分發器代碼實現


def dispatcher(src):
    reg_handler = []
    queues = []

    def reg(handler, width, interval):
        q = Queue()
        queues.append(q)

        thrd = threading.Thread(target=window, args=(q, handler, width, interval))

        reg_handler.append(thrd)

    def run():

        for i in reg_handler:
            i.start()

        for item in src:
            for q in queues:
                q.put(item)

    return reg, run

reg, run = dispatcher(load(‘test.log‘))

reg(handler, 10, 5)
run()

整合代碼

  • load函數就是從日誌中提取合格的數據生成函數
  • 它可以作為dispatcher函數的數據源
import re
from pathlib import Path
import datetime
import time
import threading
from queue import Queue
from user_agents import parse

PATTERN = r‘‘‘(?P<ip>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s"(?P<method>[^"\s]+)\s(?P<url>[^"\s]+)\s(?P<protocol>[^"\s]+)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"(?:.+)"\s"(?P<useragent>[^"]+)"‘‘‘
pattern = re.compile(PATTERN)

def extract(text):
    ops = {‘datetime‘: (lambda x: datetime.datetime.strptime(x, ‘%d/%b/%Y:%H:%M:%S %z‘)), ‘status‘: int, ‘size‘: int,
           ‘useragent‘: lambda x: parse(x)}

    mat = pattern.match(text)

    return {k: ops.get(k, lambda x: x)(v) for k, v in mat.groupdict().items()}

def openfile(filename):
    with open(filename) as f:
        for text in f:
            fields = extract(text)
            time.sleep(2)
            if fields:
                yield fields
            else:
                continue

# producer
def load(*pathnames):
    for path in pathnames:
        pathname = Path(path)
        if not pathname.exists():
            continue

        if pathname.is_file():
            yield from openfile(pathname)

        elif pathname.is_dir():
            for filename in pathname.iterdir():
                if filename.is_file():
                    yield from openfile(filename)

def sum_size_handler(iterable):
    return sum(map(lambda x: x[‘size‘], iterable))

def status_handler(iterable):
    status = {}
    for dic in iterable:
        key = dic[‘status‘]
        status[key] = status.get(key, 0) + 1

    return {k: v / len(iterable) for k, v in status.items()}

d = {}

def ua_handler(iterable):
    ua_family = {}
    for item in iterable:
        val = item[‘useragent‘]
        key = (val.browser.family, val.browser.version_string)
        ua_family[key] = ua_family.get(key, 0) + 1
        d[key] = d.get(key, 0) + 1
    return ua_family, d

# consumer
def window(q: Queue, handler, width, interval):
    st_time = datetime.datetime.strptime(‘19700101 000000 +0800‘, ‘%Y%m%d %H%M%S %z‘)
    cur_time = datetime.datetime.strptime(‘19700101 010000 +0800‘, ‘%Y%m%d %H%M%S %z‘)

    buffer = []

    while True:
        # src = next(iterable)
        src = q.get()
        print(src)
        buffer.append(src)

        cur_time = src[‘datetime‘]

        if (cur_time - st_time).total_seconds() > interval:
            val = handler(buffer)
            st_time = cur_time
            b, d = val
            d = sorted(d.items(), key=lambda x: x[1], reverse=True)
            print(val)
            print(d)
        buffer = [x for x in buffer if x[‘datetime‘] > (cur_time - datetime.timedelta(seconds=width - interval))]

def dispatcher(src):
    reg_handler = []
    queues = []

    def reg(handler, width, interval):
        q = Queue()
        queues.append(q)

        thrd = threading.Thread(target=window, args=(q, handler, width, interval))

        reg_handler.append(thrd)

    def run():

        for i in reg_handler:
            i.start()

        for item in src:
            for q in queues:
                q.put(item)

    return reg, run

if __name__ == ‘__main__‘:
    import sys

    # path=sys.argv[1]
    path = ‘test.log‘

reg, run = dispatcher(load(‘test.log‘))

# reg(sum_size_handler, 20, 5)
# reg(status_handler, 20, 5)
reg(ua_handler, 20, 5)
run()

完成分析功能

  • 分析日誌很重要,通過海量數據分析就能夠知道是否遭受了攻擊,是否被爬取及爬取高峰期,是否有盜鏈等

狀態碼分析

  • 狀態碼中包含了很多信息。例如
    • 304,服務器收到客戶端提交的請求參數,發現資源未變化,要求瀏覽器使用靜態資源的緩存
    • 404,服務器找不到大請求的資源
    • 304占比大,說明靜態緩存效果明顯。404占比大,說明網站出現了錯誤鏈接,或者嘗試嗅探網站資源
    • 如果400、500占比突然增大,網站一定出了問題。
def status_handler(iterable):
    status = {}
    for dic in iterable:
        key = dic[‘status‘]
        status[key] = status.get(key, 0) + 1

    return {k: v / len(iterable) for k, v in status.items()}

瀏覽器分析

useragent

  • 這裏指的是,軟件按照一定的格式向遠端的服務器提供一個表示自己的字符串
  • 在HTTP協議中,使用useragent字段傳送這個字符串

瀏覽器選項中可以修改此設置

信息提取

安裝

pip install pyyaml ua-parser user-agents

數據分析

d = {}

def ua_handler(iterable):
    ua_family = {}
    for item in iterable:
        val = item[‘useragent‘]
        key = (val.browser.family, val.browser.version_string)
        ua_family[key] = ua_family.get(key, 0) + 1
        d[key] = d.get(key, 0) + 1
    return ua_family, d

Python第七周 學習筆記(1)