1. 程式人生 > >介面自動化指令碼實現(蟲師-Django介面測試例項)

介面自動化指令碼實現(蟲師-Django介面測試例項)

一、編寫指令碼前分析專案架構

需求:python指令碼實現【添加發佈會資訊】的介面測試,以郵件形式傳送測試報告

1.新建一個case目錄,存放測試用例

2.新建一個config目錄,存放配置資訊和讀取配置資訊

3.新建一個db_fixture目錄,初始化資料

4.新建report目錄,用來存放生成的測試報告

5.run_main.py檔案,執行介面測試指令碼

(一般還會新建一個common目錄,存放一些公共方法,例如讀取excel方法等)

 

二、編寫指令碼

1.config部分

(1)db_config.ini 資料庫配置資訊(此處需注意,這裡的資料庫需要與實際專案的資料庫一致)

# coding:utf-8
# 資料庫配置資訊

[mysqlconf]
host = 47.xx.xxx.xx
port = 3306
user = root
password = 123456
db = guest

 

(2) readDbConfig.py 讀取資料庫資訊,封裝清除資料和插入資料的方法,以便進行初始化資料的操作

#coding:utf-8

from pymysql import connect,cursors
from pymysql.err import OperationalError
import os
import configparser

# ================讀取db_config.ini檔案設定=================


cur_path = os.path.dirname(os.path.realpath(__file__))
configPath = os.path.join(cur_path,"db_config.ini")
cf = configparser.ConfigParser()
cf.read(configPath,encoding='UTF-8')

host = cf.get("mysqlconf","host")
port = cf.get("mysqlconf","port")
db = cf.get("mysqlconf","db")
user = cf.get("mysqlconf","user")
password = cf.get("mysqlconf","password")


# ================封裝MySQL基本操作=================
class DB:
    def __init__(self):
        try:
            # 連線資料庫
            self.conn = connect(
                host = host,
                user = user,
                password = password,
                db = db,
                charset = "utf8mb4",
                cursorclass = cursors.DictCursor
                )
        except OperationalError as e:
            print("Mysql Error %d:%s"%(e.args[0],e.args[1]))

    # 清除表資料
    def clear(self,tabel_name):
        real_sql = "delete from "+ tabel_name +";"
        with self.conn.cursor() as cursor:
            cursor.execute("SET FOREIGN_KEY_CHECKS = 0;")
            cursor.execute(real_sql)
        self.conn.commit()

    # 插入表資料
    def insert(self,table_name,table_data):
        for key in table_data:
            table_data[key] = "'"+str(table_data[key])+"'"
        key = ','.join(table_data.keys())
        value = ",".join(table_data.values())
        real_sql = "INSERT INTO " + table_name + " (" + key + ") VALUES (" + value +")" + ";"
        print(real_sql)
        with self.conn.cursor() as cursor:
            cursor.execute(real_sql)
        self.conn.commit()

    # 關閉資料庫連線
    def close(self):
        self.conn.close()

if __name__ == '__main__':
    db = DB()
    table_name = "sign_event"
    data = {"id":12,
            "name":"紅米",
            "limit2":2000,
            "status":1,
            "address":"北京會展中心",
            "start_time":"2018-10-30 08:00:00",
            }
    db.clear(table_name)
    db.insert(table_name,data)
    db.close()

 

(3) email_config.ini 郵箱資訊

# coding:utf-8
# 郵箱配置資訊

[email]
smtp_server = smtp.qq.com
port = 465
sender = [email protected]
psw = xxxxxxxx
receiver = [email protected]

 

(4) readEmailConfig.py 讀取郵件資訊

#coding:utf-8

# ================讀取cfg.ini檔案設定=================

import os
import configparser

# os.path.realpath(__file__):返回當前檔案的絕對路徑
# os.path.dirname(): 返回()所在目錄
cur_path = os.path.dirname(os.path.realpath(__file__)) # 當前檔案的所在目錄
configPath = os.path.join(cur_path,"email_config.ini") # 路徑拼接:/config/email_config.ini
conf = configparser.ConfigParser()
conf.read(configPath,encoding='UTF-8') # 讀取/config/email_config.ini 的內容

