1. 程式人生 > >老男孩14期自動化運維day10隨筆和作業

老男孩14期自動化運維day10隨筆和作業

1.IO(磁碟,網路等)操作不佔用CPU
計算佔用CPU,例如1+1

多執行緒使用場景:python多執行緒不適合CPU密集操作型的任務,適合IO密集型的任務(例如socket server )

2.程序
每一個程序都是由預設父程序啟動的(每一個子程序都是由主程序啟動的)
比如在pycharm啟動程式 ,在windows上是pycharm為父程序:主程序的父程序為pycharm
比如在linux終端啟動程式,在linux上是terminal為父程序,主程序的父程序為terminal
兩個方法:os.getpid() 獲取程序號
os.getppid() 獲取父程序號

建立程序與執行緒類似

from multiprocessing import Process

def func(a):
  pass
p=Process(target=func,args=(a,))
p.start()

3.多程序(multiprocess)
多程序間的通訊:不同程序之間是不允許訪問對方記憶體的,多程序要實現通訊,只能通過以下方式
----程序Queue
----pipe 管道

以下為後兩種方式詳細解釋:
(1)程序通過程序Queue進行通訊
與執行緒的queue不一樣 只能用於程序通訊的特殊的queue叫程序Queue

from multiprocessing import
Process,Queue def f(q): q.put([42,None,'1']) if __name__=='__main__': q=Queue() p=Process(target=f,args=(q,)) p.start() print(q.get())

上述過原理為: 在主程序通過程序QUEUE讀到了子程序往q裡存的資料
解析過程:

