1. 程式人生 > >Python佇列及在微信機器人中的應用

Python佇列及在微信機器人中的應用

本文來源於i春秋學院,未經允許嚴禁轉載。

最近打算更新微信機器人,發現機器人的作者將程式碼改進了很多,但去掉了sqlite資料庫,需要自己根據需求設計資料庫,跟作者溝通得到的建議是為了防止訊息併發導致資料庫死鎖,建議另開一個程序讀寫資料庫,將訊息加入一個佇列中,因為對Python瞭解有限,佇列和多執行緒更不是我擅長的內容,於是最近瘋狂Google、百度,探索著實現了此功能。寫此文記錄下基本概念和實現方法

0x00 Python佇列
佇列是執行緒中交換資料的形式。
建立一個佇列物件

import Queue

q = Queue.Queue(maxsize = 10) #maxsize是佇列長度,不限制長度可以不不賦值


將一個值放入佇列

q.put(10)


將一個值從佇列取出

q.get()


三種佇列及建構函式

  • FIFO佇列先進先出: class Queue.Queue(maxsize)
  • LIFO類似於堆,即先進後出: class Queue.LifoQueue(maxsize)
  • 優先順序佇列: class Queue.PriorityQueue(maxsize)
     

佇列的常用方法

q.qsize() #返回佇列的大小

q.empty() #如果佇列為空,返回True,反之False

q.full() #如果佇列滿了,返回True,反之False

q.full #與 maxsize 大小對應

q.get([block[, timeout]
]) #獲取佇列,block:是否阻塞等待,timeout等待時間 q.get_nowait() #相當q.get(False) q.put(item[, block[, timeout]) #非阻塞寫入佇列,block:是否阻塞等待,timeout等待時間 q.put_nowait(item) #相當q.put(item, False) q.task_done() #在完成一項工作之後,q.task_done() 函式向任務已經完成的佇列傳送一個訊號 q.join() #等到佇列為空,再執行別的操作


幾個例子

一個執行緒往佇列裡寫入隨機數,另一個執行緒從佇列裡取數字(阻塞等待)

#!/usr/bin/env python
#coding:utf8 import random,threading,time from Queue import Queue #Producer thread classProducer(threading.Thread):     def__init__(self, t_name, queue):         threading.Thread.__init__(self,name=t_name)         self.data=queue     defrun(self):         for i in range(10):  #隨機產生10個數字 ,可以修改為任意大小             randomnum=random.randint(1,99)             print "%s: %s 生成了一個數字 %d 並把它扔進了佇列!" % (time.ctime(), self.getName(), randomnum)             self.data.put(randomnum)  #將資料依次存入佇列             time.sleep(1)         print "%s: %s finished!" %(time.ctime(), self.getName()) #Consumer thread classConsumer_all(threading.Thread):     def__init__(self, t_name, queue):         threading.Thread.__init__(self, name=t_name)         self.data = queue     defrun(self):         while 1:             try:                 # print self.data                 val_even = self.data.get()                  print "%s: %s 從佇列裡取出了 %d !" % (time.ctime(), self.getName(), val_even)                 time.sleep(1)             except  Exception, e:                  print '取資料失敗'                 continue #Main thread defmain():     queue = Queue()     producer = Producer('Pro.', queue)     consumer_all = Consumer_all('Con_all.', queue)     producer.start()     consumer_all.start() if __name__ == '__main__':     main()

執行結果

Wed Aug  3 00:04:18 2016: Pro. 生成了一個數字 74 並把它扔進了佇列!

Wed Aug  3 00:04:18 2016: Con_all. 從佇列裡取出了 74 !

Wed Aug  3 00:04:19 2016: Pro. 生成了一個數字 77 並把它扔進了佇列!

Wed Aug  3 00:04:19 2016: Con_all. 從佇列裡取出了 77 !

Wed Aug  3 00:04:20 2016: Pro. 生成了一個數字 63 並把它扔進了佇列!

Wed Aug  3 00:04:20 2016: Con_all. 從佇列裡取出了 63 !

Wed Aug  3 00:04:21 2016: Pro. 生成了一個數字 96 並把它扔進了佇列!

Wed Aug  3 00:04:21 2016: Con_all. 從佇列裡取出了 96 !

Wed Aug  3 00:04:22 2016: Pro. 生成了一個數字 82 並把它扔進了佇列!

Wed Aug  3 00:04:22 2016: Con_all. 從佇列裡取出了 82 !

Wed Aug  3 00:04:23 2016: Pro. 生成了一個數字 19 並把它扔進了佇列!

Wed Aug  3 00:04:23 2016: Con_all. 從佇列裡取出了 19 !

Wed Aug  3 00:04:24 2016: Pro. 生成了一個數字 56 並把它扔進了佇列!

Wed Aug  3 00:04:24 2016: Con_all. 從佇列裡取出了 56 !

Wed Aug  3 00:04:25 2016: Pro. 生成了一個數字 57 並把它扔進了佇列!

Wed Aug  3 00:04:25 2016: Con_all. 從佇列裡取出了 57 !

