日誌服務Python消費組實戰(三):實時跨域監測多日誌庫資料
解決問題
使用日誌服務進行資料處理與傳遞的過程中,你是否遇到如下監測場景不能很好的解決:
- 特定資料上傳到日誌服務中需要檢查資料內的異常情況,而沒有現成監控工具?
- 需要檢索資料裡面的關鍵字,但資料沒有建立索引,無法使用日誌服務的告警功能?
- 資料監測要求實時性(<5秒,例如Web訪問500錯誤),而特定功能都有一定延遲(1分鐘以上)?
- 存在多個域的多個日誌庫(例如每個Region的錯誤檔案對應的日誌庫),資料量不大,但監控邏輯類似,每個目標都要監控與配置,比較繁瑣?
如果是的,您可以考慮使用日誌服務Python消費組進行跨域實時資料監控,本文主要介紹如何使用消費組實時監控多個域中的多個日誌庫中的異常資料,並進行下一步告警動作。可以很好解決以上問題,並利用消費組的特點,達到自動平衡、負載均衡和高可用性。
基本概念
協同消費庫(Consumer Library)是對日誌服務中日誌進行消費的高階模式,提供了消費組(ConsumerGroup)的概念對消費端進行抽象和管理,和直接使用SDK進行資料讀取的區別在於,使用者無需關心日誌服務的實現細節,只需要專注於業務邏輯,另外,消費者之間的負載均衡、failover等使用者也都無需關心。
消費組(Consumer Group) - 一個消費組由多個消費者構成,同一個消費組下面的消費者共同消費一個logstore中的資料,消費者之間不會重複消費資料。
消費者(Consumer) - 消費組的構成單元,實際承擔消費任務,同一個消費組下面的消費者名稱必須不同。
在日誌服務中,一個logstore下面會有多個shard,協同消費庫的功能就是將shard分配給一個消費組下面的消費者,分配方式遵循以下原則:
- 每個shard只會分配到一個消費者。
- 一個消費者可以同時擁有多個shard。
新的消費者加入一個消費組,這個消費組下面的shard從屬關係會調整,以達到消費負載均衡的目的,但是上面的分配原則不會變,分配過程對使用者透明。
協同消費庫的另一個功能是儲存checkpoint,方便程式故障恢復時能接著從斷點繼續消費,從而保證資料不會被重複消費。
使用消費組進行實時分發
這裡我們描述用Python使用消費組進行程式設計,實時跨域監測多個域的多個日誌庫,全文或特定欄位檢查
注意:本篇文章的相關程式碼可能會更新,最新版本在這裡可以找到:
安裝
環境
- 建議程式執行在靠近源日誌庫同Region下的ECS上,並使用區域網服務入口,這樣好處是網路速度最快,其次是讀取沒有外網費用產生。
- 強烈推薦PyPy3來執行本程式,而不是使用標準CPython直譯器。
- 日誌服務的Python SDK可以如下安裝:
pypy3 -m pip install aliyun-log-python-sdk -U
更多SLS Python SDK的使用手冊,可以參考這裡
程式配置
如下展示如何配置程式:
- 配置程式日誌檔案,以便後續測試或者診斷可能的問題(跳過,具體參考樣例)。
- 基本的日誌服務連線與消費組的配置選項。
- 目標Logstore的一些連線資訊
請仔細閱讀程式碼中相關注釋並根據需要調整選項:
#encoding: utf8
def get_option():
##########################
# 基本選項
##########################
# 從環境變數中載入SLS引數與選項,endpoint、project、logstore可以多個並配對
endpoints = os.environ.get('SLS_ENDPOINTS', '').split(";") # ;分隔
projects = os.environ.get('SLS_PROJECTS', '').split(";") # ;分隔
logstores = os.environ.get('SLS_LOGSTORES', '').split(";") # ;分隔,同一個Project下的用,分隔
accessKeyId = os.environ.get('SLS_AK_ID', '')
accessKey = os.environ.get('SLS_AK_KEY', '')
consumer_group = os.environ.get('SLS_CG', '')
# 消費的起點。這個引數在第一次跑程式的時候有效,後續再次執行將從上一次消費的儲存點繼續。
# 可以使”begin“,”end“,或者特定的ISO時間格式。
cursor_start_time = "2018-12-26 0:0:0"
# 一般不要修改消費者名,尤其是需要併發跑時
consumer_name = "{0}-{1}".format(consumer_group, current_process().pid)
# 設定共享執行器
exeuctor = ThreadPoolExecutor(max_workers=2)
# 構建多個消費組(每個logstore一個)
options = []
for i in range(len(endpoints)):
endpoint = endpoints[i].strip()
project = projects[i].strip()
if not endpoint or not project:
logger.error("project: {0} or endpoint {1} is empty, skip".format(project, endpoint))
continue
logstore_list = logstores[i].split(",")
for logstore in logstore_list:
logstore = logstore.strip()
if not logstore:
logger.error("logstore for project: {0} or endpoint {1} is empty, skip".format(project, endpoint))
continue
option = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group,
consumer_name, cursor_position=CursorPosition.SPECIAL_TIMER_CURSOR,
cursor_start_time=cursor_start_time, shared_executor=exeuctor)
options.append(option)
# 設定檢測目標欄位與目標值,例如這裡是檢測status欄位是否有500等錯誤
keywords = {'status': r'5\d{2}'}
return exeuctor, options, keywords
注意,配置了多個endpoint、project、logstore,需要用分號分隔,並且一一對應;如果一個project下有多個logstore需要檢測,可以將他們直接用逗號分隔。如下是一個檢測3個Region下的4個Logstore的配置:
export SLS_ENDPOINTS=cn-hangzhou.log.aliyuncs.com;cn-beijing.log.aliyuncs.com;cn-qingdao.log.aliyuncs.com
export SLS_PROJECTS=project1;project2;project3
export SLS_LOGSTORES=logstore1;logstore2;logstore3_1,logstore3_2
資料監測
如下程式碼展示如何構建一個關鍵字檢測器,針對資料中的目標欄位進行檢測,您也可以修改邏輯設定為符合需要的場景(例如多個欄位的組合關係等)。
class KeywordMonitor(ConsumerProcessorBase):
"""
this consumer will keep monitor with k-v fields. like {"content": "error"}
"""
def __init__(self, keywords=None, logstore=None):
super(KeywordMonitor, self).__init__() # remember to call base init
self.keywords = keywords
self.kw_check = {}
for k, v in self.keywords.items():
self.kw_check[k] = re.compile(v)
self.logstore = logstore
def process(self, log_groups, check_point_tracker):
logs = PullLogResponse.loggroups_to_flattern_list(log_groups)
match_count = 0
sample_error_log = ""
for log in logs:
m = None
for k, c in self.kw_check.items():
if k in log:
m = c.search(log[k])
if m:
logger.debug('Keyword detected for shard "{0}" with keyword: "{1}" in field "{2}", log: {3}'
.format(self.shard_id, log[k], k, log))
if m:
match_count += 1
sample_error_log = log
if match_count:
logger.info("Keyword detected for shard {0}, count: {1}, example: {2}".format(self.shard_id, match_count, sample_error_log))
# TODO: 這裡新增通知下游的程式碼
else:
logger.debug("No keyword detected for shard {0}".format(self.shard_id))
self.save_checkpoint(check_point_tracker)
控制邏輯
如下展示如何控制多個消費者,並管理退出命令:
def main():
exeuctor, options, keywords = get_monitor_option()
logger.info("*** start to consume data...")
workers = []
for option in options:
worker = ConsumerWorker(KeywordMonitor, option, args=(keywords,) )
workers.append(worker)
worker.start()
try:
for i, worker in enumerate(workers):
while worker.is_alive():
worker.join(timeout=60)
logger.info("worker project: {0} logstore: {1} exit unexpected, try to shutdown it".format(
options[i].project, options[i].logstore))
worker.shutdown()
except KeyboardInterrupt:
logger.info("*** try to exit **** ")
for worker in workers:
worker.shutdown()
# wait for all workers to shutdown before shutting down executor
for worker in workers:
while worker.is_alive():
worker.join(timeout=60)
exeuctor.shutdown()
if __name__ == '__main__':
main()
啟動
假設程式命名為"monitor_keyword.py",可以如下啟動:
export SLS_ENDPOINTS=cn-hangzhou.log.aliyuncs.com;cn-beijing.log.aliyuncs.com;cn-qingdao.log.aliyuncs.com
export SLS_PROJECTS=project1;project2;project3
export SLS_LOGSTORES=logstore1;logstore2;logstore3_1,logstore3_2
export SLS_AK_ID=<YOUR AK ID>
export SLS_AK_KEY=<YOUR AK KEY>
export SLS_CG=<消費組名,可以簡單命名為"dispatch_data">
pypy3 monitor_keyword.py
效能考慮
啟動多個消費者
如果您的目標logstore存在多個shard,或者您的目標監測日誌庫較多,您可以進行一定劃分並並啟動多次程式:
# export SLS_ENDPOINTS, SLS_PROJECTS, SLS_LOGSTORES
nohup pypy3 dispatch_data.py &
# export SLS_ENDPOINTS, SLS_PROJECTS, SLS_LOGSTORES
nohup pypy3 dispatch_data.py &
# export SLS_ENDPOINTS, SLS_PROJECTS, SLS_LOGSTORES
nohup pypy3 dispatch_data.py &
...
注意:
所有消費者使用了同一個消費組的名字和不同的消費者名字(因為消費者名以程序ID為字尾)。
但資料量較大或者目標日誌庫較多時,單個消費者的速度可能無法滿足需求,且因為Python的GIL的原因,只能用到一個CPU核。強烈建議您根據目標日誌庫的Shard數以及CPU的數量進行劃分,啟動多次以便重複利用CPU資源。
效能吞吐
基於測試,在沒有頻寬限制、接收端速率限制(如Splunk端)的情況下,以推進硬體用pypy3
執行上述樣例,單個消費者佔用大約10%的單核CPU
下可以消費達到5 MB/s
原始日誌的速率。因此,理論上可以達到50 MB/s
原始日誌每個CPU核
,也就是每個CPU核每天可以消費4TB原始日誌
。
注意: 這個資料依賴頻寬、硬體引數等。
高可用性
消費組會將檢測點(check-point)儲存在伺服器端,當一個消費者停止,另外一個消費者將自動接管並從斷點繼續消費。
可以在不同機器上啟動消費者,這樣當一臺機器停止或者損壞的清下,其他機器上的消費者可以自動接管並從斷點進行消費。
理論上,為了備用,也可以啟動大於shard數量的消費者。
其他
限制與約束
每一個日誌庫(logstore)最多可以配置10個消費組,如果遇到錯誤ConsumerGroupQuotaExceed
則表示遇到限制,建議在控制檯端刪除一些不用的消費組。
監測
Https
如果服務入口(endpoint)配置為https://
字首,如https://cn-beijing.log.aliyuncs.com
,程式與SLS的連線將自動使用HTTPS加密。
伺服器證書*.aliyuncs.com
是GlobalSign簽發,預設大多數Linux/Windows的機器會自動信任此證書。如果某些特殊情況,機器不信任此證書,可以參考這裡下載並安裝此證書。
更多案例
- 日誌服務Python消費組實戰(一):日誌服務與SIEM(如Splunk)整合實戰
- 日誌服務Python消費組實戰(二):實時日誌分發
- 日誌服務Python消費組實戰(三):實時跨域監測多日誌庫資料
- 本文Github樣例