1. 程式人生 > >Python基礎 - 第九天 - paramiko模塊、進程、線程

Python基礎 - 第九天 - paramiko模塊、進程、線程

python

本篇內容:

1.paramiko模塊使用

2.進程、線程簡介

3.python調用線程的方法

4.join - 等待線程執行

5.守護線程

6.GIL - 全局解釋器鎖

7.互斥鎖

8.信號量

9.事件

10.隊列



一、paramiko模塊使用

1.paramiko模塊簡介

? paramiko是一個基於SSH用於連接遠程服務器並執行相關操作(SSHClient和SFTPClinet,即一個是遠程連接,一個是上傳下載服務),使用該模塊可以對遠程服務器進行命令或文件操作,值得一說的是,fabric和ansible內部的遠程管理就是使用的paramiko來現實。


2.使用paramiko模塊做SSHClient:用於連接遠程服

務器並執行基本命令

server上要啟動ssh程序


①SSHClient沒有封裝Transport的用法

import?paramiko


#?創建SSH對象
ssh?=?paramiko.SSHClient()

#?允許連接不在~/.ssh/known_hosts文件中的主機
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

#?連接服務器
ssh.connect(hostname=‘c1.salt.com‘,?port=22,?username=‘root‘,?password=‘123‘)

#?執行命令,不要執行top之類的在不停的刷新的命令
stdin,?stdout,?stderr?=?ssh.exec_command(‘df‘)

#?獲取命令結果
result?=?stdout.read()

#?獲取的命令結果是bytes類型
print(result.decode(encoding="utf-8"))

#?關閉連接
ssh.close()


②SSHClient封裝Transport的用法

import?paramiko


transport?=?paramiko.Transport((‘hostname‘,?22))????#?建立連接

transport.connect(username=‘wupeiqi‘,?password=‘123‘)????#?建立連接

#?創建SSH對象
ssh?=?paramiko.SSHClient()????#?SSHClient是定義怎麽傳輸命令、怎麽交互文件
ssh._transport?=?transport

#?執行命令,不要執行top之類的在不停的刷新的命令
stdin,?stdout,?stderr?=?ssh.exec_command(‘df‘)

#?獲取命令結果
result?=?stdout.read()

#?獲取的命令結果是bytes類型
print(result.decode(encoding="utf-8"))

#?關閉連接
transport.close()


③基於ssh免密登入的私鑰連接

import?paramiko


#?指定使用ssh免密登入的私鑰
private_key?=?paramiko.RSAKey.from_private_key_file(‘/home/auto/.ssh/id_rsa‘)

transport?=?paramiko.Transport((‘hostname‘,?22))
transport.connect(username=‘root‘,?pkey=private_key)

ssh?=?paramiko.SSHClient()
ssh._transport?=?transport

stdin,?stdout,?stderr?=?ssh.exec_command(‘df‘)

transport.close()


3.使用paramiko模塊做SFTPClient:用於連接遠程服務器並執行上傳下載

server上要啟動ssh程序


①基於用戶名密碼上傳下載,SFTPClient封裝Transport的用法

import?paramiko


transport?=?paramiko.Transport((‘hostname‘,?22))??#?建立連接

transport.connect(username=‘root‘,?password=‘123‘)??#?建立連接

#?創建sftp對象
sftp?=?paramiko.SFTPClient.from_transport(transport)??#?SFTPClient是定義怎麽傳輸文件、怎麽交互文件

#?將location.py?上傳至服務器?/tmp/test.py
sftp.put(‘/tmp/location.py‘,?‘/tmp/test.py‘)

#?將remove_path?下載到本地?local_path
sftp.get(‘remove_path‘,?‘local_path‘)

#?關閉連接
transport.close()


②基於ssh免密登入的私鑰上傳下載

import?paramiko


#?指定使用ssh免密登入的私鑰
private_key?=?paramiko.RSAKey.from_private_key_file(‘/home/auto/.ssh/id_rsa‘)
?
transport?=?paramiko.Transport((‘hostname‘,?22))
transport.connect(username=‘root‘,?pkey=private_key?)
?
sftp?=?paramiko.SFTPClient.from_transport(transport)
#?將location.py?上傳至服務器?/tmp/test.py
sftp.put(‘/tmp/location.py‘,?‘/tmp/test.py‘)
#?將remove_path?下載到本地?local_path
sftp.get(‘remove_path‘,?‘local_path‘)
?
transport.close()



二、進程、線程簡介

1.進程簡介

