1. 程式人生 > >老男孩14期自動化運維day9隨筆和作業(多執行緒批量管理主機)(二)

老男孩14期自動化運維day9隨筆和作業(多執行緒批量管理主機)(二)

執行緒與程序

1.執行緒:

os呼叫CPU進行運算的最小單位,被包含在程序中(就是一堆指令)

小知識點
運算速度比較:CPU>RAM>>磁碟
CPU 稍大於RAM(記憶體),RAM遠大於磁碟
每一個程式的記憶體都是獨立的,不能互相訪問
單核CPU只能同時執行一個任務,但是因為太快了,在CPU內進行上下文切換(執行緒的上下文字質上是一組CPU的暫存器,有正在執行程式中的指標及堆疊指標。)
(1)以下是一個執行緒例項:

# coding:utf-8
# Author:Yang

import threading
import  time

start_time=
time.time() # 多執行緒 def run(n): print("task",n) time.sleep(2) print("task done",n) t_objs=[] # 存執行緒例項 for i in range(50): t =threading.Thread(target=run,args=("t%s"%i,)) t.start() t_objs.append(t) # 為了不阻塞後面執行緒的啟動,不在這裡join,先放到一個列表裡 for t in t_objs: # 想檢視所有執行緒執行完的時間必須先建立個執行緒的列表 ,並for迴圈
t.join() print(threading.current_thread(),threading.active_count()) # mainthread、 當前活躍執行緒 print("cost:",time.time()-start_time) # 一個程序至少有一個執行緒,這段程式本身就是一個主執行緒,最後的cost時間是主執行緒執行的時間,所以說cost裡沒有2s,在for迴圈裡只是開闢了子執行緒的執行 # 預設情況下主執行緒是不會等子執行緒執行完畢的

上述程式碼中,例項化執行緒寫法為

t=treading.Thread(target=方法名,args=(引數,))
注意args中引數是一個元組,最後一個不寫就是預設none t.start

一個程序至少有一個執行緒,這段程式執行時候本身起了一個主執行緒,最後的cost時間是主執行緒執行的時間,所以說cost裡沒有2s,在for迴圈裡只是開闢了子執行緒的執行
預設情況下主執行緒是不會等子執行緒執行完畢的

那麼如何去檢測所有子執行緒執行結束的時間?

在python中使用t.join()是等待執行緒結束,在java,c#中是wait(),相當於不等待結果就不會往下走,變成序列的了

需要檢測所有子執行緒執行結束時間,如下:

for i in range(50):

    t =threading.Thread(target=run,args=("t%s"%i,))
    t.start()
    t_objs.append(t)  # 為了不阻塞後面執行緒的啟動,不在這裡join,先放到一個列表裡
for t in t_objs: # 想檢視所有執行緒執行完的時間必須先建立個執行緒的列表 ,並for迴圈
    t.join()

(2)除了過程寫法 還有類式寫法(用的較少):

#!/usr/bin/env python
# coding:utf-8
# Author:Yang

import threading
import time


# 類的寫法來寫多執行緒
# 這樣寫比較複雜

class MyThread(threading.Thread):
    def __init__(self,n,sleep_time):
        self.n=n
        self.sleep_time=sleep_time
        super(MyThread,self).__init__() # 因為繼承父類 要重構父類的構造方法

    def run(self): # 只能是run
        print("running task",self.n)
        time.sleep(self.sleep_time)
        print("task done..",self.n)
t1=MyThread("t1",2)
t2=MyThread("t2",4)

t1.start()
# t1.join()  # 其實就是有些語言中的wait() 等待執行緒結束 相當於不等待結果就不會往下走,變成序列的了
t2.start()

t1.join()
t2.join()
print("main thread...")

(3)多執行緒 守護執行緒

通過t.setDaemon(True)
把當前執行緒設定成守護執行緒(主執行緒結束都退出,不關心守護執行緒是否結束,例如寫socket時,建立一個連線就是一個執行緒,當程式斷開,所有socket都斷開)

for i in range(50):

    t =threading.Thread(target=run,args=("t%s"%i,))
    t.setDaemon(True) # 把當前執行緒設定成守護執行緒(主執行緒結束都退出 不關心守護執行緒是否結束),就不用加join了
    t.start()
    t_objs.append(t)  # 為了不阻塞後面執行緒的啟動,不在這裡join,先放到一個列表裡

(4)互斥鎖

需求,當一個執行緒在改全域性變數的時候,防止另外的執行緒來改資料,需要加互斥鎖。在python3.x不用加鎖了,2.x在ubuntu上要加鎖

#!/usr/bin/env python
# coding:utf-8
# Author:Yang
import threading
import  time

# 互斥鎖
start_time=time.time()
def run(n):
    lock.acquire() # 獲取鎖
    global num # 宣告全域性變量了
    num+=1
    lock.release()
lock=threading. Lock # 釋放鎖

# python 3.x不用加鎖了 2.x在ubuntu上要加鎖
num =0
t_objs=[]

for i in range(50):

    t =threading.Thread(target=run,args=("t%s"%i,))
    t.start()
    t_objs.append(t)
for t in t_objs:
    t.join()

print("-----all finished---")
print("num:",num)

在 python2.x的ubuntu上會出現結果為(“num”,49)發現不是50了,說明有執行緒在改資料的時候,另外的執行緒也在操作資料

(5)遞迴鎖
遞迴鎖的使用場景:內部鎖過多的時候
threading.Lock 內部鎖過多會卡死,使用RLock遞迴鎖 可以解除死鎖

import threading, time

# 遞迴鎖的使用場景 (內部鎖過多的時候)

def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num += 1
    lock.release()
    return num


def run2():
    print("grab the second part data")
    lock.acquire()
    global num2
    num2 += 1
    lock.release()
    return num2


