1. 程式人生 > >IO阻塞模型 非阻塞模型

IO阻塞模型 非阻塞模型

client __main__ pip -a 密鑰 paramiko authorize upper bcd

IO阻塞模型(blocking IO)

在linux中,默認情況下所有的socket都是blocking,一個典型的讀操作流程大概是這樣:

技術分享圖片

所以,blocking IO的特點就是在IO執行的兩個階段(等待數據和拷貝數據兩個階段)都被block了。

技術分享圖片 技術分享圖片
from socket import *

server = socket(AF_INET,SOCK_STREAM)
server.bind((127.0.0.1‘,8080))
server.listen(5)

while True:
    conn,addr = server.accept()
    print(addr)
    while True:
        try:
            data = conn.recv(1024)
            if not data:break
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()
技術分享圖片 技術分享圖片 技術分享圖片
from socket import *

client = socket(AF_INET,SOCK_STREAM)
client.connect((127.0.0.1‘,8080))
while True:
    msg = input(>>:).strip()
    if not msg:continue
    client.send(msg.encode(utf-8))
    data = client.recv(1024)
    print(data.decode(utf-8))
client.close()
技術分享圖片

非阻塞IO模型

Linux下,可以通過設置socket使其變為non-blocking。當對一個non-blocking socket執行讀操作時,流程是這個樣子:

技術分享圖片

所以,在非阻塞式IO中,用戶進程其實是需要不斷的主動詢問kernel數據準備好了沒有。

技術分享圖片 技術分享圖片
# 1.對cpu的占用率過多,但是是無用的占用
# 2.在鏈接數過多的情況下不能及時響應客戶端的消息

from socket import *

server = socket(AF_INET,SOCK_STREAM)
server.bind((127.0.0.1‘,8080))
server.listen(5)
server.setblocking(False)  # 非阻塞型,默認為阻塞型True

conn_l = []
while True:
    try:
        conn,addr = server.accept()
        conn_l.append(conn)
        print(addr)
    except BlockingIOError:
        # print(‘幹其它活去了‘)
        # time.sleep(2)
        del_l = []
        for conn in conn_l:
            try:
                data = conn.recv(1024)
                if not data:  # 針對linux系統
                    conn.close()
                    del_l.append(conn)
                    continue
                conn.send(data.upper())
            except BlockingIOError:
                pass
            except ConnectionResetError:
                conn.close()
                del_l.append(conn)
        for conn in del_l:
            conn_l.remove(conn)
技術分享圖片 技術分享圖片 技術分享圖片
from socket import *

client = socket(AF_INET,SOCK_STREAM)
client.connect((127.0.0.1‘,8081))
while True:
    msg = input(>>:).strip()
    if not msg:continue
    client.send(msg.encode(utf-8))
    data = client.recv(1024)
    print(data.decode(utf-8))
client.close()
技術分享圖片

IO多路復用

IO multiplexing這個詞可能有點陌生,但是如果我說select/epoll,大概就都能明白了。有些地方也稱這種IO方式為事件驅動IO(event driven IO)。我們都知道,select/epoll的好處就在於單個process就可以同時處理多個網絡連接的IO。它的基本原理就是select/epoll這個function會不斷的輪詢所負責的所有socket,當某個socket有數據到達了,就通知用戶進程。它的流程如圖:

技術分享圖片

當用戶進程調用了select,那麽整個進程會被block,而同時,kernel會“監視”所有select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操作,將數據從kernel拷貝到用戶進程。
這個圖和blocking IO的圖其實並沒有太大的不同,事實上還更差一些。因為這裏需要使用兩個系統調用(select和recvfrom),而blocking IO只調用了一個系統調用(recvfrom)。但是,用select的優勢在於它可以同時處理多個connection。

強調:

1. 如果處理的連接數不是很高的話,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優勢並不是對於單個連接能處理得更快,而是在於能處理更多的連接。

2. 在多路復用模型中,對於每一個socket,一般都設置成為non-blocking,但是,如上圖所示,整個用戶的process其實是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。

結論: select的優勢在於可以處理多個連接,不適用於單個連接

技術分享圖片 技術分享圖片
from socket import *
import select

server = socket(AF_INET,SOCK_STREAM)
server.bind((127.0.0.1‘,8080))
server.listen(5)
server.setblocking(False)  # 非阻塞型,默認為阻塞型True

read_l = [server,]
print(strating....)
while True:
    rl,wl,xl = select.select(read_l,[],[])  # 整體的返回值是一個元組,rl為元組裏的一個列表
    # print(‘===>‘,rl)  # rl裏的值就是server對象或conn對象
    for r in rl:
        if r is server:
            conn,addr = r.accept()
            read_l.append(conn)
        else:
            try:
                data = r.recv(1024)
                if not data:
                    r.close()
                    read_l.remove(r)
                r.send(data.upper())
            except ConnectionResetError:
                r.close()
                read_l.remove(r)
