
Advancing Python [Chapter 9]: the paramiko module, threads and processes, thread locks, queues, and the producer-consumer model


1. The paramiko module

What is it?

  The paramiko module is written in Python, follows the SSH2 protocol, and supports connecting to remote servers with encryption and authentication.

Let's start with an example:

import paramiko

# Create an SSH client object
ssh = paramiko.SSHClient()

# Allow connecting to hosts that are not yet in the known_hosts file
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

# Connect to the server
ssh.connect(hostname="10.0.0.31", port=52113, username="root", password="123456")

# Execute a command
stdin, stdout, stderr = ssh.exec_command("df")

# Fetch the command output
res, err = stdout.read(), stderr.read()
result = res if res else err
print(result.decode())

# Close the connection
ssh.close()

Some quick-witted readers will immediately ask: what if I want to use a trust relationship instead? How do I use my private key here?

Being equally quick-witted, I came up with the method below:

import paramiko

# Load our private key file
private_key = paramiko.RSAKey.from_private_key_file("id_rsa.txt")

# Create an SSH client object
ssh = paramiko.SSHClient()
# Allow connecting to hosts that are not yet in the known_hosts file
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# Connect to the server; the pkey argument makes this a key-based login
ssh.connect(hostname="10.0.0.41", port=22, username="xiaoqiang", pkey=private_key)

# Execute a command
stdin, stdout, stderr = ssh.exec_command("df")
result = stdout.read()
print(result.decode())

stdin, stdout2, stderr = ssh.exec_command("ifconfig")
# Fetch the command output
result2 = stdout2.read()
print(result2.decode())

# Close the connection
ssh.close()

2. Processes and threads

What is a process?

An executing instance of a program is called a process.

Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.

A program cannot run by itself; only after it is loaded into memory and the system allocates resources to it can it run, and this executing program is what we call a process. The difference between a program and a process: a program is a collection of instructions, the static description text of a process; a process is one execution of the program, a dynamic concept.

In multiprogramming we allow several programs to be loaded into memory at the same time and, under the operating system's scheduling, to execute concurrently. It is exactly this design that greatly improves CPU utilization. Processes give each user the feeling of having the CPU to themselves; in other words, the process was invented to achieve multiprogramming on the CPU.

If we have processes, why do we still need threads?

Processes have many advantages: they provide multiprogramming and let each of us feel as if we own the CPU and the other resources, which improves the computer's utilization. Many people then wonder: if processes are so good, why do we still need threads? Look more closely and you will find that processes still have significant shortcomings, mainly two:

  • A process can only do one thing at a time; if you want it to do two or more things at once, the process is helpless.

  • If a process blocks during execution, for example while waiting for input, the whole process hangs; even the parts of the process that do not depend on the input cannot run.

For example, when we chat on QQ: if QQ, as a single process, could only do one thing at a time, how could it listen for keyboard input, listen for messages other people send you, and display those messages on the screen all at the same moment? You might say the operating system has time slicing, but dear reader, time slicing happens between different processes: the OS works on your QQ task for a while and then switches to the Word document task, and within each CPU time slice given to QQ, QQ can still only do one thing at a time.

To put it more bluntly, an operating system is like a factory with many workshops, each producing a different product; each workshop is a process. Your factory is poor and short on electricity, so only one workshop can be powered at a time. To let every workshop appear to produce simultaneously, the factory's electrician rotates the power among the workshops. But when it is the QQ workshop's turn, you find there is only one worker inside, so productivity is terrible. How do you fix this?... Exactly what you guessed: hire a few more workers and let them work in parallel. Each of those workers is a thread!

What is a thread?

A thread is the smallest unit of execution that the operating system can schedule. It is contained within a process and is the actual working unit of the process. A thread is a single sequential flow of control within a process; a process can have multiple threads running concurrently, each carrying out a different task.

A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions.

Suppose you're reading a book, and you want to take a break right now, but you want to be able to come back and resume reading from the exact point where you stopped. One way to achieve that is by jotting down the page number, line number, and word number. So your execution context for reading a book is these 3 numbers.

If you have a roommate, and she's using the same technique, she can take the book while you're not using it, and resume reading from where she stopped. Then you can take it back, and resume it from where you were.

Threads work in the same way. A CPU is giving you the illusion that it's doing multiple computations at the same time. It does that by spending a bit of time on each computation. It can do that because it has an execution context for each computation. Just like you can share a book with your friend, many tasks can share a CPU.

On a more technical level, an execution context (therefore a thread) consists of the values of the CPU's registers.

Last: threads are different from processes. A thread is a context of execution, while a process is a bunch of resources associated with a computation. A process can have one or many threads.

Clarification: the resources associated with a process include memory pages (all the threads in a process have the same view of the memory), file descriptors (e.g., open sockets), and security credentials (e.g., the ID of the user who started the process).

3. Differences between threads and processes

In short, a program has at least one process, and a process has at least one thread.
Threads are a finer-grained unit of scheduling than processes, which gives multithreaded programs higher concurrency.
In addition, a process owns an independent memory space during execution, while multiple threads share memory, which greatly improves a program's running efficiency.
Threads still differ from processes during execution. Each independent thread has its own entry point, sequential execution path, and exit point, but a thread cannot run on its own; it must live inside an application, which provides and controls the execution of its threads.
From a logical point of view, the meaning of multithreading is that one application can have multiple parts executing at the same time. The operating system, however, does not treat multiple threads as multiple independent applications when it schedules, manages, and allocates resources. This is an important difference between processes and threads.

A process is one run of a program, with some independent functionality, over a given data set; it is the independent unit the system uses for resource allocation and scheduling.
A thread is an entity within a process and the basic unit of CPU scheduling and dispatch; it is a unit smaller than a process that can run independently. A thread essentially owns no system resources of its own, only the few things it cannot run without (a program counter, a set of registers, and a stack), but it shares all of the resources owned by the process with the other threads that belong to the same process.
One thread can create and destroy another thread, and multiple threads in the same process can execute concurrently.

The main difference between processes and threads is that they are different ways for the operating system to manage resources. A process has an independent address space; if one process crashes, it does not affect other processes in protected mode, while threads are just different execution paths within a single process. Threads have their own stacks and local variables, but there are no separate address spaces between threads, so one thread dying effectively kills the whole process. Multi-process programs are therefore more robust than multithreaded ones, but switching between processes costs more resources and is less efficient. For concurrent operations that must run simultaneously while sharing certain variables, only threads will do, not processes. The short sketch below illustrates the shared-memory point.
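To make the memory difference concrete, here is a minimal sketch (it uses the standard multiprocessing module, which this chapter does not otherwise cover, purely for illustration): a thread mutates the parent's global variable directly, while a child process only ever changes its own copy.

import threading
import multiprocessing

counter = 0

def bump():
    global counter
    counter += 1

if __name__ == "__main__":
    # A thread shares the process's memory, so it sees and changes the same counter
    t = threading.Thread(target=bump)
    t.start(); t.join()
    print("after thread:", counter)    # 1

    # A child process gets its own memory; whatever it does to its copy
    # of counter is invisible to the parent
    p = multiprocessing.Process(target=bump)
    p.start(); p.join()
    print("after process:", counter)   # still 1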

Having read all these concepts, you should have a rough picture. Now let's look at a concrete example and understand it through code:

There are two ways to invoke multithreading:

Direct invocation:

import threading

def run(n):
    print("task", n)

t1 = threading.Thread(target=run, args=("t1",))
t2 = threading.Thread(target=run, args=("t2",))

t1.start()
t2.start()

The above is a simple use of multithreading. After running it you may feel it looks no different from single-threaded code... fair enough, so let's modify it a bit:

import threading
import time

def run(n):
    print("task", n)
    time.sleep(2)  # make each task sleep for two seconds

t1 = threading.Thread(target=run, args=("t1",))
t2 = threading.Thread(target=run, args=("t2",))

t1.start()
t2.start()

Compare this code with a single-threaded version and the difference shows: the multithreaded version prints both results at once and then waits two seconds, about two seconds in total, while the single-threaded version prints one result, waits two seconds, prints the next, and waits another two seconds, about four seconds in total. A small timing sketch follows below.
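Here is a minimal sketch of that timing comparison for anyone who wants to measure it; it uses join(), which is only introduced in a later section, simply to make the main thread wait before stopping the clock.

import threading
import time

def run(n):
    print("task", n)
    time.sleep(2)

# Sequential: the two sleeps add up, roughly 4 seconds
start = time.time()
run("t1")
run("t2")
print("sequential cost:", time.time() - start)

# Threaded: the two sleeps overlap, roughly 2 seconds
start = time.time()
t1 = threading.Thread(target=run, args=("t1",))
t2 = threading.Thread(target=run, args=("t2",))
t1.start()
t2.start()
t1.join()  # wait for both threads before reading the clock
t2.join()
print("threaded cost:", time.time() - start)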

Inheritance-style invocation:

import threading
import time


class MyThread(threading.Thread):
    def __init__(self, num):
        threading.Thread.__init__(self)
        self.num = num

    def run(self):  # define the function each thread will run
        print("running on number:%s" % self.num)
        time.sleep(3)

if __name__ == '__main__':
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()

4. join and daemon for threads

First, run this piece of code and look at the output:

import threading
import time

def run(n):
    print("task ", n)
    time.sleep(2)
    print("task done", n)

start_time = time.time()
t_objs = []  # hold the thread instances
for i in range(50):
    t = threading.Thread(target=run, args=("t-%s" % i,))
    t.start()
    t_objs.append(t)  # don't join here, so later threads aren't blocked from starting; save them in a list instead

print("----------all threads has finished...")
print("cost:", time.time() - start_time)

Run it and notice anything? Damn, why weren't the two seconds of sleep counted? WTF~

This is where join comes in, so pay attention: a program has at least one thread, the main thread; every other thread is created by the main thread, and once created, the main thread and its child threads are no longer tied to each other.

In the program above, the main thread did not wait for the child threads to finish after creating them; it ran straight on to the code below, so the final timing did not include the child threads' two seconds.

Now that we know the cause, let's modify the code:

import threading
import time

def run(n):
    print("task ", n)
    time.sleep(2)
    print("task done", n)

start_time = time.time()
t_objs = []  # hold the thread instances
for i in range(50):
    t = threading.Thread(target=run, args=("t-%s" % i,))
    t.start()
    t_objs.append(t)  # don't join here, so later threads aren't blocked from starting; save them in a list instead

for t in t_objs:  # loop over the thread instances and wait for them all to finish
    t.join()


print("----------all threads has finished...")
print("cost:", time.time() - start_time)

Here join makes the main thread wait until the other threads have finished before moving on.

daemon works somewhat in the opposite direction from join: the main program waits for all non-daemon threads to finish before it exits, but a daemon thread is like a servant of the main thread; when the main thread ends, the daemon threads end with it.

For example:

import threading
import time

def run(n):
    print("task ", n)
    time.sleep(2)
    print("task done", n, threading.current_thread())

start_time = time.time()
t_objs = []  # hold the thread instances
for i in range(50):
    t = threading.Thread(target=run, args=("t-%s" % i,))
    t.setDaemon(True)  # mark the current thread as a daemon thread
    t.start()
    t_objs.append(t)  # don't join here, so later threads aren't blocked from starting; save them in a list instead

time.sleep(2)
print("----------all threads has finished...", threading.current_thread(), threading.active_count())
print("cost:", time.time() - start_time)

Run it and you'll see the program ends straight away without waiting for the daemon threads to finish their sleep.

5. The Python GIL (Global Interpreter Lock)

  As its name suggests, the GIL is a global lock: it ensures that only one thread executes in the interpreter at a time, so that shared data is not thrown into disarray. Hearing this you may think, well, what kind of multithreading is that? In fact the GIL is not a feature of the Python language; it is a concept introduced by one particular implementation of the Python interpreter, CPython. By analogy, C++ is a language (syntax) standard that different compilers turn into executable code, with well-known compilers such as GCC, Intel C++ and Visual C++. Python is the same: the same piece of code can be run by different Python runtimes such as CPython, PyPy or Psyco, and some of them, Jython for example, have no GIL at all. However, because CPython is the default Python runtime in most environments, many people equate CPython with Python and naturally blame the GIL on the Python language. So let's be clear about one thing first: the GIL is not a feature of Python, and Python does not have to depend on the GIL. The sketch after the flow chart shows its practical effect on CPU-bound threads.

The GIL's scheduling flow chart (figure omitted).
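A minimal sketch of what the GIL means in practice on CPython: two threads doing pure CPU work take about as long as doing the work one call after the other, because only one of them can execute Python bytecode at any moment. (The exact numbers depend on your machine and interpreter version; treat this as an illustration.)

import threading
import time

def count_down(n):
    # pure CPU work: under the GIL only one thread runs Python bytecode at a time
    while n > 0:
        n -= 1

N = 10_000_000

# Run the work twice sequentially
start = time.time()
count_down(N)
count_down(N)
print("sequential:", time.time() - start)

# Run the same work in two threads: on CPython this is usually no faster
start = time.time()
t1 = threading.Thread(target=count_down, args=(N,))
t2 = threading.Thread(target=count_down, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
print("two threads:", time.time() - start)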

Thread lock (mutex)

When a program has multiple threads sharing the memory of one parent process, they can all modify the same piece of data; when two threads try to modify the same data at the same time, we need a lock. (A no-lock sketch after the example below shows what can go wrong without one.)

For example:

import threading
import time

def run(n):
    lock.acquire()     # take the lock before touching the shared data
    global num
    num += 1
    time.sleep(1)
    lock.release()     # release it so the next thread can proceed


lock = threading.Lock()
num = 0
t_objs = []  # hold the thread instances
for i in range(50):
    t = threading.Thread(target=run, args=("t-%s" % i,))
    t.start()
    t_objs.append(t)  # don't join here, so later threads aren't blocked from starting; save them in a list instead

for t in t_objs:  # loop over the thread instances and wait for them all to finish
    t.join()

print("----------all threads has finished...", threading.current_thread(), threading.active_count())

print("num:", num)
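To see why the lock matters, here is a small sketch with the lock removed and the read-modify-write spelled out; on most runs some increments get lost because threads interleave between the read and the write. (How many are lost varies with the interpreter and machine, so treat it as an illustration rather than a guarantee.)

import threading

num = 0

def add_without_lock(times):
    global num
    for _ in range(times):
        current = num        # read the shared value
        num = current + 1    # write it back; another thread may have bumped num in between

threads = [threading.Thread(target=add_without_lock, args=(100000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("expected 400000, got:", num)  # usually less than 400000 without a lock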

RLock (recursive lock)

When one lock acquisition is nested inside another, you need a recursive lock; otherwise the outer lock and the inner lock get in each other's way. (The sketch after the example below shows how an ordinary Lock deadlocks when nested.)

import threading, time

def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num += 1
    lock.release()
    return num

def run2():
    print("grab the second part data")
    lock.acquire()
    global num2
    num2 += 1
    lock.release()
    return num2

def run3():
    lock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res, res2)


if __name__ == '__main__':
    num, num2 = 0, 0
    lock = threading.RLock()
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()

while threading.active_count() != 1:
    print(threading.active_count())
else:
    print('----all threads done---')
    print(num, num2)
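A minimal sketch of the difference: a plain Lock blocks forever if the same thread tries to acquire it a second time, while an RLock keeps count and lets the owning thread re-acquire it.

import threading

lock = threading.Lock()

def nested_lock():
    lock.acquire()
    lock.acquire()   # second acquire of a plain Lock in the same thread never returns
    lock.release()
    lock.release()

# nested_lock()  # uncommenting this call would deadlock the program

rlock = threading.RLock()

def nested_rlock():
    rlock.acquire()
    rlock.acquire()  # an RLock may be re-acquired by the thread that already holds it
    rlock.release()
    rlock.release()
    print("nested RLock acquire works")

nested_rlock()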

Semaphore

A mutex allows only one thread at a time to change the data, while a Semaphore allows a fixed number of threads to do so at once. Think of a restroom with 3 stalls: at most 3 people can use it at the same time, and everyone behind them has to wait until someone comes out.

import threading, time

def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the thread: %s\n" % n)
    semaphore.release()

if __name__ == '__main__':
    num = 0
    semaphore = threading.BoundedSemaphore(5)  # allow at most 5 threads to run at the same time
    for i in range(20):
        t = threading.Thread(target=run, args=(i,))
        t.start()

while threading.active_count() != 1:
    pass  # print(threading.active_count())
else:
    print('----all threads done---')
    print(num)

Event

An event is a simple synchronization object;

the event represents an internal flag, and threads
can wait for the flag to be set, or set or clear the flag themselves.

event = threading.Event()

# a client thread can wait for the flag to be set
event.wait()

# a server thread can set or reset it
event.set()
event.clear()
If the flag is set, the wait method doesn’t do anything.
If the flag is cleared, wait will block until it becomes set again.
Any number of threads may wait for the same event.

Event lets two or more threads interact. Below is a traffic-light example: one thread is started as the traffic light and several threads are started as cars, and the cars follow the rule of stopping on red and driving on green.

import threading, time
import random

def light():
    if not event.is_set():
        event.set()  # set the flag so wait() won't block — green-light state
    count = 0
    while True:
        if count < 10:
            print('\033[42;1m--green light on---\033[0m')
        elif count < 13:
            print('\033[43;1m--yellow light on---\033[0m')
        elif count < 20:
            if event.is_set():
                event.clear()
            print('\033[41;1m--red light on---\033[0m')
        else:
            count = 0
            event.set()  # turn the light green again
        time.sleep(1)
        count += 1

def car(n):
    while 1:
        time.sleep(random.randrange(10))
        if event.is_set():  # green light
            print("car [%s] is running.." % n)
        else:
            print("car [%s] is waiting for the red light.." % n)

if __name__ == '__main__':
    event = threading.Event()
    Light = threading.Thread(target=light)
    Light.start()
    for i in range(3):
        t = threading.Thread(target=car, args=(i,))
        t.start()

The queue module

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

class queue.Queue(maxsize=0) # first in, first out
class queue.LifoQueue(maxsize=0) # last in, first out
class queue.PriorityQueue(maxsize=0) # a queue whose items carry a priority when stored

Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).
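A minimal sketch of the three queue classes and the (priority_number, data) pattern described above:

import queue

# FIFO: items come out in the order they were put in
q = queue.Queue()
for item in ("a", "b", "c"):
    q.put(item)
print([q.get() for _ in range(3)])   # ['a', 'b', 'c']

# LIFO: the last item put in is the first one out
lq = queue.LifoQueue()
for item in ("a", "b", "c"):
    lq.put(item)
print([lq.get() for _ in range(3)])  # ['c', 'b', 'a']

# PriorityQueue: the entry with the lowest priority number comes out first
pq = queue.PriorityQueue()
pq.put((3, "low"))
pq.put((1, "high"))
pq.put((2, "medium"))
print([pq.get() for _ in range(3)])  # [(1, 'high'), (2, 'medium'), (3, 'low')]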

exception queue.Empty

Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

exception queue.Full

Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.

Queue.qsize()
Queue.empty() #return True if empty
Queue.full() # return True if full
Queue.put(item, block=True, timeout=None)

Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

Queue.put_nowait(item)

Equivalent to put(item, False).

Queue.get(block=True, timeout=None)

Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Queue.get_nowait()

Equivalent to get(False).

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

Queue.task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

Queue.join() blocks until every item in the queue has been consumed and marked done.
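A minimal sketch of the task_done()/join() pairing described above: the producing side calls q.join() to block until a consumer thread has called task_done() once for every item that was put().

import queue
import threading

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        print("processing", item)
        q.task_done()  # tell the queue this item has been fully processed

# daemon=True so the worker does not keep the program alive after main finishes
t = threading.Thread(target=worker, daemon=True)
t.start()

for i in range(5):
    q.put(i)

q.join()  # blocks here until task_done() has been called once per put()
print("all items processed")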

The producer-consumer model

In concurrent programming, the producer-consumer pattern can solve the vast majority of concurrency problems. It improves a program's overall data-processing speed by balancing the throughput of the producing threads against that of the consuming threads.

Why use the producer-consumer pattern?

In the world of threads, producers are the threads that generate data and consumers are the threads that consume it. In multithreaded development, if the producer is fast and the consumer is slow, the producer has to wait for the consumer to finish before it can produce more data; likewise, if the consumer is faster than the producer, the consumer has to wait for the producer. The producer-consumer pattern was introduced to solve this problem.

What is the producer-consumer pattern?

The producer-consumer pattern uses a container to break the tight coupling between producers and consumers. Producers and consumers do not talk to each other directly; they communicate through a blocking queue. A producer does not wait for a consumer to handle its data; it just drops the data into the blocking queue, and a consumer does not ask the producer for data; it takes it straight from the queue. The blocking queue acts as a buffer that balances the processing capacities of producers and consumers.

Below is a basic producer-consumer model:

import time, random
import queue, threading

q = queue.Queue()

def Producer(name):
    count = 0
    while count < 20:
        time.sleep(random.randrange(3))
        q.put(count)
        print('Producer %s has produced %s baozi..' % (name, count))
        count += 1

def Consumer(name):
    count = 0
    while count < 20:
        time.sleep(random.randrange(4))
        if not q.empty():
            data = q.get()
            print(data)
            print('\033[32;1mConsumer %s has eaten %s baozi...\033[0m' % (name, data))
        else:
            print("-----no baozi anymore----")
        count += 1

p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
p1.start()
c1.start()
