1. 程式人生 > >閱讀sqlmap源代碼,編寫burpsuite插件--sqlmapapi(二)

閱讀sqlmap源代碼,編寫burpsuite插件--sqlmapapi(二)

模塊 ini IT pos back evel data active target

burpsuite插件編寫---sql injection

0x00 概要

在安全測試過程中,大部分人會使用burpsuite的scanner模塊進行測試,可以發現一些淺顯的漏洞:比如xss、sql injection、cf、xxe、Arbitrary file existence disclosure in Act、明文傳輸等。
說到sql injection,測試人員都會有一種想法:是否存在一款自動化工具,可以將某一網站的所有鏈接都嘗試一遍,盡可能地發現所有的sql injection。有了這種想法後,大家會去尋找解決方案,其中一種解決方案是編寫burpsuite插件。
本文承接上一篇文章:上文記錄了通過繼承IHttpListener接口編寫插件的方式,但該方式會影響測試效率。本文將介紹另一種方式(繼承IScannerCheck接口),並將sqlmapapi和burpsuite scanner模塊進行對比。

0x01 繼承IScannerCheck接口(方法2)

先上插件代碼:

> from burp import IBurpExtender
> from burp import IScannerCheck
> from java.io import PrintWriter
> import re
> import urllib
> import urllib2
> import time
> import json
> from threading import Thread
> import requests
> 
> 
> 
> 
> class BurpExtender(IBurpExtender, IScannerCheck):
> 
>     #
>     #implement IBurpExtender
>     #
>     def    registerExtenderCallbacks(self, callbacks):
>         # keep a reference to our callbacks object
>         self._callbacks = callbacks
> 
>         # set our extension name
>         callbacks.setExtensionName("fanyingjie")
> 
>         # obtain our output stream
>         self._stdout = PrintWriter(callbacks.getStdout(), True)
> 
>         self._helpers  = callbacks.getHelpers()
> 
>         # register ourselves as an
>         callbacks.registerScannerCheck(self)
> 
> 
>     def doActiveScan(self, baseRequestResponse, insertionPoint):
>         pass
>     def doPassiveScan(self, baseRequestResponse):
>         a=self._helpers.analyzeRequest(baseRequestResponse)
>         method=a.getMethod()
>         url=str(a.getUrl())
>         if(("?" in url) and (method=="GET")):
>             self._stdout.println("start")
>             t=AutoSqli(target=url,stdout=self._stdout,method=method)
>             t.run()
> 
>     def consolidateDuplicateIssues(self, existingIssue, newIssue):
>         pass
> 
>             
> 
> class AutoSqli(Thread):
>     def __init__(self,target,stdout,method):
>         self.server="http://192.168.159.134:8775"
>         self.taskid = ‘‘
>         self.target=target
>         self.method=method
>         self._stdout=stdout
>         self.start_time = time.time()
> 
>     def task_new(self):
>         self.taskid = json.loads(urllib2.urlopen(self.server + ‘/task/new‘).read())[‘taskid‘]
>         self._stdout.println(‘Created new task: ‘ + self.taskid )
>         if len(self.taskid) > 0:
>             return True
>         return False
> 
>     def task_delete(self):
>         if json.loads(urllib2.urlopen(self.server + ‘/task/‘ + self.taskid + ‘/delete‘).read())[‘success‘]:
>             self._stdout.println(‘[%s] Deleted task‘ % (self.taskid))
>             return True
>         return False
> 
> 
>     def scan_start(self):
>         headers = {‘Content-Type‘: ‘application/json‘}
>         payload = {‘url‘:self.target}
>         url = self.server + ‘/scan/‘ + self.taskid + ‘/start‘
>         #t = json.loads(requests.post(url, data=json.dumps(payload), headers=headers).text)
>         
>         req=urllib2.Request(url,data=json.dumps(payload),headers=headers)
>         t=json.loads(urllib2.urlopen(req).read())
>         self._stdout.println("start "+ self.taskid)
> 
>         if len(str(t[‘engineid‘])) > 0 and t[‘success‘]:
>             return True
>         return False
> 
>     def scan_status(self):
>         status = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/status‘).read())[‘status‘]
>         if status == ‘running‘:
>             return ‘running‘
>         if status == ‘terminated‘:
>             return ‘terminated‘
>         return "error"
> 
>     def scan_data(self):
>         
>         data = json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/data‘).read())[‘data‘]
>         if len(data) == 0:
>             self._stdout.println(‘not injection:\t‘ + self.target)
>             return False
>         else:
>             self._stdout.println(‘injection:\t‘ + self.target)
>             return True
> 
>     def scan_kill(self):
>         json.loads(rurllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/kill‘).read())[‘success‘]
>         self._stdout.println("%s kill")%(self.taskid)
> 
> 
>     def scan_stop(self):
>         json.loads(urllib2.urlopen(self.server + ‘/scan/‘ + self.taskid + ‘/stop‘).read())[‘success‘]
>         self._stdout.println("%s stop")%(self.taskid)
> 
>     def run(self):
>         try:
>             if not self.task_new():
>                 return False
>             if not self.scan_start():
>                 return False
>             while True:
>                 if self.scan_status() == ‘running‘:
>                     time.sleep(10)
>                 elif self.scan_status() == ‘terminated‘:
>                     break
>                 else:
>                     break
>                 #print self.target + ":\t" + str(time.time() - self.start_time)
>                 if time.time() - self.start_time > 500:
>                     self.scan_stop()
>                     self.scan_kill()
>                     break
>             self.scan_data()
>             #self.task_delete()
> 
>         except Exception as e:
>             pass
> 

