1. 程式人生 > >進程管理工具Supervisor(二)Events

進程管理工具Supervisor(二)Events

local server self. command ecs stdout write 失敗 messages

supervisor可以當做一個簡單的進程啟動、重啟、控制工具使用,也可以作為一個進程監控框架使用,作為後者,需要使用supervisor的Events機制。

Event Listeners

supervisor對子程序的監控通過叫做event listener的程序實現。supervisor控制的子程序狀態發生變化時,就會產生一些事件通知,event listener可以對這些事件通知進行訂閱。

event listener本身也是作為supervisor的子程序運行的。事件通知協議的實現基於event listener子程序的stdin和stdout。supervisor發送特定格式的信息到event listener的stdin,然後從event listener的stdout獲得特定格式的輸出,從而形成一個請求/應答循環。

配置

event listener的配置放置於配置文件中的[eventlistener:x]塊中。

[eventlistener:mylistener]
command=my_custom_listener.py
events=PROCESS_STATE,TICK_60

x是listener的名稱,command是執行listener腳本的命令,events是要監控的事件類型。

event listener本身是作為supervisor的子程序運行的,所以與配置子程序[program:x]塊類似,官網例子:

[eventlistener:theeventlistenername]
command=/bin/eventlistener
process_name=%(program_name)s_%(process_num)02d
numprocs=5
events=PROCESS_STATE
buffer_size=10
directory=/tmp
umask=022
priority=-1
autostart=true
autorestart=unexpected
startsecs=1
startretries=3
exitcodes=0,2
stopsignal=QUIT
stopwaitsecs=10
stopasgroup=false
killasgroup=false
user=chrism
redirect_stderr=false
stdout_logfile=/a/path
stdout_logfile_maxbytes=1MB
stdout_logfile_backups=10
stdout_events_enabled=false
stderr_logfile=/a/path
stderr_logfile_maxbytes=1MB
stderr_logfile_backups=10
stderr_events_enabled=false
environment=A="1",B="2"
serverurl=AUTO

事件通知協議

一個event listener可以處於三種狀態,ACKNOWLEDGED、READY、BUSY,只有在READY狀態下才可以接收事件通知。

event listener啟動時處於ACKNOWLEDGED狀態,直到event listener向stdout中輸出“READY\n”字符串為止。

event listener向stdout中輸出“READY\n”之後就處於READY狀態,supervisor會向處於READY狀態的listener發送listener訂閱的事件通知。

listener接收事件通知之後就處於BUSY狀態,期間listener對接收到的事件通知進行處理,處理結束後向stdout輸出

RESULT 2\nOK或者RESULT 4\nFAIL”,前者代表處理成功,後者代表處理失敗。

supervisor收到OK或者FAIL輸出後,就將event listener的狀態置於ACKNOWLEDGED。FAIL的事件通知會被緩存然後再次發送。

event listener的狀態處於ACKNOWLEDGED後可以退出執行,也可以繼續執行,繼續執行就可以向stdout輸出“READY\n”形成一個循環。

supervisor向listener發送的事件通知由兩部分組成,header和body,由"\n"換行符分開。

一個header例子:

ver:3.0 server:supervisor serial:21 pool:listener poolserial:10 eventname:PROCESS_COMMUNICATION_STDOUT len:54

ver:協議版本

server:supervisor的標識符,由[supervisord]塊中的identifier選項設置。

serial:event的序列號

pool:listener的pool的名字。

poolserial:event在pool中的的序列號

eventname:event類型名稱

len:header後面的body長度。

一個body例子:

processname:foo groupname:bar pid:123
This is the data that was sent between the tags

processname:事件所屬的子進程名字

groupname:子進程所屬組名

pid:子進程pid

一個簡單的listener腳本,listener.py:

import sys

def write_stdout(s):
    # only eventlistener protocol messages may be sent to stdout
    sys.stdout.write(s)
    sys.stdout.flush()

def write_stderr(s):
    sys.stderr.write(s)
    sys.stderr.flush()

def main():
    while True:
        # 進入READY狀態
    ┆   write_stdout(‘READY\n‘)
    ┆   # 讀取事件通知的header
    ┆   line = sys.stdin.readline()
    ┆   write_stderr(line)
        # 獲取body長度,讀取body
    ┆   headers=dict([x.split(‘:‘) for x in line.split() ])
    ┆   data = sys.stdin.read(int(headers[‘len‘]))
    ┆   write_stderr(data+‘\n‘)
        # 發送OK進入ACKNOWLEDGED狀態
    ┆   write_stdout(‘RESULT 2\nOK‘)