? 每個進程都提供執行程序所需的資源。一個進程有一個虛擬地址空間、可執行代碼、對系統對象的開放句柄、一個安全上下文、一個唯一的進程標識符(PID)、環境變量、一個優先級類、最小和最大工作集大小,以及至少一個執行線程。每個進程都是從一個線程開始的,通常稱為主線程,但是可以從它的任何線程中創建額外的線程。

? 進程自己是不能執行的,必須通過線程來執行;


2.線程簡介

? 線程是操作系統能夠進行運算調度的最小單位(操作系統調度CPU最小的單位就是線程)。它被包含在進程之中,是進程中的實際運作單位。

??線程是一個執行的上下文(指令),它是CPU執行指令流所需的全部信息。

??一條線程指的是進程中一個單一順序的控制流,一個進程中可以並發多個線程,每條線程並行執行不同的任務。


3.進程和線程的區別

? 線程共享創建它的進程的地址空間;進程都有自己的地址空間(相同父進程的多個子進程也是這樣的)。同一進程中的線程是共享數據,進程是不共享數據(相同父進程的多個子進程也是這樣的)

??線程可以直接訪問其進程的數據段;子進程擁有父進程數據段的副本。

??線程可以與同一進程的其他線程直接通信;進程必須使用進程間通信(IPC)來與同級進程通信。

??新的線程很容易創建;新的進程需要對父進程的復制(克隆)。

??線程可以對同一進程的其他線程進行相當大的控制和操作;進程只能對子進程進行控制和操作。

??對主線程的更改(取消、優先級更改等)可能影響同一進程的其他線程的行為;對父進程的更改不會影響子進程(只要不刪除父進程就不會影響子進程)。



三、python調用線程的方法

1.直接調用線程的方式

import?threading
import?time


#?定義線程要運行的函數
#?函數名可以隨意命名
def?run(n):
????print("task",?n)

????time.sleep(2)


if?__name__?==?"__main__":

????#?group默認為空,為將來的擴展預留給ThreadGroup類實現的
????#?target是由run方法函數調用的可調用對象。默認為空,代表著什麽都不做
????#?name是線程名稱。默認情況下,構造一個唯一的名稱:thread-n,n是一個十進制數
????#?args是target調用的可調用對象的參數元組,默認為()。即使只有一個參數,也要加上逗號
????#?kwargs是target調用的可調用對象的關鍵字參數字典
????t1?=?threading.Thread(target=run,?args=(1,))??#?生成一個線程實例
????t2?=?threading.Thread(target=run,?args=(2,))??#?生成另一個線程實例

????t1.start()??#?啟動一個線程
????t2.start()??#?啟動另一個線程


2.繼承式調用線程(自定義線程類)的方式

import?threading
import?time


class?MyThread(threading.Thread):
????"""自定義的線程類"""
????def?__init__(self,?num):
????????#?先重構構造函數,再繼承父類的構造函數
????????super(MyThread,?self).__init__()
????????self.num?=?num

????#?定義線程要運行的方法函數
????#?註意,方法函數名必須要是run,因為程序裏已經寫死,會自動調用run方法函數
????def?run(self):
????????print("運行的數字是:%s"?%?self.num)

????????time.sleep(3)


if?__name__?==?‘__main__‘:

????t1?=?MyThread(1)??#?生成一個線程實例,並傳遞參數
????t2?=?MyThread(2)??#?生成另一個線程實例,並傳遞參數

????t1.start()??#?啟動一個線程
????t2.start()??#?啟動另一個線程


3.線程的其它方法

? print(線程實例.getName()):獲取線程名;


??線程實例.setName(name):為線程設置名稱;


??線程實例.setDaemon():設置線程為後臺線程(守護線程)或前臺線程,默認情況下所有線程都是前臺線程。線程實例名.setDaemon()代表將當前線程設置成前臺線程,線程實例名.setDaemon(True)代表將當前線程設置成後臺線程(守護線程)。


??線程實例.join(timeout):等待線程終止,timeout是等待的秒數,timeout為空就代表一直等到線程終止。等待的線程執行完畢後,主線程才繼續往下執行,該方法使得多線程變得無意義;


??線程實例.run():線程被cpu調度後自動執行線程對象的run方法;


??print(threading.active_count()):查看當前活躍的線程數;


??print(threading.current_thread()):查看當前線程的實例,主線程叫MainThread,子線程叫Thread-n;


4.程序開啟線程後的註意事項

? 主程序是主線程在執行;