def run3():
    lock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res, res2)




num, num2 = 0, 0
lock = threading.RLock() # threading.Lock 卡死 使用RLock 遞迴鎖 可以解除死鎖
for i in range(1):
    t = threading.Thread(target=run3)
    t.start()

while threading.active_count() != 1:
    print(threading.active_count())
else:
    print('----all threads done---')
    print(num, num2)

(6)訊號量

semaphore.acquire()
semaphore.release() semaphore =
threading.BoundedSemaphore(5) 最多允許五個執行緒同時執行,實際上是每出來一個就放進去一個

import threading, time
# 訊號量

def run(n):
    semaphore.acquire() # 訊號量鎖
    time.sleep(1)
    print("run the thread: %s\n" % n)
    semaphore.release()

if __name__ == '__main__':
    semaphore = threading.BoundedSemaphore(5)  # 最多允許5個執行緒同時執行  實際上是每出來一個就放進去一個
    for i in range(22):
        t = threading.Thread(target=run, args=(i,))
        t.start()
while threading.active_count() != 1:
    pass  # print threading.active_count()
else:
    print('----all threads done---')
    #print(num)

(7) 多執行緒典型模型 生產者消費者模型
目的 解耦 排隊

import threading,time

import queue


# 生產者消費者模型 解耦 排隊
q = queue.Queue(maxsize=10)

def Producer(name):
    count = 1
    while True:
        q.put("骨頭%s" % count)
        print("生產了骨頭",count)
        count +=1
        time.sleep(0.5)



def  Consumer(name):
    #while q.qsize()>0:
    while True:
        print("[%s] 取到[%s] 並且吃了它..." %(name, q.get()))
        time.sleep(1)



p = threading.Thread(target=Producer,args=("Alex",))
c = threading.Thread(target=Consumer,args=("ChengRonghua",))
c1 = threading.Thread(target=Consumer,args=("王森",))


p.start()
c.start()
c1.start()

(8)event 事件 :

事件:執行緒可以關注另一個執行緒狀態而做出的相應的響應
標誌位:event.set() 先綠燈,預設True為綠燈
event.clear() 清空標誌位 代表紅燈
event.wait() 等待標誌位

# 事件 執行緒可以關注另一個執行緒狀態而做出相應的響應
import time
import threading

event=threading.Event()
def lighter():  # 路燈
    count = 0
    event.set() # 先綠燈 預設true為綠燈
    # event.clear() 清空標誌位 代表紅燈
    # event.wait() 等待標誌位
    while True:
        if count >5 and count<10:
            event.clear() # 把標誌位清了
            print("\033[41;1mred light is on...\033[0m")
        elif count >10:
            event.set() # 變綠燈
            count =0
        else:
            print("\033[42;1mgreen light is on...\033[0m")
        time.sleep(1)
        count+=1

def car(name):
    while True:
        if event.is_set(): # 代表綠燈
            print("[%s] running..."%name)
            time.sleep(1)
        else:
            print("[%s] sees red light,waiting..."%name)
            event.wait()
            print("\033[34;1m[%s] green light is on ,start going...\033[0m"%name)

light = threading.Thread(target=lighter,)
light.start()

car1=threading.Thread(target=car,args=("Tesla",))
car1.start()

2.程序:
程式要以一個整體的形式暴露給作業系統管理,裡面包含對各種資源的呼叫,記憶體的管理,網路介面的呼叫等。。。所以執行緒可以定義為:

對一個程式各種資源管理的集合就叫做程序

程序要操作CPU,必須要先建立一個執行緒(至少一個)程序本身不具有執行功能,而是執行緒呼叫CPU
所有在同一個程序的執行緒共享同一塊記憶體空間

3.程序與執行緒的區別:
(1)啟動執行緒更快,但是在執行時程序與執行緒的速度一樣
(2)執行緒共享記憶體空間,程序的記憶體是獨立的
(3)執行緒與父程序共享記憶體資料,父程序資料改變後,執行緒的資料會改變。子程序是克隆父程序的資料,當父程序資料改變後,不會影響到子程序的資料
(4)同一個程序的執行緒之間可以直接交流。兩個程序想通訊,必須通過中間代理來實現
(5)建立新執行緒很簡單。建立新程序需要對其父程序進行一次克隆
(6)一個執行緒可以控制和操作同一程序裡的其他執行緒,但是程序只能操作其子程序
(7)對一個執行緒進行需改會影響到同一程序的其他執行緒。對程序進行修改,但是子程序不會被修改

4.python的設計缺陷(python GIL —global interperter lock 全域性直譯器鎖 )
注意,只是C python直譯器的缺陷 像 JPython 等沒有這個缺陷,像新的pypy直譯器推出了無GIL的版本,用的最多的還是Cpython

缺陷:無論多少核CPU
永遠都是在同一時間只有一個執行緒在執行(因為GIL),所以說python是假並行,因為python起的是通過cpython直譯器下c語言呼叫系統的原生執行緒

5.佇列(Queue)
作用:解耦,使程式直接實現鬆耦合,提高執行效率
class queue.Queue() # 先進先出
class queue.LifoQueue() # last in first out 後進先出
class queue.PriortyQueue() # 儲存資料時可設定優先順序佇列

#!/usr/bin/env python
# coding:utf-8
# Author:Yang

import queue

# q=queue.Queue(maxsize=2)
# q=queue.LifoQueue()
# q.put("d1")
# q.put("d2")
#
# print(q.qsize())
# print(q.get())
# print(q.get())

q=queue.PriorityQueue()

q.put((-1,"a"))
q.put((10,"yang"))
q.put((3,"b"))
q.put((6,"c"))

print(q.get())
print(q.get())
print(q.get())
print(q.get())