if __name__ == ‘__main__‘:
    main()

在conf.d目錄中建立一個listener配置文件mylistener.conf:

[eventlistener:mylistener]
command=python listener.py
directory=/thedirectoroflistener.py
user=user
events=PROCESS_STATE,TICK_5
stdout_logfile=/path/to/mylistener_stdout.log
stderr_logfile=/path/to/mylistener_stderr.log

啟動:

ubuntu:$ sudo supervisorctl start all
mylistener: started
celerybeat: started
ubuntu:$ sudo supervisorctl status
celerybeat                       RUNNING   pid 87729, uptime 0:00:20
mylistener                       RUNNING   pid 87728, uptime 0:00:20

監控就開始了,可以到日誌中查看事件通知的內容:

ver:3.0 server:supervisor serial:15361 pool:mylistener poolserial:15361 eventname:PROCESS_STATE_RUNNING len:73
processname:mylistener groupname:mylistener from_state:STARTING pid:87728
ver:3.0 server:supervisor serial:15362 pool:mylistener poolserial:15362 eventname:TICK_5 len:15
when:1514313560
ver:3.0 server:supervisor serial:15364 pool:mylistener poolserial:15364 eventname:PROCESS_STATE_RUNNING len:73
processname:celerybeat groupname:celerybeat from_state:STARTING pid:87729

可以根據自己的需要設定監控的事件類型,然後根據不同的事件類型和內容做出不同的應變,具體的事件類型可以官網查看。

python的supervisor.childutils模塊對header和body的處理進行了包裝:

def get_headers(line):
    return dict([ x.split(‘:‘) for x in line.split() ])

def eventdata(payload):
    headerinfo, data = payload.split(‘\n‘, 1)
    headers = get_headers(headerinfo)
    return headers, data

def get_asctime(now=None):
    if now is None: # for testing
        now = time.time() # pragma: no cover
    msecs = (now - long(now)) * 1000
    part1 = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now))
    asctime = ‘%s,%03d‘ % (part1, msecs)
    return asctime

class ProcessCommunicationsProtocol:
    def send(self, msg, fp=sys.stdout):
        fp.write(ProcessCommunicationEvent.BEGIN_TOKEN)
        fp.write(msg)
        fp.write(ProcessCommunicationEvent.END_TOKEN)

    def stdout(self, msg):
        return self.send(msg, sys.stdout)

    def stderr(self, msg):
        return self.send(msg, sys.stderr)

pcomm = ProcessCommunicationsProtocol()

class EventListenerProtocol:
    def wait(self, stdin=sys.stdin, stdout=sys.stdout):
        self.ready(stdout)
        line = stdin.readline()
        headers = get_headers(line)
        payload = stdin.read(int(headers[‘len‘]))
        return headers, payload

    def ready(self, stdout=sys.stdout):
        stdout.write(PEventListenerDispatcher.READY_FOR_EVENTS_TOKEN)
        stdout.flush()

    def ok(self, stdout=sys.stdout):
        self.send(‘OK‘, stdout)

    def fail(self, stdout=sys.stdout):
        self.send(‘FAIL‘, stdout)

    def send(self, data, stdout=sys.stdout):
        resultlen = len(data)
        result = ‘%s%s\n%s‘ % (PEventListenerDispatcher.RESULT_TOKEN_START,
                               str(resultlen),
                               data)
        stdout.write(result)
        stdout.flush()

listener = EventListenerProtocol()

listener腳本可以方便的寫做:

import sys

from supervisor import childutils

def write_stdout(s):
    # only eventlistener protocol messages may be sent to stdout
    sys.stdout.write(s)
    sys.stdout.flush()

def write_stderr(s):
    sys.stderr.write(s)
    sys.stderr.flush()

def main():
    while True:
    ┆   headers, payload = childutils.listener.wait()
    ┆   write_stderr(payload+‘\n‘)
    ┆   childutils.listener.ok(sys.stdout)

if __name__ == ‘__main__‘:
    main()

  

進程管理工具Supervisor(二)Events