其實是兩個queue,主程序開啟queue,把queue作為引數傳入子程序中,是複製了另一個queue給子程序(pickle序列化
然後 子程序往queue傳資料,再pickle反序列化給主程序的queue
只是實現了這個程序的資料傳給另一個程序

(2)PIPE管道

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([42, None, 'hello from child'])
    conn.send([42, None, 'hello from child3'])
    print("",conn.recv())
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe() # 生成管道有兩頭,返回兩個值
    p = Process(target=f, args=(child_conn,))
    p.start()
    print("parent",parent_conn.recv())  # prints "[42, None, 'hello']"
    print("parent",parent_conn.recv())  # prints "[42, None, 'hello']"
    parent_conn.send(" from hshs")  # prints "[42, None, 'hello']"
    p.join()

就是生成一個Pipe管道物件,管道有兩頭,返回兩個值,在這裡兩頭沒有準確定義必須是傳資料的還是收資料的

(2)程序間的資料共享:
本來程序之間記憶體不能互相訪問,但是可以通過manager實現程序間的資料共享
----通過manager

from multiprocessing import Process, Manager
import os
# 程序間的共享資料
# 其實也是copy了十個資料 最後彙總,而且內部加了鎖 ,不用人為加鎖


def f(d, l):
    d[os.getpid()]=os.getpid()
    l.append(1)
    print(l,d)


if __name__ == '__main__':
    with Manager() as manager: # 和 manger=Manager() 一樣
        d = manager.dict() # 生成一個可在多個程序之間共享、傳遞的字典

        l = manager.list(range(5)) # 生成一個可在多個程序之間共享、傳遞的list

        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list: # 等待結果
            res.join()
        l.append("from parent")
        print(d)
        print(l)

原理:建立manager物件,通過manager.dict()生成一個可以再多程序間共享、傳遞的字典物件,通過manager.list() 生成一個可以在多程序間共享、傳遞的列表物件

其實就是copy了十個資料,最後彙總,而且內部加了鎖,所以這裡不用加鎖

4.程序鎖
本身程序之間記憶體不共享,為什麼還需要程序鎖?

原因:主要是保證在螢幕上列印的時候不亂

from multiprocessing import Process, Lock

# 程序鎖
# 本身程序之間記憶體不共享,為什麼還需要程序鎖?
# 原因:主要是保證在螢幕上列印的時候不亂

def f(l, i):
    #l.acquire()
    print('hello world', i)
    #l.release()


if __name__ == '__main__':
    lock = Lock()

    for num in range(100):
        Process(target=f, args=(lock, num)).start()

5.程序池

程序啟動開銷比執行緒啟動大很多,所以python有程序池 概念防止電腦崩潰。執行緒池可以通過訊號量自己定義

在windows上啟動多程序跟linux不一樣 ,要import freeze_support或者加if name==‘main

from multiprocessing import Process,Pool,freeze_support

# 在windows上啟動多程序跟linux不一樣 ,要import freeze_support或者加if __name__=='__main__'

import time
import os
def Foo(i):
    time.sleep(2)
    print("in process ",os.getpid())
    return  i+100

def Bar(arg):
    print("--->exec done:",arg)

if __name__=='__main__':
    #freeze_support()
    pool=Pool(5) # 和 pool=Pool(process=5)一樣  意思為 允許程序池同時放入5個程序,同時執行的程序只有五個

    for i in range(10):# 啟動了 但是被放進程序池的程序才會執行
        pool.apply_async(func=Foo,args=(1,),callback=Bar)  # callback=回撥  執行完Foo 再執行Bar ,注意:回撥是主程序呼叫的而不是子程序,例如備份資料庫時候只用父程序建立連線,子程序去回撥父程序的連線 而不用多次建立連線
        # pool.apply(func=Foo,args=(i,)) # 序列
        # pool.apply_async(func=Foo,args=(1,)) # 並行
    print('end')
    pool.close()
    pool.join() # 等所有程序結束 不寫join 主程序不會等子程序結束

# 注意python的要求(官方文件都沒寫),先close再join(死記硬背),如果把join註釋了,不等子程序執行完畢程式就關閉了

過程:通過Pool()建立一個pool物件。

pool=Pool(5) 與 pool=Pool(process=5) 表示允許同時放入5個程序,同時執行的程序只有五個,跟多執行緒的訊號量一樣。

在for迴圈裡啟動程序,但這是還沒有啟動,只有放入程序池的程序才能執行。

pool.apply(func=Foo,args=(1,)) 這是序列執行程序
pool.apply_async(func=Foo,args=(1,)) 這是並行執行程序(非同步)
pool.app;y_async(func=Foo,args=(1,),callback=Bar)
這裡callback是回撥函式,執行完Foo,再執行Bar,注意:回撥是主程序的呼叫而不是子程序的呼叫,例如備份資料庫時候只用父程序建立連線,子程序去回撥父程序的連線
而不用多次建立連線

最後很重要的一點:python的官方要求文件都沒寫,必須先close再join(死記硬背),如果把join註釋了,不等子程序執行完畢程式就關閉了。(而不像多執行緒時,先join再close)

pool.close()
pool.join()

另外:這裡再將一下_ _name _ _的的意思:

如果再程式里加入 if name== 'main’則是手動執行的時候會執行
如果把該程式當做模組被另外程式import後,另外的程式不執行 if name== 'main’裡的內容
____ main__代表主程序的__name_ 子程序為__mp_main__的__name__
所以 if name=='main’是判斷是否__name__為主程序
(就是當前該程式手動執行自己,被其他模組匯入則不執行裡面的內容)

6.協程
(微執行緒)是一種使用者態的輕量級執行緒
協程為什麼很快:協程是遇到IO操作就切換,所以剩下都是CPU操作就很快
執行緒的切換是儲存在cpu的暫存器裡,而協程擁有自己的暫存器上下文和棧。
可以在單執行緒下實現併發效果(實際上還是序列 因為切換時間很快,所以在使用者視角下是並行)

優點:
1.無需執行緒上下文切換的開銷
2.無需原子操作的鎖定及同步的開銷(改變數可以叫原子操作)
3.方便切換控制流,簡化程式設計模型
4.高併發、高擴充套件、低成本:一個cpu支援上萬的協程都不是問題

缺點:
1.因為是單執行緒,無法利用多核資源,它不能同時將單個CPU的多個核用上(不能多核),協程
需要和程序配合才能執行在多CPU上,除非是CPU密集型應用
2.進行阻塞(Blocking)操作(如IO)會阻塞掉整個程式

注意:當一個函式含有yield關鍵字時 ,第一次呼叫它是變成一個生成器,必須加__next__()才執行

兩種切換方式
(1)手動切換 greenlet


from greenlet import greenlet
# 手動切換  gevent 封裝了greenlet
def test1():
    print(12)
    gr2.switch() # 切換gr2
    print(34)
    gr2.switch()
def test2():
    print(56)
    gr1.switch() # 切換gr1
    print(78)

gr1 = greenlet(test1) #啟動一個協程
gr2 = greenlet(test2)
gr1.switch()

(2)自動切換 gevent

gevent自動IO切換 封裝了greenlet (手動IO切換),可以通過greenlet實現併發同步或非同步程式設計

import gevent
# 自動IO切換


def foo():
    print('Running in foo')
    gevent.sleep(2)
    print('Explicit context switch to foo again')
def bar():
    print('Explicit精確的 context內容 to bar')
    gevent.sleep(1)
    print('Implicit context switch back to bar')
def func3():
    print("running func3 ")
    gevent.sleep(0)
    print("running func3  again ")


gevent.joinall([
    gevent.spawn(foo), # 啟動協程
    gevent.spawn(bar),
    gevent.spawn(func3),
])

程式碼中 sleep只是gevent模擬io消耗的時間代指類似於io的消耗的時間

gevent.spawn(協程) 啟動協程,遇到IO操作自動切換

執行過程為:
foo 先列印第一句 然後遇到io 切換給bar列印第一句 然後遇到io 切換給func3 列印第一句 然後遇到io
又給foo 發現foo還在等io,就給bar bar 也在等io ,又給func3 io結束後 列印第二句,返回給foo 還在等io給bar,bar 等io結束列印第二句 發給foo,foo等待io結束 列印foo第二句
這個單執行緒的非同步執行,只要2s ,但是不使用協程要3s,要2s是指單執行緒中io等待時間最多的點

通過gevent自動切換協程能實現什麼?

(1)很牛逼的實現:gevent實現大併發單執行緒socket server(通過協程)

from gevent import socket, monkey

monkey.patch_all()


# 通過gevent自動切換協程實現單執行緒下的socket併發(很牛逼!!!)
# 大併發socket server(單執行緒)


def server(port):
    s = socket.socket()
    s.bind(('0.0.0.0', port))
    s.listen(500)
    while True:
        cli, addr = s.accept()
        gevent.spawn(handle_request, cli)


def handle_request(conn):
    try:
        while True:
            data = conn.recv(1024)
            print("recv:", data)
            conn.send(data)
            if not data:
                conn.shutdown(socket.SHUT_WR)

    except Exception as  ex:
        print(ex)
    finally:
        conn.close()


if __name__ == '__main__':
    server(9999)

(2)gevent實現簡單大併發單執行緒爬蟲

import gevent,time
from urllib import request
from gevent import monkey

#簡單協程大併發爬網頁


monkey.patch_all() # 把當前程式的所有io操作給我單獨做上標記

def f(url):
    print('GET:%s'%url)
    resp=request.urlopen(url)
    data=resp.read()
    print('%d bytes received from %s .'%(len(data),url))


urls=['https://www.python.org/',
      'https://www.yahoo.com/',
      'https://github.com/'
]
time_start=time.time()
for url in urls:
    f(url)
print("同步cost:",time.time()-time_start)


async_time_start=time.time()

gevent.joinall([
    gevent.spawn(f,'https://www.python.org/'),
    gevent.spawn(f,'https://www.yahoo.com/'),
    gevent.spawn(f,'https://github.com/'),

])
print("非同步cost:",time.time()-async_time_start)

(沒加monkey.path.all()前提下)時間一樣是因為gevent 跟urllib沒關係 ,gevent不知道urllib在做io ,所以就沒有切換,可以通過加入 monkey補丁

monkey.patch_all() # 把當前程式的所有io操作給我單獨做上標記,標記他是IO操作,遇到他就切換

7.論事件驅動與非同步IO

通常,我們寫伺服器處理模型的程式時,有以下幾種模型:
(1)每收到一個請求,建立一個新程序,來處理該請求
(2)每收到一個請求,建立一個新執行緒,來處理該請求
(3)每收到一個請求,放入一個事件列表,讓主程序通過非阻塞IO方式來處理請求(協程)

io是作業系統執行的(就是利用事件驅動模型把io操作扔到作業系統中一個佇列,io執行完後呼叫回撥函式告知你執行完的標記)
上面的幾種方式,各有千秋:
第(1)種方法,由於建立新的程序的開銷比較大,所以,會導致伺服器效能比較差,但實現比較簡單
第(2)種方法,由於要涉及到執行緒的同步,有可能面臨死鎖等問題
第(3)種方法,在寫應用程式程式碼時,邏輯比前面兩種都複雜
綜合考慮各方面因素,一般普遍認為第(3)種方式是大多數網路伺服器採用的方式

事件驅動模型:

1.有一個事件(訊息)佇列
2.例如滑鼠按下時 ,往這個佇列中增加一個點選事件(訊息)
3.有個迴圈,不斷從佇列中取出事件,根據不同的時間,呼叫不同的函式,如onClick()、onKeyDown()等
4.事件(訊息)一般都各自儲存各自的處理函式指標,這樣,每個訊息都有獨立的處理函式

在這裡插入圖片描述

事件驅動程式設計是一種程式設計正規化,這裡程式的執行流由外部事件決定

8.IO多路複用

程序的阻塞
只有處於執行態的程序,才有可能轉為阻塞狀態。當程序進入阻塞狀態時,不會佔用CPU資源

檔案描述符 fd:
就是一組非負整數,是作業系統內部檔案記錄表(有序的,存放的是控制代碼物件)的索引值,作業系統拿到檔案描述 符,從檔案記錄表中找到檔案控制代碼物件,從物件中操作資料。在unix,linux上才有檔案描述符的概念

快取IO:
又被稱作標準I/O,大多數檔案系統預設IO都是快取IO。在linux快取IO機制中,資料會先拷貝到作業系統核心的快取 區中,然後從作業系統核心再拷貝到應用程式的地址空間(比如socket中,兩次send會發送在一起(黏包),是因為系統為了減少作業系統核心拷貝
到應用程式的開銷。)(核心態—》使用者態的資料轉換) 快取IO缺點:這些資料拷貝對cpu以及記憶體的開銷是非常大的

9.IO五種網路模式(有一種驅動IO很少用)

情景:使用者有個read操作

1-3都是同步IO(synchronous IO 必須等核心態到使用者態的轉變)
(1)阻塞IO(blocking iO)

在linux ,預設情況下所有的socket都是阻塞IO (blocking IO) 使用者傳送read操作到核心
核心中沒有資料,在等待資料被髮送過來,此時使用者程序在等待,當核心中有資料後,再返回給使用者。 使用者在等待的時候就是阻塞I/O

(2)非阻塞IO(nonblocjing io)

linux下可以通過設定socket為nonblocking
使用者傳送read操作到核心,核心中沒有資料,使用者不用等核心是或否有資料,核心沒有資料會發送一個error到使用者,使用者收到
核心的資訊做判斷,當為error的時候可以去做其他事,收到資料之後再處理資料 所以 nonblocking
IO的特點是使用者程序徐不斷的主動詢問kernel資料好了沒有,可以實現使用者視角下的單執行緒多併發
但是在核心態到使用者態 如果資料過大 還是會阻塞

(3)I/O多路複用(IO multiplexing或者 event driven IO 事件驅動IO)

常用的select poll epoll 是建立在非租塞IO的情況下,因為非阻塞IO情況下,在等待
接受資料的時候沒有阻塞,但是在拷貝資料的時候,如果從核心拷貝到使用者的資料太大,則會阻塞,這是IO多路複用要解決的問題。

三種方式 select poll epoll:
select (windows,linux) 例如多個連線 迴圈這些連線(例如有一百個連結,就迴圈這個一百個,有一個返回資料就返回給使用者),任意一個返回就返回訊號(缺點,檔案描述符上限1024,當然可以自行修改,如果要迴圈連線(陣列輪詢)過多,容易浪費資源)
poll 沒有最大檔案描述符限制(基於select優化 但還是有select的缺點)

epoll (最流行的,windows不支援,linux2.6核心.Django就是用的這個,例如nginx)

(1)epoll_create 建立一個epoll物件,一般epollfd = epoll_create()

(2)epoll_ctl (epoll_add/epoll_del的合體),往epoll物件中增加/刪除某一個流的某一個事件

比如

epoll_ctl(epollfd, EPOLL_CTL_ADD, socket, EPOLLIN);//註冊緩衝區非空事件,即有資料流入

epoll_ctl(epollfd, EPOLL_CTL_DEL, socket,
EPOLLOUT);//註冊緩衝區非滿事件,即流可以被寫入

(3)epoll_wait(epollfd,…)等待直到註冊的事件發生

(注:當對一個非阻塞流的讀寫發生緩衝區滿或緩衝區空,write/read會返回-1,並設定errno=EAGAIN。而epoll只關心緩衝區非滿和緩衝區非空事件)。

4.非同步IO(asynchronous IO,不用等核心態到使用者態)—用得少(其實很多叫非同步IO都用的是IO多路複用 epoll)

發起一個read操作,立刻返回,所以不會對使用者程序產生任何block。然後kernel會等待資料準備完成,然後拷貝到使用者記憶體,
當這一切完成之後,kenel會給使用者程序傳送一個signal,告訴他read操作已完成。

注意 :這裡拷貝完成才會給使用者程序傳送一個signal,所以使用者程序是不會阻塞的,使用者程序只是把任務丟給核心,可以去做其他事,當他收到核心傳送的signal時就知道資料已經從核心態到使用者態了。

以上這四種網路模式圖解:
在這裡插入圖片描述

一個單執行緒下通過select方式實現IO多路複用的socket server 例子:

import select
import socket
import queue

# 單執行緒下的io 多路複用的selcet實現socket_server


server=socket.socket()
server.bind(('localhost',9000))
server.listen(1000)

# 設定為非阻塞模式

server.setblocking(False) # 不阻塞



inputs=[server]
outputs=[]



while True:
  readable,writeable,exceptional=select.select(inputs,outputs,inputs)

  print(readable,writeable,exceptional)
  for r in readable:
    if r is server: # 如果是server 代表來了一個新連線
      conn,addr=server.accept()
      print("來了個新連線",addr)
      inputs.append(conn)   # 是應為這個新建立的連線還沒發資料過來,現在就接受的話程式就要報錯
         # 所以要想實現這個客戶端發資料來時,server端能知道,就需要讓select再監測這個conn
    else:  # 如果是之前的conn 表示發資料了
      data=r.recv(1024)
      print("收到資料",data)
      r.send(data)
      print("send done..")

server.setblocking(False) 設為非阻塞模式

兩個列表 inputs 和 outputs
select 去迴圈去inputs列表裡的物件

在inputs列表裡的物件首先必須要是server本身,其次是conn連線例項。

如果是server 代表來了一個新連線
然後 inputs.append(conn) 是應為這個新建立的連線還沒發資料過來,現在就接受的話程式就要報錯 ,所以要想實現這個客戶端發資料來時,server端能知道,就需要讓select再監測這個conn
如果是conn 表示這個連線開始發資料了