1. 程式人生 > >使用Python完成控制主機與任務節點的互動 [Demo]

使用Python完成控制主機與任務節點的互動 [Demo]



(一)

馬上做一個分散式漏洞掃描與攻擊的專案,這段時間一直選技術路線以及做大量的demo。這篇是記錄我在主控端與各個漏洞掃描節點協調通訊上的一個demo程式碼。我選擇使用類似於WebService的技術,即各個節點暴露WebService介面,主控端去呼叫並且拿到回撥。WebService基於SOAP協議通訊我覺得太麻煩,因為我的需求是主控端分發任務佇列給節點,節點執行,完成之後回撥,主控端進行處理,非同步呼叫,邏輯簡單。最終我試了一下基於GET方法的類似於WebService的方法,自己寫互動的過程,這樣編碼也比較方便: - )這裡我不禁要吐槽下Python開發一個WebService

配置執行環境實在是蛋疼,但是呼叫WebService倒是很容易: - (

(二)

我使用了BaseHTTPServer,在主控端和各個節點上都維護一個多執行緒Server進行互動,這樣編碼也比較簡單,可以把精力主要投放在設計好通訊細節以及處理各種突發事件即可。

這個過程的邏輯如下:

  1. 主控端接收外部呼叫

  2. 判斷呼叫合法性,並重新組裝請求併發給掃描節點

  3. 節點收到請求,進行身份驗證,並提取出資訊,根據資訊(服務名、引數)開啟工作執行緒。

  4. 節點執行完進行回撥。

整個過程非同步實現,畫了張圖如下:

(三)

使用PythonBaseHTTPServer實現,這個模組可以輕易實現一個

HTTP伺服器,使用裡面的do_GET回撥函式執行具體的邏輯,識別是外部呼叫請求還是回撥請求,並對請求進行解析以及處理異常情況。

由於只是探測階段,所以只是簡單做了一個小demo,證明這種處理方式的可行性。下面就是貼程式碼了:

控制端:

#coding=utf-8
'''
控制節點邏輯:
1.接收外部介面的呼叫,格式為:
如控制節點1,執行其內部的print_job任務
http://127.0.0.1:8000/1/print_job

2.接收節點的Callback
如收到:
http://127.0.0.1:8001/SUCCESS/NODE_ONE
表示節點1的任務完成
如果是:
http://127.0.0.1:8001/ERROR/NODE_ONE
表示節點1執行出錯

3.轉發給節點
各個節點分析URL
http://127.0.0.1:8001/1/print_job
表示執行方法為print_job

'''
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from SocketServer import ThreadingMixIn
import urllib
import re
import threading

NODE_ONE = ('127.0.0.1','8001')
NODE_TWO = ('127.0.0.1','8002')

class MyRequestHandler(BaseHTTPRequestHandler):

    '''/1/print|1,2,3(判斷url後面跟的引數列表)'''
    def getArguments(self,path):
        '''
        傳遞引數:/1/print|1,2,3  or  /1/print|
        '''
        if path.count('|') == 0:
            #無引數
            return []
        else:
            args = path[1:].split('/')[1].split('|')[1].split(',')
            return args
    def parseArgs(self,args):
        return str(args).replace('[','').replace(']','')
    def _writeheader(self,wtype):
        if wtype == 'OK':
            self.send_response(200)
        else:
            self.send_response(404)
        self.send_header('Content-type','text/html')
        self.end_headers()

    '''
    給任務節點分配任務(請求中轉)
    http://127.0.0.1:8001/1/print_job|1,2,3
    '''
    def send_work(self,path):
        info = path[1:].split('/')
        node = info[0]  #識別節點
        job = info[1][:info[1].find('|')]   #呼叫節點的函式
        args = self.getArguments(path)
        print 'Node %s will do this: %s, args ====> %s' % (node,job,args)
        if node == '1': #如果是控制節點1
            #判斷功能是否合法
            if job == 'print_job':
                self._writeheader('OK')
                url = 'http://%s:%s/%s/%s|%s' % (NODE_ONE[0],NODE_ONE[1],node,job,self.parseArgs(args))
                urllib.urlopen(url.replace("'",''))
                print 'Assign success!'
                return 'SENT_JOBS'
            elif job == 'xxx':   #新增節點可呼叫的服務列表
                '''write other functions'''
                pass
            else:
                return 'INVALID_JOB'
        elif node == '2':  #如果是節點2
            if job == 'print_job':
                self._writeheader('OK')
                url = 'http://%s:%s/%s/%s|%s' % (NODE_TWO[0],NODE_TWO[1],node,job,self.parseArgs(args))
                urllib.urlopen(url.replace("'",''))
                print 'Assign success!'
                return 'SENT_JOBS'
            else:
                self._writeheader('ERR')
                return 'INVALID_JOB'
        else:
            return 'NODE_NOT_EXISTS'

    def do_GET(self):
        '''callback:http://127.0.0.1:8001/SUCCESS/NODE_ONE'''
        pattern1 = re.compile(r'^/([A-Z]+)/(NODE_[A-Z]+)$')
        '''command:http://127.0.0.1:8002/1/print_job|1,2,3'''
        pattern2 = re.compile(r'^/(\d+)/([a-z_]+)\|([\w,]*?)$')

        callback = pattern1.findall(self.path)  #節點回調
        command = pattern2.findall(self.path)   #外部呼叫

        #收到callback
        if len(callback) == 1:
            status = callback[0][0]  #識別狀態
            who = callback[0][1]   #識別節點
            if status == 'SUCCESS':  
                print '%s : %s Completed!' % (who,status)
                self._writeheader('OK')
            else:
                print '%s : %s Error!' % (who,status)
                self._writeheader('OK') 
                
        #如果收到指令,就分配任務
        elif len(command) == 1:
            self._writeheader('OK')
            job = command[0][0]   #具體的task
            who = command[0][1]   #指定節點
            print 'Send to %s : %s' % (who,job)
            ret = self.send_work(self.path)  #分配task
            print 'send %s' % ret
            
        else:
            #路徑出錯
            print 'Path: %s Error!' % self.path
            self._writeheader('ERR')

class ThreadingHTTPServer(ThreadingMixIn,HTTPServer):
    pass

if __name__ == '__main__':
    serveraddr = ('127.0.0.1',8000)
    myCtrl = ThreadingHTTPServer(serveraddr,MyRequestHandler)
    myCtrl.serve_forever()

節點1

#coding=utf-8

'''
工作節點:
提供的介面:
<a target=_blank href="http://127.0.0.1:8001/1/print_job">http://127.0.0.1:8001/1/print_job</a>
最好的方式:
把整個業務邏輯做成執行緒類

'''

from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from SocketServer import ThreadingMixIn
import urllib
import re

CONTROLLER = "http://127.0.0.1:8000/"

class MyRequestHandler(BaseHTTPRequestHandler):
    
    '''身份驗證(是否為主控節點)'''
    def IDConfirm(self,path):
        req_addr = self.client_address
        if req_addr[0] != '127.0.0.1':
            return False
        else:
            return True

    '''請求響應'''
    def _writeheader(self,wtype):
        if wtype == 'OK':
            self.send_response(200)
        elif wtype == 'INVLID_HOST':  #授權失敗
            self.send_response(401)
        else:
            self.send_response(404)
        self.send_header('Content-type','text/html')
        self.end_headers()

    def getArguments(self,path):
        '''
        傳遞引數:/1/print|1,2,3  or  /1/print|
        '''
        if path.count('|') == 0:
            #無引數
            return []
        else:
            args = path[1:].split('/')[1].split('|')[1].split(',')
            return args


    '''解析請求引數'''
    def parseRequests(self,request):
        #/1/print_job|1,2,3
        info = request[1:].split('/')
        node = info[0]  #識別節點 
        job = info[1].split('|')[0]   #呼叫節點的函式
        args = self.getArguments(self.path)  #獲取引數列表
        return node,job,args

    '''暴露的服務'''
    def print_job(self,name='NoneArgs'):
        print 'Node_ONE: receive jobs from controller,the args is %s' % name
        return 'SUCCESS'

    '''GET回撥'''
    def do_GET(self):
        #授權失敗,拒絕服務 
        if not self.IDConfirm(self.path):
            self._writeheader('INVLID_HOST')
        #授權成功,提供服務
        else:
            print self.path
            '''解析請求並執行服務'''
            (node,job,args) = self.parseRequests(self.path)
            print (node,job,args)
            if job == 'print_job':  #判斷服務合法性
                self._writeheader('OK')
                print 'Print Job Working...'
                ret = self.print_job(args[0])
                urllib.urlopen('%s%s%s' % (CONTROLLER,ret,'/NODE_ONE'))
            elif job == 'xxx':
                pass
            else:
                self._writeheader('ERR')
                print 'Unknown Job...'
                urllib.urlopen('%s%s' % (CONTROLLER,"ERROR/NODE_ONE"))
        

class ThreadingHTTPServer(ThreadingMixIn,HTTPServer):
    pass

if __name__ == '__main__':
    serveraddr = ('127.0.0.1',8001)
    node1 = ThreadingHTTPServer(serveraddr,MyRequestHandler)
    node1.serve_forever()

節點2的程式碼和節點1的程式碼基本一致,除了埠不同,還是貼上去算了。

#coding=utf-8

'''
工作節點:
提供的介面:
http://127.0.0.1:8001/2/print_job
'''

from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from SocketServer import ThreadingMixIn
import urllib
import re

CONTROLLER = "http://127.0.0.1:8000/"

class MyRequestHandler(BaseHTTPRequestHandler):
    
    '''身份驗證(是否為主控節點)'''
    def IDConfirm(self,path):
        req_addr = self.client_address
        if req_addr[0] != '127.0.0.1':
            return False
        else:
            return True

    '''請求響應'''
    def _writeheader(self,wtype):
        if wtype == 'OK':
            self.send_response(200)
        elif wtype == 'INVLID_HOST':  #授權失敗
            self.send_response(401)
        else:
            self.send_response(404)
        self.send_header('Content-type','text/html')
        self.end_headers()

    def getArguments(self,path):
        '''
        傳遞引數:/1/print|1,2,3  or  /1/print|
        '''
        if path.count('|') == 0:
            #無引數
            return []
        else:
            args = path[1:].split('/')[1].split('|')[1].split(',')
            return args


    '''解析請求引數'''
    def parseRequests(self,request):
        #/1/print_job|1,2,3
        info = request[1:].split('/')
        node = info[0]  #識別節點 
        job = info[1].split('|')[0]   #呼叫節點的函式
        args = self.getArguments(self.path)  #獲取引數列表
        return node,job,args

    '''暴露的服務'''
    def print_job(self,name='NoneArgs'):
        print 'Node_TWO: receive jobs from controller,the args is %s' % name
        return 'SUCCESS'

    '''GET回撥'''
    def do_GET(self):
        #授權失敗,拒絕服務 
        if not self.IDConfirm(self.path):
            self._writeheader('INVLID_HOST')
        #授權成功,提供服務
        else:
            print self.path
            '''解析請求並執行服務'''
            (node,job,args) = self.parseRequests(self.path)
            print (node,job,args)
            if job == 'print_job':  #判斷服務合法性
                self._writeheader('OK')
                print 'Print Job Working...'
                ret = self.print_job(args[0])
                urllib.urlopen('%s%s%s' % (CONTROLLER,ret,'/NODE_ONE'))
            elif job == 'xxx':
                pass
            else:
                self._writeheader('ERR')
                print 'Unknown Job...'
                urllib.urlopen('%s%s' % (CONTROLLER,"ERROR/NODE_ONE"))
        

class ThreadingHTTPServer(ThreadingMixIn,HTTPServer):
    pass

if __name__ == '__main__':
    serveraddr = ('127.0.0.1',8002)
    node1 = ThreadingHTTPServer(serveraddr,MyRequestHandler)
    node1.serve_forever()

(四)

以上程式碼經過測試,直譯器順利執行,過程如下:

1)向主控端傳送呼叫資訊:

2)主控端接收訊息:

3)節點1執行: