1. 程式人生 > >Python學習—pyhton中的進程

Python學習—pyhton中的進程

第一個 timeit 不同 訪問 就會 gist 避免 pid 哪些

1.進程定義

進程: 進程就是一個程序在一個數據集上的一次動態執行過程。進程一般由程序、數據、進程控制塊(pcb)三部分組成。
(1)我們編寫的程序用來描述進程要完成哪些功能以及如何完成;
(2)數據則是程序在執行過程中所需要使用的資源;
(3)進程控制塊用來記錄進程的所有信息。系統可以利用它來控制和管理進程,它是系統感知進程存在的唯一標誌。

2.創建進程

新創建的進程在內存獨立開辟一塊空間,不與其他進程共享空間、數據。
同一個進程中,新創建的線程與此進程裏其他線程共享空間、數據。

1.os.fork()函數

os模塊的三個方法:
os.fork()創建一個當前進程的子進程
os.getpid()獲取當前進程pid

os.getppid()獲取當前進程的父進程的Pid
關於fork():
它用來創建一個進程,即為當前進程的子進程,復制父進程的所有代碼並從fork語句處開始運行。運行父進程還是子進程的取決於當前os調度策略。
在父進程中返回子進程的pid,在子進程中返回0。即返回0表示在子進程中運行,返回大與0的數表示在父進程中運行。

例子:

import os

print(‘當前進程:‘,os.getpid())
print(‘當前進程的父進程:‘,os.getppid())

pid = os.fork()
if pid == 0:
    print(‘此時為子進程:‘,os.getpid(),‘\n其父進程:‘,os.getppid())
else:
    print(‘父進程:‘,os.getpid(),‘\nos.fork的返回值pid:‘,pid)

運行結果:

當前進程: 16839
當前進程的父進程: 2912
父進程: 16839 
os.fork的返回值pid: 16842
此時為子進程: 16842 
其父進程: 16839

從運行結果中看,在linux中fork產生子進程後是先運行父進程,當父進程結束後再進入子進程運行。

2.實例化進程類

直接通過實例化進程類multiprocessing.Process創建新進程。
和線程類一樣,進程類也有start()方法,join()方法。調用對象的start()方法實例上也是調用的類中的run()方法。

# 導入進程模塊
import multiprocessing
import os

def job(ss):
    print(ss,‘當前子進程:%s‘ %os.getpid())

#實例化進程類,並提交任務,傳入任務所需要的參數
p1 = multiprocessing.Process(target=job,args=(‘abc‘,))
p1.start()
p2 = multiprocessing.Process(target=job,args=(‘123‘,))
p2.start()

# 和線程一樣,進程也有join方法。
p1.join()
p2.join()

print(‘完成......‘)

運行結果:

abc 當前子進程:17234
123 當前子進程:17235
完成......

3.繼承進程類來自定義進程類

繼承python提供的進程類,重寫方法,創建自己所需要的進程類,再實例化自定義的進程類。

import multiprocessing

class Job(multiprocessing.Process):
    #重寫構造方法
    def __init__(self,cc):
        super(Job, self).__init__()
        self.cc = cc

    #重寫run方法,和線程一樣
    def run(self):
        print(self.cc)

#實例化對象
if __name__ == "__main__":
    pp = []
    for i in range(10):
        p = Job(str(i)+‘:123456‘)
        pp.append(p)
        p.start()

    for p in pp:
        p.join()
    print(‘hahhahaha‘)

運行結果:

0:123456
1:123456
2:123456
3:123456
4:123456
5:123456
6:123456
7:123456
8:123456
9:123456
hahhahaha

3.多進程與多線程的對比

import threading
import multiprocessing
from timeit import timeit

class Jobthread(threading.Thread):
    def __init__(self,li):
        super(Jobthread,self).__init__()
        self.li = li
    def run(self):
        sum(self.li)

class Jobprocess(multiprocessing.Process):
    def __init__(self,li):
        super(Jobprocess, self).__init__()
        self.li = li
    def run(self):
        for i in self.li:
            sum(i)

# 這個裝飾器是自己寫的,用來計算某個函數執行時間
@timeit
def use_Pro(list):
    for i in range(0,len(list), 1000):
        p = Jobprocess(list[i:i+1000])
        p.start()

@timeit
def use_Thr(list):
    for li in list:
        t = Jobthread(li)
        t.start

