1. 程式人生 > >介面自動化測試方案詳解

介面自動化測試方案詳解

#!/usr/bin/env python
#coding=utf8

# Todo:介面自動化測試
# Author:歸根落葉
# Blog:http://this.ispenn.com

import json
import http.client,mimetypes
from urllib.parse import urlencode
import random
import time
import re
import logging
import os,sys
# Third-party dependencies are installed on demand the first time the script runs.
try:
    import xlrd
except ImportError:
    # xlrd 2.x only reads legacy .xls files; the test cases live in an .xlsx
    # workbook, so pin the last release that still supports that format.
    os.system('pip install xlrd==1.2.0')
    import xlrd
try:
    from pyDes import *
except ImportError:
    # The old --allow-external / --allow-unverified switches are no longer
    # accepted by pip and make the install command fail outright.
    os.system('pip install -U pyDes')
    from pyDes import *
import hashlib
import base64
import smtplib  
from email.mime.text import MIMEText 

# Log everything (DEBUG and up) to ./log/liveappapi.log, truncated on each run,
# and mirror the same records to the console.
log_file = os.path.join(os.getcwd(),'log/liveappapi.log')
log_format = '[%(asctime)s] [%(levelname)s] %(message)s'
# The file handler does not create missing directories, so make sure ./log exists.
os.makedirs(os.path.dirname(log_file), exist_ok=True)
logging.basicConfig(format=log_format,filename=log_file,filemode='w',level=logging.DEBUG)
console = logging.StreamHandler()
console.setLevel(logging.DEBUG)
formatter = logging.Formatter(log_format)
console.setFormatter(formatter)
logging.getLogger('').addHandler(console)

#獲取並執行測試用例
def runTest(testCaseFile):
    """Run every enabled test case from the first sheet of an Excel workbook.

    Row 0 is skipped. For each following row the columns are read as:
    0 case number, 1 purpose, 2 API host, 3 request URL, 4 HTTP method,
    5 request data type ('Form', 'Data' or 'File'), 6 request data (inline
    text or a path to a file), 7 encryption ('MD5', 'DES' or anything else
    for none), 8 check point (regular expression), 9 correlation rules
    ('name=[key][idx]...' separated by ';'), 10 active flag (only rows
    whose flag is exactly 'Yes' are executed).

    Placeholders such as '${session}' found in the URL or request data are
    replaced with the current values of the correlation dictionary; values
    extracted from a successful response are stored back into it for use by
    later rows.

    Args:
        testCaseFile: workbook path, relative to the current working directory.

    Returns:
        A list of (case label, status as string, full URL, response) tuples,
        one for every case whose status was not 200.

    Raises:
        SystemExit: if the workbook does not exist.
    """
    testCaseFile = os.path.join(os.getcwd(), testCaseFile)
    if not os.path.exists(testCaseFile):
        logging.error('測試用例檔案不存在!!!')
        # Non-zero exit status so schedulers can tell the run did not happen.
        sys.exit(1)
    table = xlrd.open_workbook(testCaseFile).sheet_by_index(0)
    errorCase = []
    correlationDict = {
        '${hashPassword}': hash1Encode('123456'),
        '${session}': None,
    }
    for i in range(1, table.nrows):
        # Fresh random/time based values for every row.
        correlationDict['${randomEmail}'] = ''.join(random.sample('abcdefghijklmnopqrstuvwxyz',6)) + '@automation.test'
        correlationDict['${randomTel}'] = '186' + str(random.randint(10000000,99999999))
        correlationDict['${timestamp}'] = int(time.time())
        if _cellText(table, i, 10) != 'Yes':
            continue
        num = str(int(table.cell(i,0).value)).replace('\n','').replace('\r','')
        api_purpose = _cellText(table, i, 1)
        api_host = _cellText(table, i, 2)
        request_url = _applyCorrelation(_cellText(table, i, 3), correlationDict)
        request_method = _cellText(table, i, 4)
        request_data_type = _cellText(table, i, 5)
        request_data = _cellText(table, i, 6)
        encryption = _cellText(table, i, 7)
        check_point = table.cell(i,8).value
        correlation = _cellText(table, i, 9).split(';')

        if request_data_type == 'Form':
            request_data = _applyCorrelation(_loadRequestData(request_data), correlationDict)
            try:
                if encryption == 'MD5':
                    request_data = json.loads(request_data)
                    status,md5 = getMD5(api_host,urlencode(request_data).replace("%27","%22"))
                    if status != 200:
                        logging.error(num + ' ' + api_purpose + "[ " + str(status) + " ], 獲取md5驗證碼失敗!!!")
                        continue
                    request_data = dict(request_data,**{"sign":md5.decode("utf-8")})
                    request_data = urlencode(request_data).replace("%27","%22")
                elif encryption == 'DES':
                    request_data = json.loads(request_data)
                    request_data = urlencode({'param':encodePostStr(request_data)})
                else:
                    request_data = urlencode(json.loads(request_data))
            except Exception:
                # Any failure while preparing the form is reported as a data error
                # and the case is skipped, so one bad row cannot abort the run.
                logging.error(num + ' ' + api_purpose + ' 請求的資料有誤,請檢查[Request Data]欄位是否是標準的json格式字串!')
                continue
        elif request_data_type == 'Data':
            request_data = _applyCorrelation(_loadRequestData(request_data), correlationDict)
            request_data = request_data.encode('utf-8')
        elif request_data_type == 'File':
            if not os.path.exists(request_data):
                logging.error(num + ' ' + api_purpose + ' 檔案路徑配置無效,請檢查[Request Data]欄位配置的檔案路徑是否存在!!!')
                continue
            request_data = _buildUploadBody(request_data)

        status,resp = interfaceTest(num,api_purpose,api_host,request_url,request_data,check_point,request_method,request_data_type,correlationDict['${session}'])
        if status != 200:
            errorCase.append((num + ' ' + api_purpose,str(status),'http://'+api_host+request_url,resp))
            continue

        for rule in correlation:
            param = rule.split('=')
            if len(param) != 2:
                continue
            name, path = param
            if not (path.startswith('[') and path.endswith(']')):
                logging.error(num + ' ' + api_purpose + ' 關聯引數設定有誤,請檢查[Correlation]欄位引數格式是否正確!!!')
                continue
            found, value = _resolveCorrelationPath(resp, path)
            if not found:
                # Do not store a half-resolved value; keep whatever was there before.
                logging.error(num + ' ' + api_purpose + ' correlation path ' + path + ' not found in response, ' + name + ' left unchanged')
                continue
            correlationDict[name] = value
    return errorCase