? 主線程創建子線程後,主線程不會等待子線程執行完畢後再向下執行。也就是說主線程和子線程是並行的;



四、join - 等待線程執行

線程實例.join(timeout):等待線程終止,timeout是等待的秒數,timeout沒有指定就代表一直等到線程終止。等待的線程執行完畢後,主線程才繼續往下執行;

import?threading
import?time


def?run(n):
????print("task",?n)

????time.sleep(2)

????print("task?done",?n)

????print("查看當前線程",?threading.current_thread())


if?__name__?==?"__main__":
????t_obj_list?=?[]

????start_time?=?time.time()

????for?i?in?range(3):??#?啟動3個線程
????????t?=?threading.Thread(target=run,?args=("t-%s"?%?i,))??#?生成線程實例
????????t.start()??#?啟動一個線程
????????t_obj_list.append(t)

????print("當前活躍的線程數",?threading.active_count())

????for?item?in?t_obj_list:
????????item.join()??#?等待線程終止

????print("程序執行總耗時?%s?秒"?%?(time.time()?-?start_time))

????print("查看當前線程",?threading.current_thread())



五、守護線程

1.什麽是守護線程

? 當主線程執行完畢,任何守護線程不管是否執行完成,都會自動終止。程序只會等待主線程、非守護線程執行完成後再退出程序;

??無法將主線程設置成守護線程;

??守護線程一直在監聽主線程是否退出;



2.設置守護線程的方法

? 線程實例.setDaemon():設置線程為後臺線程(守護線程)或前臺線程,默認情況下所有線程都是前臺線程。線程實例名.setDaemon()代表將當前線程設置成前臺線程,線程實例.setDaemon(True)代表將當前線程設置成後臺線程(守護線程)。

? ? 如果是後臺線程,主線程執行過程中,後臺線程也在進行,主線程執行完畢後,後臺線程不論成功與否,均停止。

? ? 如果是前臺線程,主線程執行過程中,前臺線程也在進行,主線程執行完畢後,等待前臺線程也執行完成後,程序停止

? 註意,一定要在啟動線程之前設置,不然就設置不了;


3.實例

import?threading
import?time


def?run(n):
????print("task",?n)

????time.sleep(2)

????print("task?done",?n)

????print("查看當前線程",?threading.current_thread())


if?__name__?==?"__main__":

????for?i?in?range(3):??#?啟動3個線程
????????t?=?threading.Thread(target=run,?args=("t-%s"?%?i,))??#?生成線程實例
????????t.setDaemon(True)??#?將當前線程設置成守護線程。一定要在啟動線程之前設置
????????t.start()??#?啟動一個線程

????print("當前活躍的線程數",?threading.active_count())

????print("查看當前線程",?threading.current_thread())



六、GIL - 全局解釋器鎖

??全局解釋器鎖(GIL)的作用就是同一時間只讓一個線程調用CPU;

??機器上無論有多少個CPU,通過CPython無論啟動多少個線程,在執行的時候全局解釋器鎖(GIL)同一時刻只允許一個線程執行操作;

??由於線程之間是進行隨機調度,當多個線程同時修改同一條數據時可能會出現臟數據,所以,出現了全局解釋器鎖(GIL),同一時刻只允許一個線程執行操作。



七、互斥鎖

1.為什麽要有互斥鎖

? 一個進程下可以啟動多個線程,多個線程共享父進程的內存空間,也就意味著每個線程可以訪問同一份數據。假設現在有A、B兩個線程,此時都要對number進行減1操作, 由於2個線程是並發同時運行的,所以2個線程很有可能同時拿走了number=100這個初始變量交給cpu去運算,當A線程去處完的結果是99,但此時B線程運算完的結果也是99,兩個線程同時將CPU運算的結果再賦值給number變量後,結果就都是99。這樣計算出的結果就有問題,那怎麽辦呢?很簡單,每個線程在要修改公共數據時,為了避免自己在還沒改完的時候別人也來修改此數據,可以給這個數據加一把鎖,這樣其它線程想修改此數據時就必須等待你修改完畢並把鎖釋放掉後才能再訪問此數據。


2.加互斥鎖的例子

import?threading


def?operation():
????global?number??#?在每個線程中都獲取這個全局變量

????print(‘線程實例?%s?獲取的number值為?%s?‘?%?(threading.current_thread(),?number))

????lock.acquire()??#?修改數據前加鎖

????number?-=?1??#?對此公共變量進行-1操作

????lock.release()??#?修改完成後釋放鎖


number?=?10??#?設定一個共享變量

