1. 程式人生 > >python 結合redis 佇列 做一個例子

python 結合redis 佇列 做一個例子

結合redis 佇列 做了一個例子

#!/usr/bin/env python
# coding: utf-8
# @Time    : 2018/12/21 0021 13:57
# @Site    : 
# @File    : demos.py
# @Software: PyCharm
import MySQLdb
import redis
import json
import os, time
import threading
from multiprocessing import Pool, Process
import os, time, random
import sys

reload(sys)
sys.setdefaultencoding('utf8')


class InsertData():
    def __init__(self):
        # 去掉一些無用資訊
        self.__list_industry = []
        self.__has_many = []
        self.__list_xczx = []
        self.__list_cxcy = []
        self.__list_industry_dict = {'test': self.__list_xczx }
        self.__dict_industry = {'test': 212}
        self.db = MySQLdb.connect(host="127.0.0.1", port=3306, user="root", passwd="123456", db="ww",
                                  charset='utf8')
        redisPool = redis.ConnectionPool(host='localhost', port=6379)
        self.re_queue = redis.Redis(connection_pool=redisPool)
        self.re_queue2 = redis.Redis(connection_pool=redisPool)

    def __get_dict_industry(self):
        industry_name = self.__list_industry_dict.keys()
        if len(industry_name) == 1:
            industry_name = str(tuple(industry_name)).replace(",","")
        elif len(industry_name) > 1:
            industry_name = str(tuple(industry_name))
        else:
            return
        sql_industry = "select industry_name,industry_id from zzh_industry where industry_name in {}".format(industry_name)
        cursor3 = self.db.cursor()
        cursor3.execute(sql_industry)
        result_list = cursor3.fetchall()
        for result in result_list:
            self.__dict_industry[result[0]] = result[1]
        cursor3.close()

    def inser_industry(self):
        dta = """xx、xxx"""
        data = dta.split("、")
        for index, da in enumerate(data):
            industry_code = 100001 + index
            sqlStr = """insert into xx(industry_name,industry_pid,industry_code,industry_sort,is_lock) VALUES('{industry_name}',211,'{industry_code}',{industry_sort},1) ;""".format(
                industry_name=da, industry_code=industry_code, industry_sort=index + 1)
            print sqlStr

    def put_redis(self):
        cursor = self.db.cursor()
        item_sql = """SELECT item_title,item_id from xxx"""
        cursor.execute(item_sql)
        result_list = cursor.fetchall()
        num = 1
        for result in result_list:
            data = {"itemTitle": result[0], "itemId": result[1]}
            self.re_queue.lpush("item", json.dumps(data))
            num += 1
        print ("put over", num)

    def get_redis(self):
        nums = 1
        resultNum = 0
        cursor_get = self.db.cursor()

        while True:
            result = self.re_queue.rpop("item")
            if not result:
                time.sleep(1)
                if resultNum == 10:
                    break
                else:
                    print "resultNum", resultNum
                    resultNum += 1
                continue
            try:
                resultNum = 0
                result = json.loads(result)
                value_list = []
                for strkey in self.__list_industry_dict.keys():
                    if strkey in self.__has_many:
                        for __strkey in self.__list_industry_dict[strkey]:
                            if __strkey in result["itemTitle"]:
                                value_list.append(strkey)
                                break
                    if strkey in result["itemTitle"]:
                        value_list.append(strkey)
                value_list = set(value_list)
                item_id = result["itemId"]
                if value_list:
                    print result["itemTitle"]
                for value in value_list:
                    nums += 1
                    # select_sql = "select id from zzh_industry_item where item_id={} and industry_id={} limit 1".format(item_id,self.__dict_industry[value])
                    # cursor_get.execute(select_sql)
                    # if cursor_get.fetchone():
                    #     print ("reseat",select_sql)
                    #     continue
                    sql_insert = "insert into zzh_industry_item(item_id,industry_id)values ({item_id},{industry_id})".format(
                        item_id=item_id, industry_id=self.__dict_industry[value])
                    self.re_queue2.lpush("sqls", str(sql_insert))
            except Exception as e:
                print e
        cursor_get.close()
        print ("put over")

    def test(self):
        cursor2 = self.db.cursor()
        count = 0
        breakNum = 0
        num = 0
        try:
            while True:
                sql = self.re_queue2.rpop("sqls")
                if sql:
                    num += 1
                    breakNum = 0
                    print sql
                    try:
                        cursor2.execute(sql)
                        if count == 500:
                            self.db.commit()
                            count = 0
                        else:
                            count += 1
                    except Exception as e:
                        print e
                if not sql:
                    time.sleep(1)
                    if breakNum == 10:
                        break
                    else:
                        print "breakNum", breakNum
                        breakNum += 1
        finally:
            print ("insertSql", num)
            self.db.commit()
            self.db.close()


if __name__ == '__main__':
    items = InsertData()
    print('Parent process %s.' % os.getpid())
    t1 = threading.Thread(target=items.put_redis)
    t2 = threading.Thread(target=items.get_redis)
    t3 = threading.Thread(target=items.test)
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()