def _cellText(table, row, col):
    """Return the value of a text cell with all CR/LF characters removed."""
    return table.cell(row, col).value.replace('\n','').replace('\r','')


def _applyCorrelation(text, correlationDict):
    """Replace every placeholder key of correlationDict found in text, at any position."""
    for placeholder, value in correlationDict.items():
        text = text.replace(placeholder, str(value))
    return text


def _loadRequestData(request_data):
    """Return the first line of the file named by request_data, or request_data itself if no such file exists."""
    if os.path.exists(request_data):
        with open(request_data, encoding='utf-8') as dataFile:
            return dataFile.readline()
    return request_data


def _buildUploadBody(dataFile):
    """Build a multipart/form-data body (bytes) carrying dataFile as the form field 'file'."""
    # Must stay in sync with the boundary announced in the request's Content-Type header.
    boundary = '----WebKitFormBoundaryDf9uRfwb8uzv1eNe'
    with open(dataFile, 'rb') as upload:
        payload = upload.read()
    content_type = mimetypes.guess_type(dataFile)[0] or 'application/octet-stream'
    # Part headers are text; the file content is appended as raw bytes so it is
    # transmitted unchanged. Lines are separated by CRLF as multipart requires.
    head = '\r\n'.join([
        '--' + boundary,
        'Content-Disposition: form-data; name="file"; filename="%s"' % os.path.basename(dataFile),
        'Content-Type: ' + content_type,
        'Content-Transfer-Encoding: binary',
        '',
        '',
    ])
    tail = '\r\n--' + boundary + '--\r\n'
    return head.encode('utf-8') + payload + tail.encode('utf-8')