技術分享圖片 技術分享圖片 技術分享圖片
from socket import *

client = socket(AF_INET,SOCK_STREAM)
client.connect((127.0.0.1‘,8081))
while True:
    msg = input(>>:).strip()
    if not msg:continue
    client.send(msg.encode(utf-8))
    data = client.recv(1024)
    print(data.decode(utf-8))
client.close()
技術分享圖片

socketserver模塊

TCP

技術分享圖片 技術分享圖片
import socketserver

class MyTCPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        print(========?>‘,self.request)  # self.request is conn
        while True:
            data = self.request.recv(1024)
            self.request.send(data.upper())

if __name__ == __main__:
    # socketserver.ForkingTCPServer  這個模塊的多進程只能在linux上用
    server = socketserver.ThreadingTCPServer((127.0.0.1‘,8080),MyTCPHandler)
    server.serve_forever()
技術分享圖片 技術分享圖片 技術分享圖片
from socket import *

client = socket(AF_INET,SOCK_STREAM)
client.connect((127.0.0.1‘,8081))
while True:
    msg = input(>>:).strip()
    if not msg:continue
    client.send(msg.encode(utf-8))
    data = client.recv(1024)
    print(data.decode(utf-8))
client.close()
技術分享圖片

UDP

技術分享圖片 技術分享圖片
import socketserver

class MyTCPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        print(========?>‘,self.request)  # self.request 是一個元組,第一個值是客戶端發來的消息,第二個值是一個套接字對象
        client_data=self.request[0]
        self.request[1].sendto(client_data.upper(),self.client_address)

if __name__ == __main__:
    # socketserver.ForkingTCPServer  這個模塊的多進程只能在linux上用
    server = socketserver.ThreadingUDPServer((127.0.0.1‘,8080),MyTCPHandler)
    server.serve_forever()
技術分享圖片 技術分享圖片 技術分享圖片
from socket import *

client = socket(AF_INET,SOCK_DGRAM)
while True:
    msg = input(>>:).strip()
    if not msg:continue
    client.sendto(msg.encode(utf-8‘),(127.0.0.1‘,8080))
    data,server_addr = client.recvfrom(1024)
    print(data.decode(utf-8))
client.close()
技術分享圖片

paramiko模塊

paramiko是一個用於做遠程控制的模塊,使用該模塊可以對遠程服務器進行命令或文件操作,值得一說的是,fabric和ansible內部的遠程管理就是使用的paramiko來現實

下載安裝

pip3 install paramiko #在python3中

SSHClient

用於連接遠程服務器並執行基本命令

基於用戶名密碼連接:

技術分享圖片 技術分享圖片
import paramiko

# 創建SSH對象
ssh = paramiko.SSHClient()
# 允許連接不在know_hosts文件中的主機
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 連接服務器
ssh.connect(hostname=120.92.84.249‘, port=22, username=root‘, password=xxx)

# 執行命令
stdin, stdout, stderr = ssh.exec_command(df)
# 獲取命令結果
result = stdout.read()
print(result.decode(utf-8))
# 關閉連接
ssh.close()
技術分享圖片

基於公鑰密鑰連接:

客戶端文件名:id_rsa

服務端必須有文件名:authorized_keys(在用ssh-keygen時,必須制作一個authorized_keys,可以用ssh-copy-id來制作)

技術分享圖片 技術分享圖片
import paramiko

private_key = paramiko.RSAKey.from_private_key_file(/tmp/id_rsa)

# 創建SSH對象
ssh = paramiko.SSHClient()
# 允許連接不在know_hosts文件中的主機
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 連接服務器
ssh.connect(hostname=120.92.84.249‘, port=22, username=root‘, pkey=private_key)

# 執行命令
stdin, stdout, stderr = ssh.exec_command(df)
# 獲取命令結果
result = stdout.read()
print(result.decode(utf-8))
# 關閉連接
ssh.close()
技術分享圖片

SFTPClient

用於連接遠程服務器並執行上傳下載

基於用戶名密碼上傳下載

技術分享圖片 技術分享圖片
import paramiko
 
transport = paramiko.Transport((120.92.84.249‘,22))
transport.connect(username=root‘,password=xxx)
 
sftp = paramiko.SFTPClient.from_transport(transport)
# 將location.py 上傳至服務器 /tmp/test.py
sftp.put(/tmp/id_rsa‘, /etc/test.rsa)
# 將remove_path 下載到本地 local_path
sftp.get(remove_path‘, local_path)
 
transport.close()

IO阻塞模型 非阻塞模型