使用burp scanner模塊和上面寫的插件對 http://172.16.173.136/sqli-labs/Less-8/?id=234 進行sql注入測試:burp scanner模塊可以測試出存在注入,但是插件無法測試出。
因此對插件進行改寫,修改sqlmapapi的默認配置:在插件中添加以下代碼(我只修改了level和risk)。

    def optionSet(self):
        """Raise sqlmap's ``level`` to 5 and ``risk`` to 3 for the current task.

        Posts the options as JSON to ``/option/<taskid>/set`` on the
        sqlmapapi server and prints a progress line.

        :return: True when the server reply carries a truthy ``success``.
        """
        headers = {'Content-Type': 'application/json'}
        payload = {"level": "5", "risk": "3"}
        url = self.server + '/option/' + self.taskid + '/set'
        req = urllib2.Request(url, data=json.dumps(payload), headers=headers)
        response = urllib2.urlopen(req)
        try:
            reply = json.loads(response.read())
        finally:
            # Release the connection even when the body is not valid JSON.
            response.close()
        self._stdout.println("set option " + self.taskid)
        return bool(reply.get('success'))

然後在新建掃描後,調用配置設置函數,修改後的代碼:

from burp import IBurpExtender
from burp import IScannerCheck
from java.io import PrintWriter
import re
import urllib
import urllib2
import time
import json
from threading import Thread
import requests

class BurpExtender(IBurpExtender, IScannerCheck):
    """Burp extension that submits GET URLs carrying a query string to sqlmapapi.

    The extension registers itself as a scanner check. Only the passive
    hook does any work: it builds an AutoSqli job for the request URL and
    runs it. No Burp issues are ever reported; results are written to the
    extension's output stream.
    """

    #
    # implement IBurpExtender
    #
    def registerExtenderCallbacks(self, callbacks):
        """Store the Burp callbacks/helpers and register this scanner check.

        :param callbacks: the IBurpExtenderCallbacks object supplied by Burp.
        """
        # keep a reference to our callbacks object
        self._callbacks = callbacks

        # set our extension name
        callbacks.setExtensionName("fanyingjie")

        # obtain our output stream (second argument enables auto-flush)
        self._stdout = PrintWriter(callbacks.getStdout(), True)

        self._helpers = callbacks.getHelpers()

        # register ourselves as a custom scanner check
        callbacks.registerScannerCheck(self)

    #
    # implement IScannerCheck
    #
    def doActiveScan(self, baseRequestResponse, insertionPoint):
        """No active checks are implemented; report no issues."""
        return None

    def doPassiveScan(self, baseRequestResponse):
        """Run a sqlmapapi scan for GET requests whose URL contains '?'.

        :param baseRequestResponse: the request/response pair handed over
            by Burp.
        :return: always None, i.e. no Burp issue is reported.
        """
        request_info = self._helpers.analyzeRequest(baseRequestResponse)
        method = request_info.getMethod()
        url = str(request_info.getUrl())
        if "?" in url and method == "GET":
            self._stdout.println("start")
            scan = AutoSqli(target=url, stdout=self._stdout, method=method)
            # run() is called directly (not start()), so the scan executes
            # synchronously in the calling thread and this method only
            # returns once the scan has finished or timed out.
            scan.run()
        return None

    def consolidateDuplicateIssues(self, existingIssue, newIssue):
        """Tell Burp how to treat two issues reported for the same URL.

        Per the Burp Extender API this hook must return an int: -1 keeps
        only the existing issue, 0 keeps both, 1 keeps only the new one.
        The original body returned None, which is not a valid answer.

        :return: -1 when both issues carry the same name, otherwise 0.
        """
        if existingIssue.getIssueName() == newIssue.getIssueName():
            return -1
        return 0