if __name__ == "__main__":
    list = [[1,2,3,4,5,6],[2,3,4,5,6,7],[3,4,5,6,7,8],[4,5,6,7,8,9]]*1000
    use_Pro(list)
    use_Thr(list)

運行結果:

use_Pro運行時間0.0041866302490234375
use_Thr運行時間0.02240157127380371

正如看到的結果一樣,多進程適合計算密集型任務,多線程適合i/o密集型任務。

3.守護進程與終止進程

1.守護進程-daemon屬性

和線程類似,進程類也有一個daemon屬性,默認值為False。
當改變他的值為True時,當主進程結束,就會強行終止其他的所以進程。
實例:
(1)第一個程序

import multiprocessing
import time

def job():
    print(‘開始子進程‘)
    time.sleep(3)
    print(‘子進程結束‘)

if __name__ == "__main__":
    p = multiprocessing.Process(target=job)
    p.start()
    print("程序結束......")

運行結果:

程序結束......
開始子進程
子進程結束

主進程結束,其他進程還在繼續執行。
(2)第二個程序

import multiprocessing
import time

def job():
    print(‘開始子進程‘)
    time.sleep(3)
    print(‘子進程結束‘)

if __name__ == "__main__":
    p = multiprocessing.Process(target=job)
    p.daemon = True
    p.start()
    print("程序結束......")

運行結果:

程序結束......

當主進程結束,其他進程將會被強制終止結束。

2.終止進程

import multiprocessing
import time

def job():
    print(‘開始子進程‘)
    time.sleep(3)
    print(‘子進程結束‘)

if __name__ == "__main__":
    p = multiprocessing.Process(target=job)
    p.daemon = True
    print(p.is_alive())     #啟動進程之前查看進程狀態
    p.start()
    print(p.is_alive())     #啟動進程之後查看進程狀態
    p.terminate()           #終止進程
    print(p.is_alive())     #終止進程命令一發出後,查看進程狀態。此時進程在釋放過程中,還沒有被完全釋放。
    p.join()                #先讓進程完全釋放
    print(p.is_alive())     #最後查看進程狀態

    print("程序結束......")

運行結果:

False
True
True
False
程序結束......

4.進程間通信

"""
通過隊列實現進程間通信,隊列充當消息管道的作用(類似生產者消費者模型)
這裏通信一直存在,也就是這兩個進程會一直存在,沒有銷毀釋放。
"""
import multiprocessing
from multiprocessing import Queue
import time

class Put_news(multiprocessing.Process):
    def __init__(self,queue):
        super(Put_news, self).__init__()
        self.queue = queue
    def run(self):
        for i in range(100):
            self.queue.put(i)
            print("傳遞消息:%s" %i)
            time.sleep(0.1)

class Get_news(multiprocessing.Process):
    def __init__(self,queue):
        super(Get_news, self).__init__()
        self.queue = queue
    def run(self):
        while True:
            time.sleep(0.11)
            print("接收消息++++++++++++:%s" %(self.queue.get()))

if __name__ == "__main__":
    q = Queue()
    p = Put_news(q)
    g = Get_news(q)
    p.start()
    g.start()

    if not p.is_alive():
        g.terminate()

運行結果:

截圖

5.分布式進程

任務需要處理的數據特別大, 希望多臺主機共同處理任務。multiprocessing.managers子模塊裏面可以實現將進程分布到多臺機器上
(管理端主機要運算一些列任務,通過與其他主機建立“連接“,將任務分配給其他主機執行,並將執行結果返回給管理端主機。)
管理端主機代碼:

import random
from queue import Queue
from multiprocessing.managers import BaseManager

# 1.創建隊列(發送任務的隊列,收取結果的隊列)
task_queue = Queue()
result_queue = Queue()

# 第二三步驟可以互換順序
# 2.將隊列註冊到網絡(這樣其他主機可以通過網絡接收任務,發送結果)
# 註冊的隊列(任務隊列,結果隊列)的唯一標識碼分別為‘put_task_queue‘,‘get_result_queue‘
BaseManager.register(‘put_task_queue‘,callable=lambda :task_queue)
BaseManager.register(‘get_result_queue‘,callable=lambda : result_queue)

# 3.綁定端口(3333),設定密碼(hahahaha)
manager = BaseManager(address=(‘172.25.254.158‘,3333),authkey=b‘hahahaha‘)