Wed Aug  3 00:04:26 2016: Pro. 生成了一個數字 42 並把它扔進了佇列!

Wed Aug  3 00:04:26 2016: Con_all. 從佇列裡取出了 42 !

Wed Aug  3 00:04:27 2016: Pro. 生成了一個數字 7 並把它扔進了佇列!

Wed Aug  3 00:04:27 2016: Con_all. 從佇列裡取出了 7 !

Wed Aug  3 00:04:28 2016: Pro. finished!

如果將入隊的時間間隔修改,出隊的程式將阻塞執行,將“ time.sleep(1)”改為“ time.sleep(10)”,再次執行

Wed Aug  3 00:10:46 2016: Pro. 生成了一個數字 45 並把它扔進了佇列!

Wed Aug  3 00:10:46 2016: Con_all. 從佇列裡取出了 45 !

Wed Aug  3 00:10:56 2016: Pro. 生成了一個數字 75 並把它扔進了佇列!

Wed Aug  3 00:10:56 2016: Con_all. 從佇列裡取出了 75 !

Wed Aug  3 00:11:06 2016: Pro. 生成了一個數字 26 並把它扔進了佇列!

Wed Aug  3 00:11:06 2016: Con_all. 從佇列裡取出了 26 !

Wed Aug  3 00:11:16 2016: Pro. 生成了一個數字 67 並把它扔進了佇列!

Wed Aug  3 00:11:16 2016: Con_all. 從佇列裡取出了 67 !

Wed Aug  3 00:11:26 2016: Pro. 生成了一個數字 60 並把它扔進了佇列!

Wed Aug  3 00:11:26 2016: Con_all. 從佇列裡取出了 60 !

Wed Aug  3 00:11:36 2016: Pro. 生成了一個數字 83 並把它扔進了佇列!

Wed Aug  3 00:11:36 2016: Con_all. 從佇列裡取出了 83 !

Wed Aug  3 00:11:46 2016: Pro. 生成了一個數字 33 並把它扔進了佇列!

Wed Aug  3 00:11:46 2016: Con_all. 從佇列裡取出了 33 !

Wed Aug  3 00:11:56 2016: Pro. 生成了一個數字 65 並把它扔進了佇列!

Wed Aug  3 00:11:56 2016: Con_all. 從佇列裡取出了 65 !

Wed Aug  3 00:12:06 2016: Pro. 生成了一個數字 44 並把它扔進了佇列!

Wed Aug  3 00:12:06 2016: Con_all. 從佇列裡取出了 44 !

Wed Aug  3 00:12:16 2016: Pro. 生成了一個數字 28 並把它扔進了佇列!

Wed Aug  3 00:12:16 2016: Con_all. 從佇列裡取出了 28 !

Wed Aug  3 00:12:26 2016: Pro. finished!

會發現當佇列沒有值的時候程式會阻塞等待佇列有值才繼續執行。
稍微修改程式,一個執行緒往佇列裡寫入隨機數,另一個執行緒從佇列裡取數字(非阻塞等待)

#!/usr/bin/env python

#coding:utf8

import random,threading,time

from Queue import Queue

#Producer thread

classProducer(threading.Thread):

    def__init__(self, t_name, queue):

        threading.Thread.__init__(self,name=t_name)

        self.data=queue

    defrun(self):

        for i in range(10):  #隨機產生10個數字 ,可以修改為任意大小

            randomnum=random.randint(1,99)

            print "%s: %s 生成了一個數字 %d 並把它扔進了佇列!" % (time.ctime(), self.getName(), randomnum)

            self.data.put(randomnum)  #將資料依次存入佇列

            time.sleep(10)

        print "%s: %s finished!" %(time.ctime(), self.getName())



#Consumer thread