thread_list?=?[]

lock?=?threading.Lock()??#?生成全局鎖

for?i?in?range(10):
????t?=?threading.Thread(target=operation)
????t.start()

????thread_list.append(t)

#?等待所有線程執行完畢,不然主線程就會往下執行,有可能某些線程還未執行完畢,主線程打印出的number值有誤
for?t?in?thread_list:
????t.join()

print(‘所有線程修改完成後number的值為‘,?number)


3.遞歸鎖

有時在修改數據時需要套多層鎖(大鎖中還包含有子鎖),如果使用普通的鎖鎖住數據,會出現在修改完數據後無法釋放鎖,鎖死的情況。對於這種情況就要使用遞歸鎖;



4.加遞歸鎖的例子

import?threading


def?run1():
????print("第一次抓取數據")

????lock.acquire()??#?修改數據前再加上一把小鎖

????global?num??#?在線程中都獲取這個全局變量
????num?+=?1

????lock.release()??#?修改完成後釋放小鎖

????return?num


def?run2():
????print("第二次抓取數據")

????lock.acquire()??#?修改數據前再加上一把小鎖

????global?num2??#?在線程中都獲取這個全局變量
????num2?+=?1

????lock.release()??#?修改完成後釋放小鎖

????return?num2


def?run3():

????lock.acquire()??#?修改數據前加上一把大鎖

????res?=?run1()

????print(‘--------run1函數和run2函數之間-----‘)

????res2?=?run2()

????lock.release()??#?修改完成後釋放大鎖

????print("run1函數返回的值是",?res)
????print("run2函數返回的值是",?res2)


if?__name__?==?‘__main__‘:

????num,?num2?=?0,?0

????lock?=?threading.RLock()??#?生成遞歸鎖

????for?i?in?range(2):??#?開啟兩個線程
????????t?=?threading.Thread(target=run3)??#?開啟的線程先調用run3函數
????????t.start()

while?threading.active_count()?!=?1:??#?子線程還未執行完畢
????print("當前活躍的線程數",?threading.active_count())
else:
????print(‘----所有線程執行完畢---‘)
????print("num的值是",?num)
????print("num2的值是",?num2)



八、信號量

1.信號量簡介

? 互斥鎖是同時只允許一個線程更改數據,而信號量(Semaphore)是同時允許一定數量的線程更改數據。

? 放行線程的原則:當允許的線程中某個線程先執行完成釋放鎖後,程序會立即再放行新的線程,將允許的線程數穩定在設置的數量,不會等到所有允許的線程都執行完成後再統一放行;

??註意,由於信號量(Semaphore)是同時允許一定數量的線程更改數據,如果這些允許的線程更改的是同一份數據,那麽有可能更改後的結果出現錯誤。


2.信號量的例子

import?threading
import?time


def?run():
????semaphore.acquire()??#?加鎖

????time.sleep(10)

????print("線程?%s?在運行"?%?threading.current_thread().name)

????semaphore.release()??#?釋放鎖


if?__name__?==?‘__main__‘:

????#?生成信號量實例,並設置最多允許3個線程可以同時修改數據
????semaphore?=?threading.BoundedSemaphore(3)

????for?i?in?range(10):??#?開啟10個線程
????????t?=?threading.Thread(target=run)
????????t.start()

while?threading.active_count()?!=?1:
????pass
else:
????print(‘所有線程都執行完畢‘)



九、事件

1.事件簡介

? python線程的事件用於主線程控制其他線程的執行,事件主要提供了三個方法?set、wait、clear。

? 事件處理的機制:全局定義了一個標記,如果標記值為False,那麽執行event.wait方法時線程就會阻塞,如果標記值為True,那麽執行event.wait方法時線程不會阻塞。

? 任何線程都可以設置標記、重置標記和等待相同的事件;


2.事件的例子

import?threading
import?time


event?=?threading.Event()


def?traffic_lights():
????count?=?1

????#?創建事件對象後標記默認為False,這裏將標記設置為True,代表綠燈
????event.set()
????while?True:
????????if?5?<?count?<=?10:??#?紅燈時間5秒鐘
????????????event.clear()??#?將標記設置為False,代表紅燈
????????????print("\033[41;1m紅燈亮了\033[0m")
????????elif?count?>?10:??#?綠燈時間5秒
????????????event.set()??#?將標記設置為True,代表綠燈
????????????count?=?1??#?將count重置
????????????print("\033[42;1m綠燈亮了\033[0m")
????????else:
????????????print("\033[42;1m綠燈亮了\033[0m")