def _resolveCorrelationPath(resp, path):
    """Walk a '[a][0][b]' style path through resp; return (found, value)."""
    value = resp
    for key in path[1:-1].split(']['):
        try:
            # Prefer a numeric index, fall back to using the segment as a key.
            value = value[int(key)]
        except (ValueError, KeyError, IndexError, TypeError):
            try:
                value = value[key]
            except (KeyError, IndexError, TypeError):
                return False, None
    return True, value

# 介面測試
def interfaceTest(num,api_purpose,api_host,request_url,request_data,check_point,request_method,request_data_type,session):
    """Send one HTTP request and check the response against a check point.

    Args:
        num: case number, used only in log messages.
        api_purpose: case description, used only in log messages.
        api_host: host (optionally host:port) to connect to over plain HTTP.
        request_url: request path.
        request_data: request body for POST, or query string for GET
            (str or bytes).
        check_point: regular expression searched for in the response text.
        request_method: 'POST' or 'GET'.
        request_data_type: 'File' and 'Data' select a matching Content-Type;
            anything else is sent as a URL-encoded form.
        session: value for the 'session' cookie, or None to send no cookie.

    Returns:
        (200, parsed JSON) when the status is 200 and the check point matches
        (the raw text is returned instead if the body is not valid JSON);
        (2001, response text) when the status is 200 but the check point does
        not match; (status, response text) for any other HTTP status;
        (400, request_method) when the method is neither POST nor GET.
    """
    headers = {'Content-Type':'application/x-www-form-urlencoded; charset=UTF-8',
               'X-Requested-With':'XMLHttpRequest',
               'Connection':'keep-alive',
               'Referer':'http://' + api_host,
               'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.134 Safari/537.36'}
    if session is not None:
        # Correlated values come out of parsed JSON and may not be strings.
        headers['Cookie'] = 'session=' + str(session)
    # The body type determines the Content-Type whether or not a session exists.
    if request_data_type == 'File':
        headers['Content-Type'] = 'multipart/form-data;boundary=----WebKitFormBoundaryDf9uRfwb8uzv1eNe;charset=UTF-8'
    elif request_data_type == 'Data':
        headers['Content-Type'] = 'text/plain; charset=UTF-8'

    if request_method not in ('POST', 'GET'):
        logging.error(num + ' ' + api_purpose + ' HTTP請求方法錯誤,請確認[Request Method]欄位是否正確!!!')
        return 400,request_method

    conn = http.client.HTTPConnection(api_host)
    try:
        if request_method == 'POST':
            conn.request('POST',request_url,request_data,headers=headers)
        else:
            # 'Data' bodies arrive as bytes; a query string has to be text.
            query = request_data.decode('utf-8') if isinstance(request_data, bytes) else request_data
            target = request_url
            if query:
                target += ('&' if '?' in request_url else '?') + query
            conn.request('GET',target,headers=headers)
        response = conn.getresponse()
        status = response.status
        resp = response.read()
    finally:
        # Always release the socket, including when the request fails.
        conn.close()

    text = resp.decode('utf-8', errors='replace')
    if status != 200:
        logging.error(num + ' ' + api_purpose + ' 失敗!!!, [ ' + str(status) + ' ], ' + text)
        return status,text
    if not re.search(check_point,text):
        logging.error(num + ' ' + api_purpose + ' 失敗!!!, [ ' + str(status) + ' ], ' + text)
        return 2001,text
    logging.info(num + ' ' + api_purpose + ' 成功, ' + str(status) + ', ' + text)
    try:
        return status,json.loads(text)
    except ValueError:
        # The check point matched but the body is not JSON: still a pass,
        # hand back the raw text instead of aborting the whole run.
        return status,text

#獲取md5驗證碼
def getMD5(url,postData):
    """Request an MD5 signature for postData from the signing service.

    Args:
        url: accepted for interface compatibility but not used; the request
            always goes to the fixed signing host below.
        postData: URL-encoded form body to be signed.

    Returns:
        (HTTP status, raw response body as bytes).
    """
    headers = {'Content-Type':'application/x-www-form-urlencoded; charset=UTF-8',
               'X-Requested-With':'XMLHttpRequest'}
    conn = http.client.HTTPConnection('this.ismyhost.com')
    try:
        conn.request('POST','/get_isignature',postData,headers=headers)
        response = conn.getresponse()
        # Read the body before the connection is closed.
        return response.status,response.read()
    finally:
        conn.close()