classConsumer_all(threading.Thread):

    def__init__(self, t_name, queue):

        threading.Thread.__init__(self, name=t_name)

        self.data = queue



    defrun(self):

        while 1:

            try:

                # print self.data

                val_even = self.data.get(1,5# get(self, block=True, timeout=None) ,1就是阻塞等待,5是超時5秒

                print "%s: %s 從佇列裡取出了 %d !" % (time.ctime(), self.getName(), val_even)

                time.sleep(1)

            except  Exception, e:  # 等待輸入,超過5秒  就報異常

                print '取資料失敗'

                continue



#Main thread

defmain():

    queue = Queue()

    producer = Producer('Pro.', queue)

    consumer_all = Consumer_all('Con_all.', queue)

    producer.start()

    consumer_all.start()



if __name__ == '__main__':

    main()
Wed Aug  3 00:20:58 2016: Pro. 生成了一個數字 57 並把它扔進了佇列!

Wed Aug  3 00:20:58 2016: Con_all. 從佇列裡取出了 57 !

取資料失敗

Wed Aug  3 00:21:08 2016: Pro. 生成了一個數字 83 並把它扔進了佇列!

Wed Aug  3 00:21:08 2016: Con_all. 從佇列裡取出了 83 !

取資料失敗

Wed Aug  3 00:21:18 2016: Pro. 生成了一個數字 85 並把它扔進了佇列!

Wed Aug  3 00:21:18 2016: Con_all. 從佇列裡取出了 85 !

取資料失敗

Wed Aug  3 00:21:28 2016: Pro. 生成了一個數字 64 並把它扔進了佇列!

Wed Aug  3 00:21:28 2016: Con_all. 從佇列裡取出了 64 !

取資料失敗

Wed Aug  3 00:21:38 2016: Pro. 生成了一個數字 23 並把它扔進了佇列!

Wed Aug  3 00:21:38 2016: Con_all. 從佇列裡取出了 23 !

取資料失敗

Wed Aug  3 00:21:48 2016: Pro. 生成了一個數字 98 並把它扔進了佇列!

Wed Aug  3 00:21:48 2016: Con_all. 從佇列裡取出了 98 !

取資料失敗

Wed Aug  3 00:21:58 2016: Pro. 生成了一個數字 12 並把它扔進了佇列!

Wed Aug  3 00:21:58 2016: Con_all. 從佇列裡取出了 12 !

取資料失敗

Wed Aug  3 00:22:08 2016: Pro. 生成了一個數字 27 並把它扔進了佇列!

Wed Aug  3 00:22:08 2016: Con_all. 從佇列裡取出了 27 !

取資料失敗

Wed Aug  3 00:22:18 2016: Pro. 生成了一個數字 60 並把它扔進了佇列!

Wed Aug  3 00:22:18 2016: Con_all. 從佇列裡取出了 60 !

取資料失敗

Wed Aug  3 00:22:28 2016: Pro. 生成了一個數字 69 並把它扔進了佇列!

Wed Aug  3 00:22:28 2016: Con_all. 從佇列裡取出了 69 !

取資料失敗

Wed Aug  3 00:22:38 2016: Pro. finished!

取資料失敗

取資料失敗

取資料失敗

可以看出,取資料的執行緒在超過設定的等待時間後會丟擲異常並繼續往下執行。

0x01 使用佇列將微信機器人訊息存入MongoDB

使用上面的例子,稍作修改,將接收到的訊息扔進佇列,開啟另一個執行緒取資料,取到後將訊息格式化並存入資料庫

#!/usr/bin/env python

#coding:utf8

import threading,time,pymongo

from pymongo import MongoClient

from Queue import Queue

#訊息入隊

classMsgInQueue():

    def__init__(self, queue):

        threading.Thread.__init__(self)

        self.data=queue

    defputmsgqueue(self,msg):

        self.data.put(msg)

        print 'put msg in queue success:'+msg['Content']



###訊息出隊並存入資料庫

classMsgOutQueue2db(threading.Thread):

    def__init__(self, queue):

        threading.Thread.__init__(self)

        self.data = queue

        #建立MongoDB連線

        self.conn = MongoClient()

        #資料庫

        self.db = self.conn.wechatRobot

        #資料表

        self.messages = self.db.messages

    defrun(self):

        while 1:

            try:

                # print self.data

                #從佇列裡取訊息

                msg = self.data.get(1, 5# get(self, block=True, timeout=None) ,1就是阻塞等待,5是超時5秒

                print "%s: %s get %s from queue !" % (time.ctime(), self.getName(), msg['Content'].encode('utf-8'))

                try:

                    #格式化訊息資料

                    m = dict(groupname=msg['FromUserName'].encode('utf-8'),

                             time=msg['CreateTime'],

                             username=msg['ActualUserName'],

                             usernickname=msg['ActualNickName'].encode('utf-8'),

                             message=msg['Content'].encode('utf-8'),

                             messagetype=msg['MsgType']

                             )

                    print m

                    #存入資料庫

                    self.db.messages.insert(m)

                    time.sleep(1)

                except  Exception, e:

                    print e

                    continue

            except  Exception, e:

                continue

主執行緒

defcomplex_reply():

    queue = Queue()

    outqueue = MsgOutQueue2db(queue)#例項化出隊入庫類

    outqueue.start()#開啟執行緒



    @itchat.msg_register('Text', isGroupChat = True)

    deftext_reply(msg):

        # print itchat.__client.storageClass.groupDict

        print itchat.__client.storageClass.chatroomList

        print msg

        inqueue=MsgInQueue(queue)#例項化入隊類

        inqueue.putmsgqueue(msg)#訊息入隊

        if msg['isAt']:

            print msg

            itchat.send(u'@%s\u2005I received: %s'%(msg['ActualNickName'], msg['Content']), msg['FromUserName'])

訊息存入資料庫:

通過多執行緒和MongoDB的結合,有效防止訊息過多導致資料庫死鎖的問題,也更加模組化,可以根據真實需求更換其他資料庫。後面我將結合MongoDB插入、更新資料快的特點寫一下我如何設計群聊統計功能,在Python方面和MongoDB方面我都是小白,如有更好的建議請多指教,我們共同學習!有關Python多執行緒的課程請參考《python安全程式設計入門》