1. 程式人生 > >程序間通訊:佇列,管道,檔案,共享記憶體,訊號量,事件,互斥鎖,socket

程序間通訊:佇列,管道,檔案,共享記憶體,訊號量,事件,互斥鎖,socket

2017/11/4 程序間通訊,程序池
程序間通訊(IPC,inter-process communication):生產程序生產食物,消費程序購買食物,消費程序一直監視生產狀況,只要一有食物就將其取出來,如果取到食物None,兩者關係結束,於是主程序也結束。
遠端過程呼叫協議(remote procedure call protocal),需要某些傳輸協議
一般情況下 父程序會等子程序結束再結束


=====================================================================
1.程序間通訊常用方式:
程序間傳遞資料
(1)佇列:
from multiprocessing import Queue(multiprocessing.Queue)
(前一章:from queue import PriorityQueue


A.from multiprocessing import Queue(multiprocessing.Queue):
在多執行緒中傳遞資料,交換資料,進行通訊;
它與一般的資料結構中的佇列的差別為更加符合多程序的特性:序列化和反序列化;
對資料做了一些通訊商的加工,保證多程序的安全性;
在這裡使用多程序的資料通訊,不需要考慮多程序可能帶來的安全隱患,直接使用;
multiprocessing.Queue在程序間通訊是python比價提倡的一種方式


B.from queue import PriorityQueue(queue.PriorityQueue):
儲存資料的一種資料結構


(2)管道:
管道中,子程序間只能單向通訊,一個子程序在定義管道後只能get或者put;
conn1只能接訊息,conn2只能傳送訊息,不用的一端close(原因在於單快取),如果要實現雙向通訊,就設定兩根管道
但是佇列中子程序既可以get也可以put;
匿名管道:只能在父子程序中使用
命名管道:沒有沁園關係的程序之間也可以通訊


(3)檔案:
open,read,write,seek,tell,close
mode:r,w,a,b
with open('test.txt') as f:  #該操作方式可以自行關閉檔案 不必f.close()
早期時各種硬體裝置都是私有介面,unix系統將裝置檔案化,用r,w,a,b等api介面對裝置進行操作
(4)共享記憶體
-------------------------------
程序間傳遞訊號
(5)訊號量
(6)事件
(7)互斥量
from multiprocessing import Lock
lock.acquire() 
money.value += 1      #注意鎖的粒度,儘快最小話,開銷最小;粒度最小原則
lock.release()
--------------------------------
(8)Socket




=====================================================================
作業:
(1)佇列實現生產者消費者模型;生產者生產[0,1,2,3,4,5,6,7,8,9,10]
消費者能夠在處理完list序列後退出
(2)使用互斥鎖去完成銀行存取款操作,存款10000,A取10次,每次取100;B存5次,每次存200
(3)面向物件方法實現第一個作業的第二題myTimer


(1)
from multiprocessing import Queue
from multiprocessing import Process
import time


def consumer(input_q): 
     while True:
         item = input_q.get()
         if item == None:               
             break
         print(item)


def producer(sequence,output_p):
     for i in sequence:
         time.sleep(1)
         output_p.put(i)


if __name__ == '__main__':
     q= Queue()
     con_p = Process(target=consumer,args=(q,))  
     con_p.start()
     sequence =[1,2,3,4,5,6,7,8,9,10]
     producer(sequence,q)  
     q.put(None) 


(2)
from multiprocessing import Process
from multiprocessing import Value   #在不同的程序間共享的變數
from multiprocessing import Lock
import time


def deposit(money,lock):    #存錢
    for i in range(5):
        time.sleep(0.001)
        lock.acquire()   #上鎖的操作
        money.value += 200
        lock.release()


def withdraw(money,lock):   #取錢
    for i in range(10):
        time.sleep(0.001)
        lock.acquire()   
        money.value -= 100
        lock.release()


if __name__ == '__main__':
    money = Value('i',2000) 
    lock = Lock() 


    d = Process(target=deposit,args=(money,lock))
    d.start()
    w = Process(target=withdraw,args=(money,lock))
    w.start()
    d.join()
    w.join()


    print(money.value)  


(3)
from multiprocessing import Process
import time
import os


class Proces:
    def __init__(set,interval):
        self.interval = interval


    def new_start(self):
        p = Process(target = self.run,args =())
        p.start()


        print('child process id is %d,name is %s' % (p.pid,p.name))
        
        print(os.getpriority(os.PRIO_PROCESS,p.pid))
        os.setpriority(os.PRIO_PROCESS,p.pid,1)        #獲取程序改優先順序
        print(os.getpriority(os.PRIO_PROCESS,p.pid))


        p.join()
        print('main process')


class Timer(Proces):
    def __init__(self,interval):
        self.interval = interval


    def run(self):  #列印時間的方法
        for i in range(5):
            time.sleep(self.interval)
            print(time.ctime()+'have a rest')
            
if __name__ == '__main__':
    t = Timer(2) #建立定時器

    t.new_start()   #繼承process  do not call run   父類中函式會自動呼叫這個run方法

================================================================================================

相關程式碼:

設定優先順序1-20,做一個模擬os的排程程式:某一時刻,四個程序:
Watch TV:          random優先順序  import rnadom
Listen to music:   random優先順序
Print Doc:         4
Write Doc:         4
建立佇列,資訊放進佇列,模擬佇列的排程(觀察取數情況,驗證時間片)


我的解法:
from queue import PriorityQueue 
# import random
# class Item(object):


#     def __init__(self,name,level):      #self 可以將屬性寫進去
#         self.name = name
#         self.level = level


#     def __repr__(self):
#         return (str(self.name) + ':' + str(self.level))


#     def __lt__(self,other):      #小於的比較 重寫
#         return self.level > other.level


# if __name__ == '__main__':
#     q = PriorityQueue()
#     q.put(Item('watch tv',random.randint(1,20)))
#     q.put(Item('music',random.randint(1,20)))
#     q.put(Item('print',4))
#     q.put(Item('write',4))


#     while not q.empty():
#         print(q.get())


#===============================================================================================
多程序定期器:每一個小時,你的定時器提示你:不要coding,休息一下吧!
提示的同時顯示當前程序的pid,name,os模組設定優先順序


我的解法:


# from multiprocessing import Process
# import time


# def getTime(interval):
#     while True:
#         time.sleep(interval)
#         print(time.ctime())


# if __name__ == '__main__':
#     p = Process(target=getTime,args=(1,))
#     p.start()
#     print(p.name,p.pid)   #子程序 子程序的id


#     p.join()   #等待子程序結束 實際是阻塞
#     print('ending')    #由於子程序無線迴圈,由於join(),導致主程序無法執行,等待子程序結束,形成阻塞


#===============================================================================================


#顯示程序id,優先順序
# from multiprocessing import Process
# import time
# import os


# def getTime(interval):
#     while True:
#         time.sleep(interval)
#         print(time.ctime())


# if __name__ == '__main__':
#     p = Process(target=getTime,args=(1,))
#     p.start()
#     print('子程序的名字和id:',p.name,p.pid)   #子程序 子程序的id
    
#     print(os.getpriority(os.PRIO_PROCESS,p.pid))
#     os.setpriority(os.PRIO_PROCESS,p.pid,1)        #獲取程序改優先順序
#     print(os.getpriority(os.PRIO_PROCESS,p.pid))


#     p.join()   #等待子程序結束 實際是阻塞
#     print('ending')    #由於子程序無線迴圈,由於join(),導致主程序無法執行,等待子程序結束,形成阻塞


#===============================================================================================


#將定時器改成面向物件的形式@@@@@@@@@@@@@已經完成,在20171106程式碼檔案中
# from multiprocessing import Process
# import time
# import os


# class Timer(Process):
#     def __init__(self,interval):
#         self.sleep = interval


#     def run(self):  #列印時間的方法
            time.sleep(interval)
            print(time.ctime()+'have a rest')
            print(os.getpriority(os.PRIO_PROCESS,p.pid))
            os.setpriority(os.PRIO_PROCESS,p.pid,1)        #獲取程序改優先順序
            print(os.getpriority(os.PRIO_PROCESS,p.pid))


#     if __name__ == '__main__':
#         t = Timer(3600) #建立定時器
#         t.start()   #繼承process  do not call run   父類中函式會自動呼叫這個run方法


#===============================================================================================
#===============================================================================================


20171104作業 - 老師的解法


設定優先順序1-20,做一個模擬os的排程程式:某一時刻,四個程序:
Watch TV:          random優先順序  import rnadom
Listen to music:   random優先順序
Print Doc:         4
Write Doc:         4
建立佇列,資訊放進佇列,模擬佇列的排程(觀察取數情況,驗證時間片)




# from queue import PriorityQueue 
# import random
# class Item(object):


#     def __init__(self,name,level):      #self 可以將屬性寫進去
#         self.name = name
#         self.level = level


#     def __repr__(self):
#         return (str(self.name) + ':' + str(self.level))


#     def __lt__(self,other):      #小於的比較 重寫
#         return self.level > other.level


# if __name__ == '__main__':
#     q = PriorityQueue()
#     q.put(Item('watch tv',random.randint(1,20)))
#     q.put(Item('music',random.randint(1,20)))
#     q.put(Item('print',4))
#     q.put(Item('write',4))


#     while not q.empty():
#         print(q.get())


#     q.put(Item('print',4+q.qsize()))
#     q.put(Item('write',4+q.qsize()))


#===============================================================================================


20171104作業- 老師的解法


多程序定期器:每一個小時,你的定時器提示你:不要coding,休息一下吧!
提示的同時顯示當前程序的pid,name,os模組設定優先順序


# from multiprocessing import Process
# import time


# def getTime(interval):
#     while True:
#         time.sleep(interval)
#         print(time.ctime())


# if __name__ == '__main__':
#     p = Process(target=getTime,args=(1,))
#     p.start()
#     print(p.name,p.pid)   #子程序 子程序的id


#     p.join()   #等待子程序結束 實際是阻塞
#     print('ending')    #由於子程序無線迴圈,由於join(),導致主程序無法執行,等待子程序結束,形成阻塞


#==================================


#顯示程序id,優先順序
# from multiprocessing import Process
# import time
# import os


# def getTime(interval):
#     while True:
#         time.sleep(interval)
#         print(time.ctime())


# if __name__ == '__main__':
#     p = Process(target=getTime,args=(1,))
#     p.start()
#     print('子程序的名字和id:',p.name,p.pid)   #子程序 子程序的id
    
#     print(os.getpriority(os.PRIO_PROCESS,p.pid))
#     os.setpriority(os.PRIO_PROCESS,p.pid,1)        #獲取程序改優先順序
#     print(os.getpriority(os.PRIO_PROCESS,p.pid))


#     p.join()   #等待子程序結束 實際是阻塞
#     print('ending')    #由於子程序無線迴圈,由於join(),導致主程序無法執行,等待子程序結束,形成阻塞


#=================================


#將定時器改成面向物件的形式
# from multiprocessing import Process
# import time
# import os


# class Timer(Process):
#     def __init__(self,interval):
#         self.sleep = interval


#     def run(self):  #列印時間的方法
#             time.sleep(interval)
            # print(time.ctime()+'have a rest')
            # print(os.getpriority(os.PRIO_PROCESS,p.pid))
            # os.setpriority(os.PRIO_PROCESS,p.pid,1)        #獲取程序改優先順序
            # print(os.getpriority(os.PRIO_PROCESS,p.pid))


#     if __name__ == '__main__':
#         t = Timer(3600) #建立定時器
#         t.start()   #繼承process  do not call run   父類中函式會自動呼叫這個run方法




#===============================================================================================
#===============================================================================================


20171106課堂練習程式碼


#生產消費模型


# from multiprocessing import Queue
# from multiprocessing import Process
# import time


# #消費者模型
# def consumer(input_q):
#     time.sleep(2)
#     while not input_q.empty():
#         print(input_q.get())


# #生產者邏輯
# def producer(sequence,output_p):
#     for i in sequence:
#         output_p.put(i)






# if __name__ == '__main__':
#     q= Queue()
#     con_p = Process(target=consumer,args=(q,))
#     con_p.start()


#     sequence =[1,2,3,4,5]
#     producer(sequence,q)


#=================================


#生產消費模型
# from multiprocessing import Queue
# from multiprocessing import Process
# import time


# #消費者模型
# def consumer(input_q):
#     time.sleep(2)
#     while True:
#         if not input_q.empty():
#             print(input_q.get())
#         else:
#             break
# #生產者邏輯
# def producer(sequence,output_p):
#     for i in sequence:
#         output_p.put(i)






# if __name__ == '__main__':
#     q= Queue()
#     con_p = Process(target=consumer,args=(q,))
#     con_p.start()


#     sequence =[1,2,3,4,5]
#     producer(sequence,q)


#=================================


# #生產消費模型 
# from multiprocessing import Queue
# from multiprocessing import Process
# import time


# #消費者模型
# def consumer(input_q):  #input_q  宣告引數  從生產者那裡拿走物品(資料,佇列資料)
#     while True:
#         item = input_q.get()
#         if item == None:       #用None作為標誌,判斷生產者是否已經結束了生產
        
#             break
#         print(item)


# #生產者邏輯
# def producer(sequence,output_p):


#     for i in sequence:
#         time.sleep(1)
#         output_p.put(i)


# if __name__ == '__main__':
#     q= Queue()
#     con_p = Process(target=consumer,args=(q,))  #消費者程序 建立一個程序,程序的目標一定是個函式名,或者值類中的函式名即方法名,程序的引數
#     con_p.start()


#     sequence =[1,2,3,4,5]
#     # pro_p = Process(target=producer,args=(sequence,q))  #消費者程序
#     # pro_p.start()
#     producer(sequence,q)  #  生產者開始生產數字12345,放入佇列中
    
#     q.put(None)     #  向佇列中放入一個空值(或者說是生產一個空值),空值也是值,是一個特殊的值




#=================================




# #生產消費模型  佇列  演示一個子程序既可以put 也可以get
# from multiprocessing import Queue
# from multiprocessing import Process
# import time


# #消費者模型
# def consumer(input_q):  #input_q  宣告引數  從生產者那裡拿走物品(資料,佇列資料)
#     while True:
#         item = input_q.get()
#         if item == None:       #用None作為標誌,判斷生產者是否已經結束了生產
#             # input_q.put(6)
#             break
#         print(item)


# #生產者邏輯
# def producer(sequence,output_p):


#     for i in sequence:
#         time.sleep(1)
#         output_p.put(i)


# if __name__ == '__main__':
#     q= Queue()
#     con_p = Process(target=consumer,args=(q,))  #消費者程序 建立一個程序,程序的目標一定是個函式名,或者值類中的函式名即方法名,程序的引數
#     con_p.start()


#     sequence =[1,2,3,4,5]
#     # pro_p = Process(target=producer,args=(sequence,q))  #消費者程序
#     # pro_p.start()
#     producer(sequence,q)  #  生產者開始生產數字12345,放入佇列中
    
#     # con_p.join()    #一定等子程序結束,保證consumer也能向佇列中新增數字6 表明對列中子程序的雙方既可以put,也可以get,而管道只能單向,一個子程序在定義管道後只能get或者put
#     q.put(None)     #  向佇列中放入一個空值(或者說是生產一個空值),空值也是值,是一個特殊的值




#===============================================================================================


# #管道操作-僅瞭解
# from multiprocessing import Pipe
# from multiprocessing import Process


# #消費者模型
# def consumer(pipe):
#     (conn1,conn2) = pipe #兩個變數構成一個管道,管道是設定的引數pipe
#     conn2.close()        #關閉管道的傳送方!!!
#     while True:
#         try:
#             item = conn1.recv()
#             print(item)
#         except EOFError:         #檔案讀取錯誤
#             break
#     print('consumer ending done')
# #生產者邏輯
# def producer(sequence,sendPipe):
#     for i in sequence:
#         sendPipe.send(i)




# if __name__ == '__main__':
#     (conn1,conn2) = Pipe()  #建立管道,conn1是接收方  conn2是傳送方
#     con_p = Process(target=consumer,args=((conn1,conn2),)) #建立消費者子程序,把管道(元組)作為引數傳入;生產者為主程序
#     con_p.start()


#     conn1.close()  #關閉生產者的輸出管道
#     sequence =[1,2,3,4,5]   #這裡還要用到傳送方
#     producer(sequence,conn2)
#     conn2.close()






#=================================


# #管道操作- 僅瞭解
# from multiprocessing import Pipe
# from multiprocessing import Process




# #消費者模型
# def consumer(pipe):
#     (conn1,conn2) = pipe 
#     conn2.close()
#     while True:
#         try:
#             item = conn1.recv()
#         except EOFError:
#             conn1.close()
#             break
#         print(item)
#     print('consumer ending done')


# #生產者邏輯
# def producer(sequence,sendPipe):
#     for i in sequence:
#         sendPipe.send(i)




# if __name__ == '__main__':
#     (conn1,conn2) = Pipe()  #建立管道,conn1是接收方  conn2是傳送方
#     con_p = Process(target=consumer,args=((conn1,conn2),)) #建立消費者子程序,把管道(元組)作為引數傳入;生產者為主程序
#     con_p.start()


#     conn1.close()  #關閉生產者的輸出管道
#     sequence =[1,2,3,4,5]   #這裡還要用到傳送方
#     producer(sequence,conn2)
#     conn2.close()


#===============================================================================================


# #銀行存取款demo1 正常


# from multiprocessing import Process
# from multiprocessing import Value   #在不同的程序間共享的變數
# import time


# def deposit(money):    #存錢
#     for i in range(100):
#         money.value += 1


# def withdraw(money):   #取錢
#     for i in range(100):
#         money.value -= 1


# if __name__ == '__main__':
#     money = Value('i',2000) #共享變數 int 數值2000
#     d = Process(target=deposit,args=(money,))
#     d.start()
#     w = Process(target=withdraw,args=(money,))
#     w.start()
    
#     d.join()
#     w.join()


#     print(money.value)    #三個程序一起跑 不一定會執行詞句 需要等待子程序結束






#=================================


# #銀行存取款demo1  - 存取款異常


前提知識1:多程序不能訪問同一個全域性變數,原因是不同的程序使用的虛擬記憶體空間不同,而全域性變數實際是用的同一個記憶體空間,
因此需要 import Value 生成一個能夠在不同的程序間共享和操作的變數


前提知識2:對於time.sleep ,在一個程式中,子程序之間會搶佔資源,比如搶佔 value ,看a在睡醒的時刻是否能夠搶佔到資源value


# from multiprocessing import Process
# from multiprocessing import Value   #在不同的程序間共享的變數
# import time


# def deposit(money):    #存錢
#     for i in range(100):
#         time.sleep(0.1)
#         money.value += 1


# def withdraw(money):   #取錢
#     for i in range(100):
#         time.sleep(0.1)
#         money.value -= 1


# if __name__ == '__main__':
#     money = Value('i',2000) #共享變數 int 數值2000
#     d = Process(target=deposit,args=(money,))
#     d.start()
#     w = Process(target=withdraw,args=(money,))
#     w.start()
    
#     d.join()
#     w.join()


#     print(money.value)    #三個程序一起跑 不一定會執行詞句 需要等待子程序結束




#=================================


#銀行存取款demo2 - 存取款異常-解決 —— 鎖1
from multiprocessing import Process
from multiprocessing import Value   #在不同的程序間共享的變數
from multiprocessing import Lock
import time


def deposit(money,lock):    #存錢
    # lock.acquire()   #上鎖的操作
    for i in range(100):
        time.sleep(10)
        money.value += 1
    # lock.release()


def withdraw(money,lock):   #取錢
    # lock.acquire()   #上鎖的操作
    for i in range(100):
        time.sleep(0.0001)
        money.value -= 1
    # lock.release()


if __name__ == '__main__':
    money = Value('i',2000) #共享變數 int 數值2000
    lock = Lock() #銀行為了安全 加了一把互斥鎖


    d = Process(target=deposit,args=(money,lock))
    d.start()
    w = Process(target=withdraw,args=(money,lock))
    w.start()
    
    d.join()
    w.join()


    print(money.value)    #三個程序一起跑 不一定會執行詞句 需要等待子程序結束




#=================================


# #銀行存取款demo3 - 存取款異常-解決 —— 鎖2 -更優 因為鎖的力度更小,時間更短
# from multiprocessing import Process
# from multiprocessing import Value   #在不同的程序間共享的變數
# from multiprocessing import Lock
# import time


# def deposit(money,lock):    #存錢
    
#     for i in range(100):
#         time.sleep(0.1)
#         lock.acquire()   #上鎖的操作
#         money.value += 1
#         lock.release()


# def withdraw(money,lock):   #取錢


#     for i in range(100):
#         time.sleep(0.1)
#         lock.acquire()   #上鎖的操作
#         money.value -= 1
#         lock.release()


# if __name__ == '__main__':
#     money = Value('i',2000) #共享變數 int 數值2000
#     lock = Lock() #銀行為了安全 加了一把互斥鎖


#     d = Process(target=deposit,args=(money,lock))
#     d.start()
#     w = Process(target=withdraw,args=(money,lock))
#     w.start()
    
#     d.join()
#     w.join()


#     print(money.value)    #三個程序一起跑 不一定會執行詞句 需要等待子程序結束,因此新增join方法