# get(section,option) 得到section中option的值,返回為string型別
smtp_server = conf.get("email","smtp_server")
sender = conf.get("email","sender")
psw = conf.get("email","psw")
receiver = conf.get("email","receiver")
port = conf.get("email","port")

2.db_fixture部分

(1)test_data.py 新增測試資料

# coding:utf-8

from config.readDbConfig import DB
# 建立測試資料
datas = {
    # 釋出會表資料
    "sign_event":[
        {"id":1,"name":"紅米Pro釋出會","limit2":2000,"status":1,"address":"北京會展中心","start_time":"2018-10-31 08:00:00"},
        {"id":2,"name":"可參加人數為0","limit2":0,"status":1,"address":"北京會展中心","start_time":"2018-10-31 08:00:00"},
        {"id":3,"name":"當前狀態為0關閉","limit2":2000,"status":0,"address":"北京會展中心","start_time":"2018-10-31 08:00:00"},
        {"id":4,"name":"釋出會已結束","limit2":2000,"status":1,"address":"北京會展中心","start_time":"2017-10-31 08:00:00"},
        {"id":5,"name":"小米5釋出會","limit2":2000,"status":1,"address":"北京會展中心","start_time":"2018-10-31 08:00:00"}
    ]
}

# 將測試資料插入表
def init_data():
    db = DB()
    for table,data in datas.items():
        db.clear(table)
        for d in data:
            db.insert(table,d)
    db.close()

if __name__ == '__main__':
    init_data()

 

3.case部分

(1)test_add_event.py 測試用例

# coding:utf-8
import unittest
import requests
import os,sys

cur_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0,cur_path)
from db_fixture import test_data

class TestAddEvent(unittest.TestCase):
    """添加發佈會"""
    def setUp(self):
        self.base_url = "http://47.xx.xxx.xx:8000/api/add_event/"

    def tearDown(self):
        print("")

    def test_add_event_all_null(self):
        """所有引數為空新增"""
        payload = {"eid":"","name":"","limit2":"","address":"","start_time":""}
        r = requests.post(self.base_url,data=payload)
        self.result = r.json()
        self.assertEqual(self.result["status"],10021)
        self.assertEqual(self.result["message"],"parameter error")

    def test_add_event_eid_exist(self):
        """id已經存在"""
        payload = {"eid":1,"name":"一加4釋出會","limit2":2000,"address":"深圳寶體","start_time":"2018-11-01 08:00:00"}
        r = requests.post(self.base_url, data=payload)
        self.result = r.json()
        self.assertEqual(self.result["status"], 10022)
        self.assertEqual(self.result["message"], "event id already exists")

    def test_add_event_name_exists(self):
        """名稱已經存在"""
        payload = {"eid": 88, "name": "紅米Pro釋出會", "limit2": 2000, "address": "深圳寶體", "start_time": "2018-11-01 08:00:00"}
        r = requests.post(self.base_url, data=payload)
        self.result = r.json()
        self.assertEqual(self.result["status"], 10023)
        self.assertEqual(self.result["message"], "event name already exists")

    def test_add_event_data_type_error(self):
        """日期格式錯誤"""
        payload = {"eid": 102, "name": "一加4釋出會", "limit2": 2000, "address": "深圳寶體", "start_time": "2018"}
        r = requests.post(self.base_url, data=payload)
        self.result = r.json()
        self.assertEqual(self.result["status"], 10024)


    def test_add_event_success(self):
        """新增成功"""
        payload = {"eid": 88, "name": "孫小二釋出會", "limit2": 2000, "address": "深圳寶體", "start_time": "2018-11-01 08:00:00"}
        r = requests.post(self.base_url, data=payload)
        self.result = r.json()
        print(self.result)
        self.assertEqual(self.result["status"], 200)


