1. 程式人生 > >介面返回資料與資料資料的對比、資料表之間的拆分重組對比

介面返回資料與資料資料的對比、資料表之間的拆分重組對比

import pymysql
import datetime
import json
import decimal
import requests
import warnings

"""
超市供應鏈資料測試:1、product表和supplier表拆分重組與supply_introduce表對比
                    2、介面返回資料和supply_introduce表資料是否一致
                    ps:測試賬號擁有所有區域、所有商品品類許可權
"""
class DecimalEncoder(json.JSONEncoder):
    def default(self,obj):
        if isinstance(obj,decimal.Decimal):#decimal型別轉換,使其可以轉換為json格式資料
            return float(obj)
        elif isinstance(obj,datetime.datetime):#datetime型別轉換,使其可以轉換為json格式資料
            return obj.__str__()
        return super(DecimalEncoder,self).default(obj)

class RUNSQL(object):
    def run_sql(self):
        db=pymysql.connect(host='192.168.147.192',port=3306,user='order_base',password='JMZet0h0fmKsTt2n',db='supply_chain',charset='utf8')
        self.db_cursor1=db.cursor()
        self.db_cursor2=db.cursor()
        self.db_cursor3=db.cursor()
        self.db_cursor4=db.cursor()

        self.db_cursor1.execute("SELECT id,cooperate_area_ids FROM `supplier`;")
        self.db_cursor2.execute("SELECT id,supplier_id FROM `product`;")
        self.db_cursor3.execute("SELECT product_id,area_id FROM `supply_introduce`;")
        self.db_cursor4.execute("SELECT product_id,area_id,product_status FROM supply_introduce;")

        db.close()

    def run_data(self):
        self.run_sql()
        sup_data = []
        new_sup_data=[]
        #操作supplier表
        for i in self.db_cursor1.fetchall():
            if '1' in i[1] and '2' not in i[1]:
                str_s=json.dumps(i[1],cls=DecimalEncoder,ensure_ascii=False)
                str_s=str_s.replace('1','2')
                sup_data.append([i[0],str_s.strip('"')])
            elif '1'in i[1]and '2'in i[1]:
                str_s = json.dumps(i[1], cls=DecimalEncoder, ensure_ascii=False)
                if '1,' in i[1]:
                    str_s1=str_s.replace('1,','')
                    sup_data.append([i[0], str_s1.strip('"')])
                if ',1' in i[1]:
                    str_s2=str_s.replace(',1','')
                    sup_data.append([i[0],str_s2.strip('"')])
            else:
                sup_data.append([i[0],i[1]])

        # print(data)
        # print(len(data))

        for line in sup_data:
            if ','in line[1]:
                lines=line[1].split(',')
                # print(lines)
                for j in lines:
                    new_sup_data.append([line[0],j])
            else:
                new_sup_data.append([line[0],line[1]])

        # print(new_sup_data)
        # print(len(new_sup_data))

        #操作product表,先驗證supplier表和product表的關聯id是否都存在
        a,b=[],[]
        sql2=self.db_cursor2.fetchall()
        for i in sql2:
            a.append(i[1])
        for i in new_sup_data:
            b.append(i[0])
        for i in a:
            if i not in b:
                print('supplier表中不存在的id:',i)
        for i in b:
            if i not in a:
                print('product表中不存在的supplier_id:',i)

        pro_data=[]
        for i in sql2:
            for line in new_sup_data:
                if i[1]==line[0]:#存在相等就新增至列表
                    # pro_data.append([i[0],line[0],line[1]])
                    pro_data.append([i[0],line[1]])
                else:
                    pass

        # print(pro_data)
        # print(len(pro_data))

        #操作supply_introduce表
        supin_data=[]
        for i in self.db_cursor3.fetchall():
            supin_data.append([i[0],str(i[1])])
        # print(supin_data)
        # print(len(supin_data))

        #對比資料
        for i in pro_data:
            if i not in supin_data:
                print('pro_data對比supin_data(supply_introduce中不含的資料):',i)
            else:
                pass

        for i in supin_data:
            if i not in pro_data:
                print('supin_data對比pro_data(product表中不含的資料):',i)
            else:
                pass

        #管理後臺資料與supply_introduce表資料對比
        supin_datas=[]
        for i in self.db_cursor4.fetchall():
            supin_datas.append([i[0],str(i[1]),i[2]])

        warnings.filterwarnings('ignore')
        url = 'https://dev-hlj.rainbowcn.com/api/login-adapter/user/login'
        data = {
            "account": "127458",
            "password": "LU123456"
        }
        session = requests.session()  # 保持登陸狀態
        session.post(url, verify=False, json=data)
        # url_text=session.post(url,json=data)
        # print(json.dumps(url_text.json(),indent=4,ensure_ascii=False,sort_keys=True))
        # Token=url_text.json()['data']['token']
        # Token={'token':Token}
        # return Token
        all_data = []
        url_1 = 'https://dev-hlj.rainbowcn.com/scm-admin-api/supplier/listByPager'
        url_text_1 = session.post(url_1, verify=False, json={})
        for i in range(1, int(url_text_1.json()["pagerInfo"]["pages"]) + 1):
            url_text_all = session.post(url_1, verify=False, json={"pageNum": i})
            for j in range(len(url_text_all.json()["pagerInfo"]["list"])):
                # print(url_text_all.json()["pagerInfo"]["list"][j]["productId"],url_text_all.json()["pagerInfo"]["list"][j]["cooperateAreaIds"],url_text_all.json()["pagerInfo"]["list"][j]["cooperateStatus"])
                all_data.append([url_text_all.json()["pagerInfo"]["list"][j]["productId"],
                                 url_text_all.json()["pagerInfo"]["list"][j]["cooperateAreaIds"],
                                 url_text_all.json()["pagerInfo"]["list"][j]["cooperateStatus"]])

        url_text_2 = session.post(url_1, verify=False, json={"cooperateStatus": 100})
        for i in range(1, int(url_text_2.json()["pagerInfo"]["pages"]) + 1):
            url_text_all = session.post(url_1, verify=False, json={"cooperateStatus": 100, "pageNum": i})
            for j in range(len(url_text_all.json()["pagerInfo"]["list"])):
                # print(url_text_all.json()["pagerInfo"]["list"][j]["productId"],url_text_all.json()["pagerInfo"]["list"][j]["cooperateAreaIds"],url_text_all.json()["pagerInfo"]["list"][j]["cooperateStatus"])
                all_data.append([url_text_all.json()["pagerInfo"]["list"][j]["productId"],
                                 url_text_all.json()["pagerInfo"]["list"][j]["cooperateAreaIds"],
                                 url_text_all.json()["pagerInfo"]["list"][j]["cooperateStatus"]])

        # print(sorted(all_data))
        # print(sorted(supin_datas))

        if sorted(all_data)==sorted(supin_datas):
            print('管理後臺資料與supply_introduce表資料一致')
        else:
            if len(all_data)>=len(supin_datas):
                for i in all_data:
                    if i not in supin_datas:
                        print('supply_introduce表無資料:',i)
            else:
                for i in supin_datas:
                    if i not in all_data:
                        print('管理後臺無資料:',i)


if __name__=='__main__':
    s=RUNSQL()
    s.run_data()