# 4.啟動manager,開始共享隊列
manager.start()

# 5.通過網絡訪問共享的隊列
task = manager.put_task_queue()
result = manager.get_result_queue()

# 6.向任務隊列中放入執行任務的數據,這裏放入100個任務
for i in range(100):
    n = random.randint(10,500)
    task.put(n)
    print(‘任務列表加入數據:‘+str(n))

# 7.從結果隊列中讀取各個主機的任務執行結果
for j in range(100):
    res = result.get()
    print(‘執行結果:‘+str(res))

# 8.任務執行結束,關閉共享隊列
manager.shutdown()

運算主機代碼:

"""
在各個工作主機上執行的代碼相同
"""

from multiprocessing.managers import BaseManager

# 1. 連接manager端,獲取共享的隊列
import time

worker = BaseManager(address=(‘172.25.254.158‘,3333),authkey=b‘hahahaha‘)

# 2.註冊隊列,去獲取網絡上共享的隊列中的內容
BaseManager.register(‘put_task_queue‘)
BaseManager.register(‘get_result_queue‘)

# 3.連接網絡
worker.connect()

# 4.通過網絡訪問共享的隊列

task = worker.put_task_queue()
result = worker.get_result_queue()

# 5.讀取任務,處理任務,這裏讀取了50個任務進行處理
# 每臺運算主機上的處理任務數量可以不同,不過為了避免修改代碼,一般都相同。
for i in range(50):
    n = task.get()
    print(‘執行任務 %d**2 = ‘%(n))
    res = ‘%d**2=%d‘ %(n,n**2)  #這裏設置執行的任務是求平方
    result.put(res)     #將結果放入結果隊列
    time.sleep(1)       #休息1秒

print(‘工作主機執行任務結束.....‘)

6.進程池

和線程一樣,進程也有進程池。
1.第一種方法

import multiprocessing
import time

def job(id):
    print(‘start id ---> %d‘ %id)
    print(‘end id ----> %d‘ %id)
    time.sleep(3)
# 創建含有8個進程的進程池
pool = multiprocessing.Pool(8)
# 給進城池的進程分配任務
for i in range(12):
    pool.apply_async(job,args=(i,))

# 關閉進程池,使進程池不再工作運行
pool.close()
# 等待所有子進程結束之後再開始主進程
pool.join()

print(‘all works completed!‘)

運行結果:

start id ---> 0
end id ----> 0
start id ---> 1
end id ----> 1
start id ---> 2
end id ----> 2
start id ---> 3
end id ----> 3
start id ---> 4
end id ----> 4
start id ---> 5
end id ----> 5
start id ---> 6
end id ----> 6
start id ---> 7
end id ----> 7
start id ---> 8
end id ----> 8
start id ---> 9
end id ----> 9
start id ---> 10
end id ----> 10
start id ---> 11
end id ----> 11
all works completed!

2.第二種方法

from concurrent.futures import ProcessPoolExecutor
import time
def job(id):
    print(‘start id ---> %d‘ %id)
    print(‘end id ----> %d‘ %id)
    time.sleep(3)

# 創建含有2個進程的進程池
pool = ProcessPoolExecutor(max_workers=2)
# 給進程池的進程分配任務,submit方法返回一個_base.Future對象
f1 = pool.submit(job,1)
f2 = pool.submit(job,2)
f3 = pool.submit(job,3)
f4 = pool.submit(job,4)
# 執行f1對象的各種方法
f1.done()
f1.result()

運行結果:

start id ---> 1
end id ----> 1
start id ---> 2
end id ----> 2
start id ---> 3
end id ----> 3
start id ---> 4
end id ----> 4

3.第三種方法

from concurrent.futures import ProcessPoolExecutor
import time
def job(id):
    print(‘start id ---> %d‘ %id)
    print(‘end id ----> %d‘ %id)
    time.sleep(1)
pool = ProcessPoolExecutor(max_workers=3)
pool.map(job,range(1,10))

運行結果:

start id ---> 1
end id ----> 1
start id ---> 2
end id ----> 2
start id ---> 3
end id ----> 3
start id ---> 4
end id ----> 4
start id ---> 5
end id ----> 5
start id ---> 6
end id ----> 6
start id ---> 7
end id ----> 7
start id ---> 8
end id ----> 8
start id ---> 9
end id ----> 9

Python學習—pyhton中的進程