class AutoSqli(Thread):
    """Drive one sqlmap scan of a single URL through the sqlmapapi REST server.

    The job creates a task, raises sqlmap's level/risk options, starts a
    scan of ``target``, polls the scan status until it stops running or the
    timeout expires, and finally prints whether sqlmap returned any data.
    All progress messages go to ``stdout.println``.
    """

    # Seconds to wait between two status polls while the scan is running.
    POLL_INTERVAL = 10
    # Seconds, counted from construction, after which a running scan is
    # stopped and killed.
    TIMEOUT = 500

    def __init__(self, target, stdout, method, server=None):
        """Prepare a scan job.

        :param target: URL to hand to sqlmap.
        :param stdout: object exposing ``println(text)`` for progress output.
        :param method: HTTP method of the original request (stored only).
        :param server: base URL of the sqlmapapi server; defaults to the
            address used in the article.
        """
        # Initialise the Thread machinery so that start() works, not only
        # a direct call to run().
        Thread.__init__(self)
        self.server = server or "http://172.16.173.136:8775"
        self.taskid = ''
        self.target = target
        self.method = method
        self._stdout = stdout
        # The timeout clock starts at construction, not when run() begins.
        self.start_time = time.time()

    def _request_json(self, path, payload=None):
        """Call the sqlmapapi endpoint ``path`` and return the decoded JSON.

        A GET is sent when ``payload`` is None; otherwise ``payload`` is
        serialised and sent as a JSON POST body.
        """
        url = self.server + path
        if payload is None:
            response = urllib2.urlopen(url)
        else:
            request = urllib2.Request(
                url,
                data=json.dumps(payload),
                headers={'Content-Type': 'application/json'})
            response = urllib2.urlopen(request)
        try:
            return json.loads(response.read())
        finally:
            # Release the connection even when the body is not valid JSON.
            response.close()

    def task_new(self):
        """Create a task, remember its id; return True if an id was given."""
        self.taskid = self._request_json('/task/new')['taskid']
        self._stdout.println('Created new task: ' + self.taskid)
        return len(self.taskid) > 0

    def task_delete(self):
        """Delete the current task; return True when the server confirms."""
        if self._request_json('/task/' + self.taskid + '/delete')['success']:
            self._stdout.println('[%s] Deleted task' % (self.taskid))
            return True
        return False

    def optionSet(self):
        """Set sqlmap ``level`` to 5 and ``risk`` to 3 for the current task.

        :return: True when the server reply carries a truthy ``success``.
        """
        reply = self._request_json(
            '/option/' + self.taskid + '/set', {"level": "5", "risk": "3"})
        self._stdout.println("set option " + self.taskid)
        return bool(reply.get('success'))

    def scan_start(self):
        """Start scanning ``self.target``; return True when the scan started."""
        reply = self._request_json(
            '/scan/' + self.taskid + '/start', {'url': self.target})
        self._stdout.println("start " + self.taskid)
        if len(str(reply['engineid'])) > 0 and reply['success']:
            return True
        return False

    def scan_status(self):
        """Return 'running', 'terminated', or 'error' for any other status."""
        status = self._request_json(
            '/scan/' + self.taskid + '/status')['status']
        if status in ('running', 'terminated'):
            return status
        return "error"

    def scan_data(self):
        """Print the verdict; return True when sqlmap returned any data."""
        data = self._request_json('/scan/' + self.taskid + '/data')['data']
        if len(data) == 0:
            self._stdout.println('not injection:\t' + self.target)
            return False
        self._stdout.println('injection:\t' + self.target)
        return True

    def scan_kill(self):
        """Kill the running scan and return the server's success flag."""
        # Fixed: the module name was misspelt and the %-format was applied
        # to println()'s return value instead of to the message.
        reply = self._request_json('/scan/' + self.taskid + '/kill')
        self._stdout.println("%s kill" % (self.taskid))
        return bool(reply['success'])

    def scan_stop(self):
        """Stop the running scan and return the server's success flag."""
        reply = self._request_json('/scan/' + self.taskid + '/stop')
        self._stdout.println("%s stop" % (self.taskid))
        return bool(reply['success'])

    def run(self):
        """Execute the whole scan workflow.

        :return: True when sqlmap returned data for the target, False
            otherwise (including any failure while talking to the server).
        """
        try:
            if not self.task_new():
                return False
            # Options are best effort: a refused option change is reported
            # but does not abort the scan.
            if not self.optionSet():
                self._stdout.println("set option failed " + self.taskid)
            if not self.scan_start():
                return False
            while True:
                # Query the status once per iteration; anything other
                # than 'running' ends the wait.
                if self.scan_status() != 'running':
                    break
                time.sleep(self.POLL_INTERVAL)
                if time.time() - self.start_time > self.TIMEOUT:
                    self.scan_stop()
                    self.scan_kill()
                    break
            return self.scan_data()
        except Exception as e:
            # Best effort: never propagate into the caller, but report the
            # failure instead of hiding it.
            self._stdout.println(
                "scan failed for %s: %s" % (self.target, e))
            return False

再次對上面的代碼進行測試,可以測試出存在注入。
插件的輸出:

start
Created new task: 42f685742c94a985
set option 42f685742c94a985
start 42f685742c94a985
injection: http://172.16.173.136:80/sqli-labs/Less-8/?id=234


比較sqlmapapi和burpsuite scanner模塊:
通過修改sqlmapapi的默認設置可以提高注入檢測的準確率,但是修改默認配置後,sqlmapapi不見得比burpsuite scanner模塊更厲害;而且sqlmapapi只能檢測get請求類型,且不能檢測登錄後的get請求。因此看來,在安全測試過程中,如果目的只是發現網站是否存在sql注入,還是使用burpsuite scanner模塊比較好用;在知道存在sql注入後,可以使用sqlmap來進行後續的操作。

閱讀sqlmap源代碼,編寫burpsuite插件--sqlmapapi(二)