????????time.sleep(1)
????????count?+=?1


def?car(*args):
????while?True:
????????if?event.is_set():??#?標記為True,代表綠燈
????????????print("%s通行"?%?args[0])
????????????print("%s通行"?%?args[1])
????????????time.sleep(1)
????????else:
????????????print("停車等待")
????????????event.wait()??#?處理阻塞狀態,等待設置標記


light_obj?=?threading.Thread(target=traffic_lights)
light_obj.start()

for?i?in?range(2):??#?開啟兩個車的線程
????car_obj?=?threading.Thread(target=car,?args=("Tesla",?"Benz"))
????car_obj.start()



十、隊列

1.隊列簡介

? 當必須安全地在多個線程之間交換信息時,隊列在線程編程中特別有用。

??隊列用來進行線程間通訊,讓各個線程之間共享數據。

? 隊列的兩個作用:解耦和提高運行效率;


? 隊列和列表的區別:隊列中的數據取走就沒有了,而從列表中取數據是復制,只有手動刪除數據,數據才會沒有;


2.隊列的模式

①先入先出模式

隊列對象名?=?queue.Queue(maxsize=0)


②後入先出模式

隊列對象名?=?queue.LifoQueue(maxsize=0)


③往隊列中放入數據時設置優先級,按照優先級出隊列(數小的優先,字母靠前的優先),格式為一個元組:(優先級, 數據)

隊列對象名?=?queue.PriorityQueue(maxsize=0)


註意,maxsize默認值為零,當maxsize的值小於等於零時,隊列的大小是無限的。


3.隊列的其它方法

隊列對象.put(item,?block=True,?timeout=None):隊列沒有滿,將項目放入隊列中。隊列已滿時:

??●block的值為True,timeout的值為None:方法函數會一直阻塞到隊列中有空閑空間,並將項目放入隊列中為止;

??●block的值為True,timeout的值為一個正數:方法函數會一直阻塞到設置的超時時間為止,在超時時間內,如果隊列中有空閑空間就將項目放入隊列中,到了設置的超時時間就拋出queue.Full的異常。

??●block的值為False:直接拋出queue.Full的異常(在這種情況下,超時時間將被忽略);


隊列對象.put_nowait():隊列沒有滿,將項目放入隊列中。隊列已滿,就拋出queue.Full的異常。


print(隊列對象.get(block=True,?timeout=None)):隊列中有項目,就立即從隊列中移除並返回一個項目。隊列中沒有項目時:

??●block的值為True,timeout的值為None:方法函數會一直阻塞到隊列中有項目,並將項目從隊列中移除並返回為止;

??●block的值為True,timeout的值為一個正數:方法函數會一直阻塞到設置的超時時間為止,在超時時間內,如果隊列中有項目時就移除並返回一個項目,到了設置的超時時間就拋出queue.Empty的異常;

??●block的值為False:直接拋出queue.Empty的異常(在這種情況下,超時時間將被忽略);


print(隊列對象.get_nowait()):隊列中有項目,就立即從隊列中移除並返回一個項目。隊列中沒有項目,就拋出queue.Empty的異常。


print(隊列對象.qsize()):返回隊列中項目的個數;


print(隊列對象.empty()):如果隊列為空返回True,否則返回False;


print(隊列對象.full()):如果隊列已滿返回True,否則返回False;


4.隊列的實例

import?threading
import?time
import?queue


def?producer():
????count?=?1

????while?True:
????????#?隊列沒有滿,將骨頭放入隊列中
????????#?隊列已滿時,線程一直阻塞到隊列中有空閑空間,並將骨頭放入隊列中為止
????????q.put("骨頭%s"?%?count)
????????print("生產了骨頭%s"?%?count)

????????count?+=?1

????????time.sleep(0.5)


def?consumer(name):
????while?True:
????????#?隊列中有骨頭時,就立即從隊列中移除並返回一個骨頭
????????#?隊列中沒有骨頭時,線程一直阻塞到隊列中有骨頭,並將骨頭從隊列中移除並返回為止
????????print("%s?取到?%s?並吃了它"?%?(name,?q.get()))

????????time.sleep(1)


q?=?queue.Queue(10)

p?=?threading.Thread(target=producer)
c1?=?threading.Thread(target=consumer,?args=("哈士奇",))
c2?=?threading.Thread(target=consumer,?args=("泰迪",))

p.start()
c1.start()
c2.start()


Python基礎 - 第九天 - paramiko模塊、進程、線程