# SHA-1 hex digest (a one-way hash, not encryption)
def hash1Encode(codeStr):
    """Return the SHA-1 digest of codeStr (encoded as UTF-8) as a hex string."""
    raw = codeStr.encode('utf-8')
    return hashlib.sha1(raw).hexdigest()

# DES加密
def desEncode(desStr):
    """Serialize desStr to JSON, DES-encrypt it and return the result Base64-encoded.

    Args:
        desStr: any JSON-serializable object.

    Returns:
        The Base64 encoding of the ciphertext, as produced by base64.b64encode.
    """
    # NOTE(review): 'secretKEY' looks like a placeholder for the real key;
    # confirm its length is accepted by pyDes before relying on this.
    cipher = des('secretKEY', padmode=PAD_PKCS5)
    plaintext = json.dumps(desStr)
    ciphertext = cipher.encrypt(plaintext)
    return base64.b64encode(ciphertext)

# Sign the post data with a SHA-1 token over its sorted fields, then DES-encrypt it
def encodePostStr(postData):
    """Add a SHA-1 'token' to the post data and return it DES-encrypted.

    The token is the SHA-1 hex digest of 'k1=v1&k2=v2...' built from the
    fields of postData plus the shared 'key' entry, sorted by field name.

    Args:
        postData: dict of request fields with string keys. It is not modified.

    Returns:
        The result of desEncode() applied to postData extended with 'token'.
    """
    signedFields = dict(postData, key='secretKEY')
    # Values parsed from JSON may be numbers or booleans; convert them so the
    # signature string can be built.
    # NOTE(review): confirm the server formats non-string values the same way.
    postStr = '&'.join(name + '=' + str(value) for name, value in sorted(signedFields.items()))
    token = hashlib.sha1(postStr.encode('utf-8')).hexdigest()
    # Work on a copy so the caller's dict is left untouched.
    payload = dict(postData, token=token)
    return desEncode(payload)

#傳送通知郵件
def sendMail(text):
    """Send the HTML report to the configured recipients via SMTP.

    Args:
        text: HTML document used as the message body.

    Raises:
        smtplib.SMTPException: if login or delivery fails.
        OSError: if the SMTP server cannot be reached.
    """
    # NOTE: the addresses and password below are redacted placeholders and
    # must be replaced with real values before enabling mail delivery.
    sender = '[email protected]'
    receiver = ['[email protected]']
    mailToCc = ['[email protected]']
    subject = '[AutomantionTest]介面自動化測試報告通知'
    smtpserver = 'smtp.exmail.qq.com'
    username = '[email protected]'
    password = 'password'

    msg = MIMEText(text,'html','utf-8')
    msg['Subject'] = subject
    msg['From'] = sender
    # Address lists in mail headers are comma-separated (RFC 5322).
    msg['To'] = ', '.join(receiver)
    msg['Cc'] = ', '.join(mailToCc)
    # The context manager ends the SMTP session even if login or sending fails.
    with smtplib.SMTP(smtpserver) as smtp:
        smtp.login(username, password)
        smtp.sendmail(sender, receiver + mailToCc, msg.as_string())

def main():
    """Run the pre-release test cases and build an HTML report of the failures."""
    errorTest = runTest('TestCase/TestCasePre.xlsx')
    if not errorTest:
        return
    header = '<html><body>介面自動化定期掃描,共有 ' + str(len(errorTest)) + ' 個異常介面,列表如下:' + '<table><tr><th style="width:100px;">介面</th><th style="width:50px;">狀態</th><th style="width:200px;">介面地址</th><th>介面返回值</th></tr>'
    # One table row per failed case: label, status, URL, response.
    rows = ''.join(
        '<tr><td>' + case[0] + '</td><td>' + case[1] + '</td><td>' + case[2] + '</td><td>' + case[3] + '</td></tr>'
        for case in errorTest
    )
    html = header + rows + '</table></body></html>'
    # Mail delivery is currently disabled; the report is built but not sent.
    #sendMail(html)

# Run the test suite only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()