if __name__ == '__main__':
    test_data.init_data() # 初始化介面測試資料
    unittest.main()

 

4.run_main.py 執行指令碼

# coding:utf-8
import os
import unittest
import time
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from db_fixture import test_data
import HTMLTestRunnerCN_test
from config import readEmailConfig

# 當前指令碼所在檔案真實路徑
cur_path = os.path.dirname(os.path.realpath(__file__))

def add_case(caseName="case",rule="test_*.py"):
    """第一步:載入所有測試用例"""
    case_path = os.path.join(cur_path,caseName) # 用例資料夾
    # 資料夾不存在就建立一個資料夾
    if not os.path.exists(case_path):os.mkdir(case_path)

    # 定義discover載入所有測試用例
    # case_path:執行用例的目錄;pattern:匹配指令碼名稱的規則;top_level_dir:預設為None
    discover = unittest.defaultTestLoader.discover(case_path,pattern=rule,top_level_dir=None)
    return discover

def run_case(all_case,reportName="report"):
    """第二步:執行所有的用例,並把結果寫入到html測試報告中"""
    now = time.strftime("%Y_%m_%d_%H_%M_%S")
    report_path = os.path.join(cur_path,reportName)
    if not os.path.exists(report_path):os.mkdir(report_path)
    report_abspath = os.path.join(report_path,now+"result.html")
    print("report path:%s"%report_abspath)

    fp = open(report_abspath,"wb")
    runner = HTMLTestRunnerCN_test.HTMLTestRunner(stream=fp,title="自動化介面測試報告,測試結果如下:",
                                           description="用例執行情況")
    # 呼叫add_case函式
    runner.run(all_case)
    fp.close()


def get_report_file(report_path):
    """第三步:獲取最新的測試報告"""
    lists = os.listdir(report_path)
    lists.sort(key=lambda fn:os.path.getmtime(os.path.join(report_path,fn)))
    print(u"最新測試生成的報告:"+lists[-1])
    # 找到生成最新的報告檔案
    report_file = os.path.join(report_path,lists[-1])
    return report_file

def send_mail(sender,psw,receiver,smtpserver,report_file,port):
    """第四步:傳送最新的測試報告內容"""
    with open(report_file,"rb") as f:
        mail_body = f.read()

    # 定義郵件內容
    msg = MIMEMultipart()
    body = MIMEText(mail_body,_subtype="html",_charset="utf-8")
    msg["Subject"] = "自動化測試報告"
    msg["from"] = sender
    msg["to"] = receiver
    msg.attach(body)

    # 新增附件
    att = MIMEText(open(report_file,"rb").read(),"base64","utf-8")
    att["Content-Type"] = "application/octet-stream"
    att["Content-Disposition"] = "attachment;filename = 'report.html'"
    msg.attach(att)
    try:
        smtp = smtplib.SMTP_SSL(smtpserver,port)
    except:
        smtp = smtplib.SMTP()
        smtp.connect(smtpserver,port)

    # 使用者名稱密碼
    smtp.login(sender,psw)
    smtp.sendmail(sender,receiver,msg.as_string())
    smtp.quit()
    print("test report email has send out")

if __name__ == '__main__':

    test_data.init_data() # 初始化介面測試資料

    all_case = add_case() # 載入用例
    run_case(all_case)  # 執行用例

    report_path = os.path.join(cur_path,"report")
    report_file = get_report_file(report_path)

    # 郵箱配置,郵箱資訊獲取
    sender = readEmailConfig.sender
    psw = readEmailConfig.psw
    smtp_server = readEmailConfig.smtp_server
    port = readEmailConfig.port
    receiver = readEmailConfig.receiver
    send_mail(sender,psw,receiver,smtp_server,report_file,port) # 呼叫傳送郵件方法

三、執行指令碼

1.執行run_main.py 指令碼會在report目錄生成一個html檔案

2.用例全部執行通過