進階2
Day1
python總一切皆物件
type->class->obj
object是最高的父類,本身繼承None
type(type(1))
type
type(object)
type
type.__base__
object
python中的物件有基本的三個特徵:
id(身份)type(型別)value(值)
a=[] id(a)
type(a)
list
a
[]
python中不同的型別:
1 None型別,直譯器剛開始執行就生成了一個None型別物件所有用到了None的地方指向的都是同一個全域性的None
a=None b=None print(id(a)==id(b)) c=[] d=[] print(id(c)==id(d))
True False 2 數值型別 四種
print( type(1), type(0.2), type(True), type(complex(1)) )
<class 'int'> <class 'float'> <class 'bool'> <class 'complex'> 3 迭代型別 可用for迭代的物件 實現了iter方法或者getitem方法的物件4 序列型別 list tuple arry str range bytes bytearrymemoryview(二進位制序列)5 對映型別 k-v 字典,queryset等6 集合型別set frozenset ,set和對映型別底層實現一致效率較高7 上下文管理型別 with 語句所形成的型別8 模組型別(package也是一種型別)9 class和例項型別10 函式型別11 方法型別12 程式碼型別(程式碼本身也會被解釋為一種型別!!!)13 object物件14 type型別15 ellipsis型別省略號型別?!16 notimplemented型別就是類物件
Day2
魔法函式
以雙下劃線開頭結尾,具有特殊功能的函式 一般不要去自己定義,過載python提供給我們的即可
python的資料模型是python底層實現機制重要的一環我們可以通過過載這些魔法函式來實現自己想要的功能
先列出主要的魔法函式
非數學運算類:
字串表示:
repr():
__str__(): 集合序列相關: __len__(): __getitem__(): __setitem__(): __delitem__(): __contains__(): 迭代相關: __iter__(): __next__(): 可呼叫 __call__(): with上下文管理器 __enter__(): __exit__(): 數值轉換 __abs__(): __bool__(): __int__(): __float__(): __hash__(): __index__(): 元類相關 __new__(): __init__(): 屬性相關 __getattr__(): __setattr__(): __getattribute__(): __setattribute__(): __dir__(): 屬性描述符 __get__(): __set__(): __delete__(): 協程 __await__(): __aiter__(): __anext__(): __aenter__(): __aexit__():
數學運算類:
str 和 repr
class company(object): def __init__(self,em): self.em=em def __str__(self): return "employee:" +",".join(em) def __repr__(self): return "開發者模式下不用列印就可以呼叫" def __len__(self): return len(self.em) em=["bobby","tom","bob","jhon"] com=company(em) print(com) print("len的用法: ",len(com)) com
employee:bobby,tom,bob,jhon len的用法:4 開發者模式下不用列印就可以呼叫
str(self):
在直接列印這個物件時就會呼叫該方法,若沒有定義則會列印一些基本屬性
repr(self):
在開發者模式中直接呼叫這個物件則會呼叫repr方法python內建的資料型別是用Cpython實現的,內建方法是C語言實現的效率會很高
例如 len 方法是在資料結構內維護一個長度,呼叫len的時候直接從記憶體讀取會快很多
for 對一個物件進行遍歷的時候會先尋找iter方法,找不到會自動尋找getitem方法
Day3
鴨子型別和python的多型性 走路像鴨子叫聲像鴨子長得像鴨子 那麼就是鴨子。只要物件擁有這個特性那就是這個型別就可以呼叫相關的方法
class Cat(object): def say(self): print("hahaha...") class Dog(object): def say(self): print("15551") l=[] l.append(Dog()) l.append(Cat()) for i in l: i.say()
15551 hahaha...
不同型別的物件只要擁有相同名字的方法就可以呼叫,傳入物件的控制代碼即可
li=[1,2] se=(3,4) li.extend(se) li
[1, 2, 3, 4]
只要是相同的基礎資料型別(迭代型別,序列型別)方法可以混用
getitem:根據傳入的int引數,返回一個列表中的元素
iter:返回一個可迭代物件
next:當被迭代時,返回下一個迭代的物件
class Test(): def __init__(self): self.n=[1,2,3,4] def __getitem__(self,i): return "it is string"[i] def __iter__(self): for i in range(4): yield self.n[i] test=Test() for i in range(10): print(test[0:i]) for x in test: print(x) #和下面是等價的,不過__iter__是返回一個可迭代物件,而iter()直接把這這個可迭代物件拿出來轉換 test1=iter(test) next(test1) next(test1) next(test1) next(test1) next(test1)
i it it it i it is it is it is s it is st it is str 1 2 3 4 --------------------------------------------------------------------------- StopIterationTraceback (most recent call last) <ipython-input-52-1fe123b9197d> in <module>() 23 next(test1) 24 next(test1) ---> 25 next(test1) StopIteration:
next(x)=x. next ,關於迭代有兩個概念,第一個是Iterable(可迭代物件)第二個是Iterator(迭代器)
協議規定Iterable的 iter 方法會返回一個Iterator,
Iterator的 next 方法會返回下一個迭代物件,如果迭代結束則丟擲StopIteration異常。iter(itr, 'c') 這個意思是說,返回itr的iterator,而且在之後的迭代之中,迭代出來'c'就立馬停止。對這個itr有什麼要求呢?這個itr在這裡必須是callable的,即要實現 call 函式定義了 getitem 方法之後該物件就可以被下標和切片
抽象基類
class company(object): def __init__(self,l): self.l=l def __len__(self): return 5 from collections import Sized #物件是否有長度 isinstance(company(1),Sized) #物件是否有某個屬性,傳入的是控制代碼 hasattr(company(1),"__len__")
True
實現一個抽象基類最簡單的方法就是在要做基類的類方法中丟擲異常,使其必須重寫這個方法
class AbstractBase(object): def a(): raise NotImplementError def b(): raise NotImplementError #更優雅一些的方法 import abc#自帶的abc,可以做裝飾器用 class A(metaclass=abc.ABCMeta): @abc.abstractmethod def my(self): pass A()
--------------------------------------------------------------------------- TypeErrorTraceback (most recent call last) <ipython-input-59-dc359d224db9> in <module>() 12def my(self): 13pass ---> 14 A() TypeError: Can't instantiate abstract class A with abstract methods my
abc中強制規定了abstractmethod,繼承時若不實現這一方法就無法例項化
from collections.abc import *
已經做好的一些基類可以拿來繼承
實現了 subclasshook 用於型別判斷
python的鴨子型別是根本,抽象基類可以給我們提供一些介面的強制規定和幫助我們自定義型別等等
class A (): pass class B(A): pass b=B() isinstance(b,A)
True
is 和 ==不能混用
is是判斷是不是同一個物件通過id()的指向,而相等是判斷值相等,isinstance判斷物件是否在繼承鏈的型別中
功能類,以MixIn命名結尾多繼承關係中作為功能類繼承,設計實現單一功能,相當於代替介面
類變數和物件變數
class A(): aa=1 def __init__(self,x,y):#self 實在例項化的時候傳遞進來的例項本身 self.x=x self.y=y a=A(2,3) a.aa=100 print(a.aa,a.x,a.y) print(A.aa)
再呼叫的時候,會首先查詢例項變數再去查詢類變數
在沒有初始化a的時候存在一個A的類是一個類物件,例項化的時候成為一個類物件的例項
對類例項進行賦值,會影響後續類物件,對類物件進行賦值,則只會影響本類物件
python3中使用C3演算法解決菱形繼承
class Date(): def __init__(self,y,m,d): self.year=y self.month=m self.day=d def __str__(self): return r"{0}/{1}/{2}".format(self.year,self.month,self.day) def tomorrow(self): self.day+=1 date=Date(2018,3,4) print(date) date.tomorrow() print(date)
2018/3/4 2018/3/5
呼叫date.tomorrow()相當於tomorrow(date)只會改變當前物件的屬性,類屬性應該寫成Date.day+=1
class Date(): def __init__(self,y,m,d): self.year=y self.month=m self.day=d def __str__(self): return r"{0}-{1}-{2}".format(self.year,self.month,self.day) def tomorrow(self): self.day+=1 @staticmethod def vaild(string): temp=string.split("-") if temp[0]: print("valited") @classmethod def dformat(cls,string): temp=string.split("-") for i in range(3): temp[i]=int(temp[i]) return cls(temp[0],temp[1],temp[2]) date=Date(2018,3,4) string=str(date) print(Date.dformat(string))
2018-3-4
類方法和靜態方法,靜態方法一般是用來驗證是否符合規範,不用再新建一個物件,而類方法可以直接返回例項物件
class Date(): birthday=1995 def __init__(self,birthday): self.birthday=birthday class User(Date): def __init__(self): self.age=2018-super().birthday def __repr__(self): return str(self.age) user=User() user print(user.__dict__,User.__dict__)
{'age': 23} {'__module__': '__main__', '__init__': <function User.__init__ at 0x0000025EDB51DE18>, '__repr__': <function User.__repr__ at 0x0000025EDB5606A8>, '__doc__': None}
自省機制
__attr不能直接訪問,相當於private,但也可_class__attr來訪問
dict可以直接下標訪問是存貯屬性的字典,也可以動態修改
dir會把所有的屬性特有的和基礎的完整的列出來
super函式是按照 mro 順序來呼叫的,BFS,如果super有預設可以直接讓父類的建構函式來完成初始化
Mixin的特點
和基類是弱相關或不相關
Mixin功能單一
不要使用Mixin的super
python的try except和finally語句的return執行到error 時進入except 棧頂push 一個值,不返回,先執行finally語句再從棧頂拿出一個值pop
上下文管理器的協議,python是基於協議的執行
class Sample(object): S="山口山" def hah(self): print("aa") def __enter__(self): print("start") def __exit__(self,exc_type,exc_val,exc_tb): print("end") @classmethod def get(cls): return cls() def gett(): return Sample() a=Sample() # a.__dict__ # dir(a) with gett() as sample: print(type(sample)) dir(sample) sample.hah() print(sample.S)
start <class 'NoneType'> end --------------------------------------------------------------------------- AttributeErrorTraceback (most recent call last) <ipython-input-172-7d068effdd73> in <module>() 23print(type(sample)) 24dir(sample) ---> 25sample.hah() 26print(sample.S) AttributeError: 'NoneType' object has no attribute 'hah'
import contextlib @contextlib.contextmanager def p(): print("前操作") yield {} print("後操作") with p() as pp: print("中間的操作")
前操作 中間的操作 後操作
Day5
序列 容器序列 扁平序列 可變/不可變
在 collections abc中有兩個抽象基類 Squence MutableSequence序列和多物件序列
Sequence序列類繼承了Reversible可反轉,Collection,Collection繼承了Sized長度,Iterable可迭代,Container容器->absctactmethod contains 負責is以以及 setitem delitem
在序列中+ += extend的區別
a=[1,2] c=a+[3,4] c+=[5,6] c
[1, 2, 3, 4, 5, 6]
+=實際上時abc中的 iadd 呼叫的時extend ->執行for 迴圈一個個append到序列中,所以可以接受一個序列即可,在原序列中直接加,而+是產生一個新的序列
切片操作是會返回一個新的列表
class Group (): def __init__(self,groupname,companyname,member): self.group_name = groupname self.company = companyname self.member = member def __reversed__(self): pass#自行解決 def __getitem__(self, item): cls=type(self) if isinstance (item,slice): return cls(group_name=self.group_name,companyname=self.company,member=self.member[item]) elif isinstance(item,numbers.Integral): return cls(group_name=self.group_name,companyname=self.company,member=[self.member[item]]) #切片slice物件和item是重點 def __iter__(self): pass#自行解決 def __contains__(self,item): if item in self.member: return True else: return False # 用if 來解決 in
維護一個已排序的序列
import bisect #your class&function here if __name__ == "__main__": l=[] bisect.insort(l,1) bisect.insort(l, 3) bisect.insort(l, 5) bisect.insort(l, 6) bisect.insort(l, 2) print(l) bisect.insort_left(l,5) bisect.insort_right(l,2.9) print(l)
[1, 2, 3, 5, 6] [1, 2, 2.9, 3, 5, 5, 6]
import arry from collections import deque #留坑
列表生成式效能是高於列表普通操作的,可以用函式來進行操作
def x(y): print(y) return 2*y#返回值會被新增進列表中 l=[x(i) for i in l if i %2==1] l1=(x(i) for i in l if i %2==1) print(type(l),type(l1))#生成器表示式 print(l,l1) dic={"a":15,"b":56,"c":88} reverseddic={v:k for k,v in dic.items()} print(reverseddic)#鍵值對反轉
Day6
dict是一個Mapping型別,abc繼承關係是
-Container as Container,
-Hashable as Hashable,
-Iterable as Iterable,
Iterator as Iterator,
-Sized as Sized,
Callable as Callable,
-Mapping as Mapping,
MutableMapping as MutableMapping,
Sequence as Sequence,
MutableSequence as MutableSequence,
Set as Set,
MutableSet as MutableSet,
MappingView as MappingView,
ItemsView as ItemsView,
KeysView as KeysView,
ValuesView as ValuesView,
深拷貝改變拷貝物件不會改變原物件,只要拷貝的物件是非容器型別(比如字串,數字和其他原子型別)就不能進行拷貝而是新增引用
import copy a={} b=a c=a.copy()#淺拷貝,複製了一個其他物件,但是不會把字典中的字典和列表進行修改 d=copy.deepcopy(a)#深拷貝,遞迴複製 b["c"]=5 c["d"]=6 print(a,b,c,d)
{'c': 5} {'c': 5} {'c': 6} {}
a=["a","b","c"] dic={} dic.fromkeys(a,"沙雕牛肉")#傳入一個 可迭代物件,預設值 生成字典 dic.get("想取的索引","取不到就返回的預設值") dic.setdefault("索引","值")#先新增或更改一個鍵值對,然後取出 dic.update(Iterable)#將這個可迭代物件一般是鍵值對形式新增到字典中去,[(a,b),(c,d)] a=b,c=d {"a":"b","c":"d"}
'預設值'
python原生的一些資料結構list dict等等直接繼承在某些情況下不會生效,最好繼承collections UserDict,list同理
set是一個無序不重複集合修改用add()del()pop()等等,frozenset是不可變集合,無法修改但是可以作為dict的key
s=set("def")#會把迭代物件分別作為元素 s.add("abc")#直接新增 s.update("abc")#會把迭代物件分別作為元素 d=set("defgh") s.difference(d)#比較兩個set的不同,取差集,是一個新的set,主要的操作 |並 &交 -差
{'a', 'abc', 'b', 'c'}
python中的垃圾回收演算法是引用計數,一個變數有n個引用,當引用計數變為 0 即回收垃圾,del函式可完成更高階的垃圾回收
引數絕對不能用可變物件(list)
collections的用法
#-*-coding:utf-8-*- #SettingCode here __author__ = "a_little_rubbish" __date__ = "2018/11/23 16:34" #import your model here import collections from collections import namedtuple,defaultdict,deque,Counter,ChainMap from functools import partial#包裹函式返回控制代碼 from itertools import chain#傳入好幾個可迭代一塊迭代 from pprint import pprint#漂亮列印 from copy import deepcopy#深拷貝 #your class&function here if __name__ == "__main__": tup=("a","b","c")#tuple是不可變物件,有序 a,b,c=tup#可以直接拆包 d,*other=tup#d是第一個元素,其他的不管了 #c語言中tuple是struct而list卻是arry,tuple在編譯的時候就確定,效能強,可作為dic的key,不可變->可雜湊 print(a,b,c,d) #namedtuple的用法 dic={"age":23,"country":"china","height":165,"edu":"master"} User=namedtuple("User",["age","country","height","edu"])#會給你建立一個類,類名加屬性列表 user=User(age=23,country="china",height=165,edu="mm")#可以按屬性名例項化,並且屬性名不可變 f=(23,"china",165) user2=User(*f,"muster")#和上面初始化是一樣,並且額外傳入了了master這個屬性 #從資料庫裡拿出來少了一列的話可以直接加上 user3=User(**dic)#用字典例項化 print(user3) print("age is :",user2.age)#省空間,記憶體和消耗小,引數就是用tuple完成的 exec("print('hello')")#執行程式碼 #defualtdict的用法,c寫的,還有setdefault dic= {'runoob': '菜鳥', 'google': 'Google 搜尋'} print("Value : %s" % dic.setdefault('runoob', None))#呼叫setdefault的時候,找不到就返回這個default值 print("Value : %s" % dic.setdefault('Taobao', '淘寶')) def de(): return {"a":"booon"} d=defaultdict(de)#傳入一個可呼叫物件,如果字典沒有找到你要的,就給你新建一個鍵值為物件的返回值,可用函式包裝一下 for i in ["apple","sansum","HP"]:#int預設為0,list預設空字典 print(d[i]) #deque雙端佇列 x=[1,2,3,4,5] q=deque(x,maxlen=20)#可迭代物件,最大長度 print(q) q.clear()#清空 print(q) q.append(6)#新增物件 q.appendleft([1,2])#向左端新增物件 print(q.count(6))#返回6出現的次數 q1=q.copy()#淺拷貝 q2=deepcopy(q)#深拷貝 q.extend(q1)#合併,直接在原來deque操作,不會返回新的 q.extendleft(q1)#從左邊合併,直接在原來deque操作,不會返回新的 q.insert(3,"xxx")#在下標為3地方插入 q.reverse()#翻轉 q.pop()#返回最後一個並刪除 q.popleft()#返回最左邊的一個並刪除 q.index(6,0,5)#在0-5下標之間查詢6 #Counter的使用 a="sadbsdvfbdjfbdksdv" s=Counter(a)#傳入一個可迭代物件字串列表什麼的,統計元素出現次數 s1=Counter(a) s.update(s1)#把兩個Counter加起來 pprint(s.most_common(2))#最多次數的倆,是列表 print(s) for i in ChainMap(s,s1): pprint(i)#把倆字典合成Chainmap物件,但是不是加在一起了,迭代的時候前面出現的元素後面不會在列印
接下來的程式碼會很多我在PyCharm裡面寫
@property @被裝飾的類.setter @被裝飾的類.getter
class A(): def __init__(self,birth,day): self.birth=birth self.day=day def __getattr__(self,item):#找不到屬性的時候會進入這個函式 return "沒發現"+item def __getattribute__(self,item):#只要查詢屬性就會進入這裡 return "發現了"+item a=A(1,3) a.name
'發現了name'
使用屬性描述符限制傳值的型別,參考Django的form驗證
如果user是某個類的例項,那麼user.age 以及等價的getattr(user,age)
首先呼叫 getattribute 。如果類定義了 getattr 方法,
那麼在____getattribute__丟擲AttributeError的時候就會呼叫到 getattr ,
而對於描述符( get )的呼叫,則是發生在 getattribute 內部的。
user = User(),那麼user.age 順序如下:
(1) E3果“”是出現在User或其基類的 diet 中,且age是data descriptor,
(2) 如果“age”出現在obj的 diet 中,那麼直接返回obj. diet [fage’],
(3) 如果“age”出現在User或其基類的 dict *
(3.1) 如果age是non-data descriptor,那麼呼叫其 get 方 法 , 否 則
(3.2) 返回 dict [fage’]
(4) 如果User有 getattr 方法,呼叫 getattr 方法,否則
(5) 丟擲AttributeError
class A (): def __new__(cls):#代表本類 return super.__new__(cls) def __init__(self):#代表本物件
File "<ipython-input-2-00e9d8750bd0>", line 5 ^ SyntaxError: unexpected EOF while parsing
new是用來自定義類產生過程的邏輯,init是用來定義例項產生的過程的
如果new沒有返回物件,則不會進入例項化階段
Day 7
迭代器是實現的迭代協議,無法下標訪問(getitem)不直接返回,惰性返回資料的方式,,iterable-> iter函式 和tierator->netx函式 實際訪問通過的是next
生成器 只要是含有yield的就是生成器 提供了惰性求值
python—切皆物件,棧幀物件,位元組碼物件
當foo呼叫子函式bar,又士建立一個棧幀
所有的棧幀都是分配在堆記憶體上,這就決定了棧幀可以獨立於呼叫者存在
靜態語言是放到棧上執行完畢之後會銷燬動態語言放在堆上,找到棧幀可以持續呼叫

棧幀位元組碼
原生過程是將遞迴之後的位元組碼都壓到棧幀裡去,然後函式的指標指向最近一次執行的棧幀,執行之後就會執行下一個棧幀所以可以順利的yield

棧幀
#-*-coding:utf-8-*- #SettingCode here __author__ = "a_little_rubbish" __date__ = "10/12/2018 3:51 PM" #import your model here import dis import numpy as np from collections.abc import Mapping,MutableMapping #your class&function here def generat(): yield 1 s="我是s" yield 2 m = "我是m" yield 3 def gen_fib(index): n,f0,f1=0,0,1 while n<=index: f0,f1=f1,f1+f0 yield f0 n+=1 if __name__ == "__main__": ge=generat() print(dis.dis(ge)) print(ge.gi_frame.f_lasti)#最後指向棧幀的指標 print(ge.gi_frame.f_locals)#本地變數 next(ge) print(ge.gi_frame.f_lasti)#最後指向棧幀的指標 print(ge.gi_frame.f_locals)#本地變數 next(ge) print(ge.gi_frame.f_lasti)#最後指向棧幀的指標 print(ge.gi_frame.f_locals)#本地變數 next(ge) print(ge.gi_frame.f_lasti)#最後指向棧幀的指標 print(ge.gi_frame.f_locals)#本地變數 next(ge) # for i in ge: #print(i) # for i in gen_fib(50): #print(i)
120 LOAD_CONST1 (1) 2 YIELD_VALUE 4 POP_TOP 136 LOAD_CONST2 ('我是s') 8 STORE_FAST0 (s) 1410 LOAD_CONST3 (2) 12 YIELD_VALUE 14 POP_TOP 1516 LOAD_CONST4 ('我是m') 18 STORE_FAST1 (m) 1620 LOAD_CONST5 (3) 22 YIELD_VALUE 24 POP_TOP 26 LOAD_CONST0 (None) 28 RETURN_VALUE None -1 {} 2 {} 12 {'s': '我是s'} 22 {'s': '我是s', 'm': '我是m'} --------------------------------------------------------------------------- StopIterationTraceback (most recent call last) <ipython-input-9-c12afe8f7bdb> in <module>() 37print(ge.gi_frame.f_lasti)#最後指向棧幀的指標 38print(ge.gi_frame.f_locals)#本地變數 ---> 39next(ge) 40# for i in ge: 41#print(i) StopIteration:
讀取一行超大檔案有分隔符
def myreadlines(f,newline): buf="" while True: while newline in buf : pos=buf.index(newline) yield buf[:pos] buf=buf[pos+len(newline):] chunk=f.read(4096) if not chunk: yield buf break buf+=chunk if __name__ == "__main__": with open("./files/a.txt","r",encoding="utf-8") as f: for i in myreadlines(f,"|"): print(i)
sdfghRSXCSYDVASYEDCRAVSBWDYGT exrsadcsvydbasfygv7aysdsaidbas9dunb shdvgyascvdtysacdysgbdyuasfvy dsfvgyasuvbdas87fydvasyfvi dvfysaded7ftv7sadybs sdyfb87dygvfds7ybnfufvwb8dfvbe6tvdb8uvfwy8duiygwdf sedfuwyiyfrte7y8urgevtfbyefb7vyd8unedyb8 edyundimgrfey8unimdgrvyb8unwime
socket相關
sk.bind(address)
s.bind(address) 將套接字繫結到地址。address地址的格式取決於地址族。在AF_INET下,以元組(host,port)的形式表示地址。
sk.listen(backlog)
開始監聽傳入連線。backlog指定在拒絕連線之前,可以掛起的最大連線數量。
backlog等於5,表示核心已經接到了連線請求,但伺服器還沒有呼叫accept進行處理的連線個數最大為5 這個值不能無限大,因為要在核心中維護連線佇列
sk.setblocking(bool)
是否阻塞(預設True),如果設定False,那麼accept和recv時一旦無資料,則報錯。
sk.accept()
接受連線並返回(conn,address),其中conn是新的套接字物件,可以用來接收和傳送資料。address是連線客戶端的地址。
接收TCP 客戶的連線(阻塞式)等待連線的到來
sk.connect(address)
連線到address處的套接字。一般,address的格式為元組(hostname,port),如果連接出錯,返回socket.error錯誤。
sk.connect_ex(address)
同上,只不過會有返回值,連線成功時返回 0 ,連線失敗時候返回編碼,例如:10061
sk.close()
關閉套接字
sk.recv(bufsize[,flag])
接受套接字的資料。資料以字串形式返回,bufsize指定最多可以接收的數量。flag提供有關訊息的其他資訊,通常可以忽略。
sk.recvfrom(bufsize[.flag])
與recv()類似,但返回值是(data,address)。其中data是包含接收資料的字串,address是傳送資料的套接字地址。
sk.send(string[,flag])
將string中的資料傳送到連線的套接字。返回值是要傳送的位元組數量,該數量可能小於string的位元組大小。即:可能未將指定內容全部發送。
sk.sendall(string[,flag])
將string中的資料傳送到連線的套接字,但在返回之前會嘗試傳送所有資料。成功返回None,失敗則丟擲異常。
內部通過遞迴呼叫send,將所有內容傳送出去。
sk.sendto(string[,flag],address)
將資料傳送到套接字,address是形式為(ipaddr,port)的元組,指定遠端地址。返回值是傳送的位元組數。該函式主要用於UDP協議。
sk.settimeout(timeout)
設定套接字操作的超時期,timeout是一個浮點數,單位是秒。值為None表示沒有超時期。一般,超時期應該在剛建立套接字時設定,因為它們可能用於連線的操作(如 client 連線最多等待5s )
sk.getpeername()
返回連線套接字的遠端地址。返回值通常是元組(ipaddr,port)。
sk.getsockname()
返回套接字自己的地址。通常是一個元組(ipaddr,port)
sk.fileno()
套接字的檔案描述符
#__server__ import socket #tcp sk=socket.socket(socket.AF_INET,socket.SOCK_STREAM)#使用ipv4,TCP sk.bind(("127.0.0.1",5333))#一個元組監聽(ip,埠號) sk.listen(5)#請求來了被掛起的最大數量,超過之後便不再接受請求 sock,addr=sk.accept()#先accep之後會產生sock和addr使用本sock物件進行操作 while True:#用一個while True來不斷接受請求,若要處理多個請求請用多執行緒 data=sock.recv(1024)#一次接受1024個Bytes recvmssage=data.decode("utf-8")#客戶端怎麼編碼就得怎麼解碼 print(recvmssage) temp="hello traveler" sock.send(temp.encode("utf-8"))#可以指定也可以不制定編碼 sock.close()#Tcp一定要關閉連線 #上述程式進行一次之後接收不到東西就會停止,需要client不斷髮送自行控制吧 #udp sk = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)#使用ipv4,UDP sk.bind(("127.0.0.1",5333))#需要繫結("ip",port) data, addr = sk.recvfrom(1024)#接受到 資料,地址(地址要留著後面還要發回去) print(data.decode("utf-8"))#列印 temp="the light will bring me vicetory" sk.sendto(temp.encode(),addr)#傳送到addr
#__client__ import socket #tcp sk=socket.socket(socket.AF_INET,socket.SOCK_STREAM)#使用ipv4 TCP sk.connect(("127.0.0.1",5333))#TCP是需要連線的 temp="hello remote people" sk.send(temp.encode())#傳送同樣需要編碼 d=sk.recv(1024).decode("utf-8")#接收位元組並解碼 print(d) sk.close()#需要關閉 #udp sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)#用ipv4,UDP addr = ('127.0.0.1',5333)#先確定自己的地址 data="ha? me?" sk.sendto(data.encode(), addr)#傳送到,記得編碼 recvdata, addr=sk.recvfrom(1024)#接受,同樣需要區大小,最好能給個buffer print(recvdata.decode('utf-8'))#傳送的時候要記得編碼

socketserver的繼承
import socketserver
封裝好了一些東西了拿來直接用
呼叫方式為
sock=socketserver.ThreadTCPServer(("ip",port),HandlerClass)
sock.server_forever如果知道應用程式只能操縱面向資料流的連線(如TCP),那麼應從StreamRequestHandler繼承,而不是BaseRequestHandler。StreamRequestHandler類設定了兩個屬性,h.wfile是將資料寫入客戶端的類檔案物件,h.rfile是從客戶端讀取資料的類檔案物件。
如果要編寫針對資料包操作的處理程式並將響應持續返回傳送方,那麼它應當從DatagramRequestHandler繼承。它提供的類介面與StramRequestHandler相同。常用的handler屬性
h.client_address#客戶端地址
h.server#獲取自己的server物件
h.request
對 TCP server,h.request 屬性是連線到 client 的連線套接字物件;就是那個socket
對 UDP server,h.request 屬性是一個二元組(data, sock),data 是 client 端傳送的資料(最大8192位元組),sock是server端套接字。
使用這個屬性可以獲取在這個進/執行緒中與client套接字建立連線的連線套接字,從而可以使用這個套接字與client端通訊。
StreamRequestHandler 和 DatagramRequestHandler 則遮蔽了 self.request 對TCP和UDP連線的區別,二者都重定義了 setup() 和 finish() 方法,提供統一的 self.rfile 和 self.wfile 屬性以便從客戶端讀取資料或向客戶端傳送資料。
from socketserver import BaseRequestHandler class MyHandl(BaseRequestHandler): def setup(self): print("begine") def handle(self): conn=self.request#拿到請求物件 data=conn.recv(1024)#接收 print(data.decode("utf-8")) temp="你好,你好~" conn.sendall(temp.encode("utf-8")) print(self.client_address) def finish(self): print("over")
ThreadTCPServer
ThreadUDPServer
ThreadingUnixStreamServer
ThreadingUnixDatagramServer
所有的Server都有下面的方法
1.sock.socket 用於傳入請求的套接字物件
2.sock.server_address 監聽伺服器的地址.比如元組("127.0.0.1",80)
3.sock.RequestHandlerClass 傳遞給伺服器建構函式並由使用者提供的請求處理程式類.
4.sock.serve_forever() 處理無限的請求.
5.sock.shutdown() 停止serve_forever()迴圈.
6.sock.fileno() 返回伺服器套接字的整數檔案描述符.該方法可以有效的通過輪詢操作(如select()函式)使用伺服器例項.
多執行緒
python中的一個執行緒對應c中的一個執行緒,GIL在某一時刻只能有一個執行緒在一個cpu上執行位元組碼無法將多個執行緒對映到多個核上
GIL不是在本執行緒使用完之後才會釋放,根據執行的位元組碼和時間片和IO操作才釋放,導致執行緒不安全
最開始語言使用的都是多程序,對系統資源消耗很大,後來開始使用執行緒,執行緒是依附於程序的,作業系統能排程的最小單位是執行緒IO為主的程式效能差別不大
#-*-coding:utf-8-*- #SettingCode here __author__ = "a_little_rubbish" __date__ = "2018/11/19 8:48" #import your model here from threading import Thread #your class&function here class sayHello(Thread):#建立一個執行緒類 def __init__(self,someone):#init傳引數 super().__init__()#呼叫父類的構造 self.someone=someone#引數 self.daemon = True#在這裡設定守護執行緒 def run(self):#重寫run函式執行緒會執行這個函式 print("Hello ",self.someone) if __name__ == "__main__": def sayHi(someone):#定義一個執行緒要用的函式 print("Hi ",someone) t = Thread(target=sayHi,args = ("janny",))#例項化一個縣城 t.start()#開始執行 namelist=["天使","76","Bob","安娜","法雞"] ThreadList=[]#定義一個執行緒列表 for i in namelist: ThreadList.append(sayHello(i))#新增執行緒 ThreadList[2].setDaemon(True)#設定為守護執行緒,其他執行緒結束的時候自動結束 print(ThreadList[2].isAlive())#返回執行緒是否還活著 ThreadList[2].setName("你好執行緒")#設定執行緒名。 print(ThreadList[2].getName())#返回執行緒名。 for t in ThreadList: t.start()#開始一個執行緒
Hi False 你好執行緒 janny Hello天使 Hello76 HelloBob Hello安娜 Hello法雞
LIFO佇列 基本FIFO佇列 優先順序佇列
這三種基礎佇列已經實現了執行緒安全
#-*-coding:utf-8-*- #SettingCode here __author__ = "a_little_rubbish" __date__ = "2018/11/19 11:36" #import your model here from queue import Queue,LifoQueue,PriorityQueue from threading import Thread import time #your class&function here if __name__ == "__main__": #先進先出FIFO 也就是基本的Queue q=Queue(maxsize=10)#定義最大容量,滿了就會阻塞執行緒,不定義就預設無上限 for i in range (10): q.put(i,timeout=5)#往裡放一個東西,超時時間 for i in range(10): print(q.get(timeout=5))#返回了一個取出的值,超時等待時間 print("--------分割----------") #先進後出LIFO q=LifoQueue(maxsize=10) for i in range(5): q.put(i) #q.join()#會直接在這裡卡住不動,等佇列清空並且task_down發出才會執行 # 可以配合empty full 判斷是不是消費或者生產結束 for i in range(10): print(q.get()) print("--------分割----------") #優先順序隊儲存一個元組,數字越小優先順序越高 from random import randint q=PriorityQueue(maxsize=10)#maxsize同上 for p in zip([randint(0,10) for i in range(10)],[chr(randint(65,90))for i in range(10)]): #隨機生成數字和字母的列表表示式 q.put(p) print(q.qsize())#獲取佇列的長度 print(q.empty())#佇列是空的嗎 print(q.full())#佇列是滿的嗎 for i in range(10): print(q.get())
join()會暫時掛起佇列和妄圖注入佇列的執行緒,直到佇列清空,每娶一個就要執行task_down()否則他不知道這個佇列已經空了就會一直掛著。。。
就是用join的時候就一定要每次取完task_down一下
鎖
鎖要小心使用,
假設得到a,b才能執行
執行緒A得到了b,B得到了A
AB都不能執行
鎖裡在獲得鎖會死鎖
注意鎖只能被一個執行緒獲得執行釋放之後其他執行緒才會使用
#-*-coding:utf-8-*- #SettingCode here __author__ = "a_little_rubbish" __date__ = "2018/11/19 15:58" #import your model here from threading import Lock,RLock#引入鎖 from threading import Thread #your class&function here a = 0 lock=Lock() rLock=RLock() def add1(): global a global lock for i in range(1000000): lock.acquire()#獲得鎖 a += 1 lock.release()#釋放鎖 def sub1(): global a global rLock for i in range(1000000): rLock.acquire()#在一個執行緒內可重複獲得的鎖 rLock.acquire()#重複獲得 a -= 1 rLock.release()#一定要有對應次數的釋放 rLock.release() if __name__ == "__main__": t1=Thread(target=add1) t2=Thread(target=sub1) t1.start() t2.start() print(a) #如果去掉鎖的話結果a會亂
產生死鎖的四個必要條件:
(1) 互斥條件:一個資源每次只能被一個程序使用。
(2) 請求與保持條件:一個程序因請求資源而阻塞時,對已獲得的資源保持不放。
(3) 不剝奪條件:程序已獲得的資源,在末使用完之前,不能強行剝奪。
(4) 迴圈等待條件:若干程序之間形成一種頭尾相接的迴圈等待資源關係。
條件變數
#-*-coding:utf-8-*- #SettingCode here __author__ = "a_little_rubbish" __date__ = "2018/11/19 16:29" #import your model here from threading import Thread,Condition #your class&function here class TianMao(Thread): def __init__(self,cond): super().__init__() self.cond = cond def run(self): self.cond.acquire()#獲得 print("天貓:1") self.cond.notify()#會發送一個訊號 self.cond.wait()#沒收到訊號之前會阻塞 print("天貓:3") self.cond.notify()#會發送一個訊號 self.cond.wait()#沒收到訊號之前會阻塞 print("天貓:5") self.cond.notify()#會發送一個訊號 self.cond.wait()#沒收到訊號之前會阻塞 self.cond.release()#一定記得釋放掉 class XiaoAi(Thread): def __init__(self,cond): super().__init__() self.cond=cond def run(self):#和天貓的呼叫一樣,兩種呼叫方式 with self.cond:#with 好用 self.cond.wait() print("小愛:2") self.cond.notify() self.cond.wait() print("小愛:4") self.cond.notify() self.cond.wait() print("小愛:6") self.cond.notify() if __name__ == "__main__": cond=Condition() t = TianMao(cond) x = XiaoAi(cond) x.start() t.start()
啟動順序很重要,如果notify先啟動了那麼沒有wait的執行緒就會卡住
with cond/acquire 之後才能使用wait和notify
condition原理是兩層鎖,底層鎖線上程呼叫了wait的時候釋放,上面的鎖在每次wait的時候分配一把放到cond的等待佇列中去,等notify的喚醒
訊號量
和鎖一樣用,可以不用同一個函式來釋放,可以呼叫多個
是個全域性物件
定義n個訊號量,每次qcquire就-1,每次release就+1減到了0 就會鎖住
#-*-coding:utf-8-*- #SettingCode here __author__ = "a_little_rubbish" __date__ = "2018/11/19 17:10" #import your model here from threading import Thread,Semaphore import time #your class&function here class consumer(Thread): def __init__(self,sem): super().__init__() self.sem=sem def run(self): time.sleep(2) print("eat!") self.sem.release() class producer(Thread): def __init__(self, sem): super().__init__() self.sem = sem def run(self): self.sem.acquire() print("make!") consumer(self.sem).start() if __name__=="__main__": sem=Semaphore(3) for i in range(20): p=producer(sem) p.start()
make!make! make! eat!eat!eat! make! make!make! eat! eat! make! make! eat! make! eat! eat! make! make!eat! make! eat!eat! make! make! eat! make! eat! eat! make! make! eat! make! eat!eat! make! make! eat! eat! eat!
程式結構基本就是一個生產者執行緒拉起一個消費者執行緒,消費者消費完了才會釋放訊號量
執行緒池
對執行緒進行管理,呼叫執行緒狀態,獲取執行緒返回值,進行池化管理
執行緒完成之後主執行緒能夠立即知道
#-*-coding:utf-8-*- #SettingCode here __author__ = "a_little_rubbish" __date__ = "2018/11/19 18:57" #import your model here from concurrent.futures import ThreadPoolExecutor,as_completed,wait,ALL_COMPLETED import time #your class&function here def go(times): time.sleep(times) print("{}is done".format(times)) return "it is {} done".format(times)#函式有返回值 executor=ThreadPoolExecutor(max_workers=2)#n個執行緒併發執行 task1=executor.submit(go,(2))#提交一個執行緒物件到池中,池中可以放無數個,但是隻能同時執行max_workers個 task2=executor.submit(go,(8))#submit提交了之後會立即執行,附帶了execute了 #task是一個future物件 if __name__ == "__main__": print(task2.done())#獲取函式是否執行完了。 print("取消1成功了嗎",task1.cancel())#嘗試取消一個執行緒,取消成功Ture,否則False print("取消2成功了嗎",task2.cancel())#完成了的執行緒無法取消,未完成已經在池中的可以取消 print(task1.result())#獲取執行完之後的返回結果 time.sleep(9) print(task2.done()) #窮舉是比較低階的做法 t=[5,6,3,7] all_task=[executor.submit(go,(i)) for i in t]#迭代放進一個執行緒池 for future in as_completed(all_task):#允許非同步的等代執行緒池中完成的執行緒並取回結果 print(future.result())#誰先做完取回誰 wait(all_task,return_when=ALL_COMPLETED)#等all_task全執行完才執行main執行緒 ''' FIRST_COMPLETED = 'FIRST_COMPLETED' FIRST_EXCEPTION = 'FIRST_EXCEPTION' ALL_COMPLETED = 'ALL_COMPLETED' _AS_COMPLETED = '_AS_COMPLETED' 可以取上面的值 ''' #使用map for future in executor.map(go,t):#和map是一個道理 對映 print(future)#直接可以列印結果哦,返回順序和url的順序一致
多程序
程序之間是相互獨立的,變數空間按不能共享,各程序保留一份程式碼副本和執行堆疊需要用特殊的資料結構來訪問
linux下子程序永遠返回0,而父程序返回子程序的ID。這樣做的理由是,一個父程序可以fork出很多子程序,所以,父程序要記下每個子程序的ID,而子程序只需要呼叫getppid()就可以拿到父程序的ID。
ProcessPoolExecutor 代替ThreadPoolExecutor就可以了,介面形狀一樣
#-*-coding:utf-8-*- #SettingCode here __author__ = "a_little_rubbish" __date__ = "2018/11/19 20:07" #import your model here import multiprocessing #your class&function here def go(a): print("it seems like ",a) return "it is {} done ".format(a) if __name__ == "__main__": p=multiprocessing.Process(target=go,args=(5,)) print(p.pid)#沒啟動的時候為None p.start() print(p.pid)#啟動了返回pid p.join() #使用池 pool=multiprocessing.Pool(multiprocessing.cpu_count())#傳入worker數量預設為cpu核數量 result=pool.apply_async(go,args=(3,))#和commit一個作用直接傳進去 pool.close()#不再接受新的程序任務 pool.join()#可以一次性將池中的程序都阻塞 print(result.get()) for result in pool.imap(go,[1,5,3]):#非同步返回結果,一次性提交 print(result) for result in pool.imap_unordered(go,[2,6,4]):#非同步返回結果,一次性提交,無序 print(result)
程序之間通訊
併發 並行 阻塞 非阻塞 同步 非同步
併發 在某一時間段內有多個程式執行在一個cpu上
並行 在某一時刻有多個程式執行在多個cpu上
阻塞 呼叫函式時當前函式被掛起
非阻塞 呼叫函式執行緒不會被掛起會立即返回
同步IO 發出請求之後等待結果
非同步IO 發出請求之後立即返回,不等待結果uinx下五種IO模型
阻塞式IO
非阻塞式IO
IO多路複用
非同步IO(協程)
訊號驅動式IO(用的少)select、poll、epoll
select, poll , epoll都是I〇多路複用的機制。I/O多路複用就是通過一種機
制,一個程序可以監視多個描述符,一旦某個描述符就緒(一般是讀就緒或
者寫就緒),能夠通知程式進行相應的讀寫操作。但select, poll , epoll本
質上都是同步I/O ,因為他們都需要在讀寫事件就緒後自己負責進行讀寫,
也就是說這個讀寫過程是阻塞的,而非同步I/O則無需自己負責進行讀寫,異
步I/O的實現會負責把資料從核心拷貝到使用者空間。select
select函式監視的檔案描述符分3類,分別是writefds、readfds、和
exceptfds。呼叫後select函式會阻塞,直到有描述副就緒(有資料可讀、
可寫、或者有except),或者超時(timeout指定等待時間,如果立即返回
設為null即可),函式返回。當select函式返回後,可以通過遍歷fdset,來
找到就緒的描述符。
select目前幾乎在所有的平臺上支援,其良好跨平臺支援也是它的一個
優點。selec啲一個缺點在於單個程序能夠監視的檔案描述符的數量存在最
大限制,在Linux上一般為1024 ,可以通過修改巨集定義甚至重新編譯核心的
方式提升這一限制,但是這樣也會造成效率的降低。poll
不同與select使用三個點陣圖來表示三個fdset的方式,poll使用一個
pollfd的指標實現。
pollfd結構包含了要S見的event和發生的event,不再使用select "參
數-填〃傳遞的方式。同時,poNfd並沒有最大數量限制(但是數量過大後
效能也是會下降)。和select函式_樣,poll返回後,需要輪詢pollfd來獲
取就緒的描述符。
從上面看,select和poll都需要在返回後,通過遍歷檔案描述符來獲取
已經就緒的socket。事實上,同時連線的大量客戶端在一時刻可能只有很少
的處於就緒狀態,因此隨著監視的描述符數量的增長,其效率也會線性下降epoll
linux支援,windows不支援
epoll是在2.6核心中提出的,是之前的select和poll的增強版本。
相對於select和poll來說,epol更加靈活,沒有描述符限制。
epoll使用一個檔案描述符管理多個描述符,
將使用者關係的檔案描述符的事件存放到核心的一個事件表中,這樣在使用者空間和核心空間的copy只需一次。
#-*-coding:utf-8-*- #SettingCode here __author__ = "a_little_rubbish" __date__ = "2018/11/21 19:29" #import your model here import asyncio import time from functools import partial #your class&function here async def gethtml(html): print("start "+html) await asyncio.sleep(2)#非同步等待 print('end '+html) return "窩做完了" def callback(address,future):#使用partial時引數必須按順序擺放,呼叫時是先傳入的地址,loop又給的future print("I have send it to ",address) if __name__ == "__main__": start_time=time.time() loop=asyncio.get_event_loop()#建立一個協程池 loop.run_until_complete(gethtml("a.com"))#基本用法,提交一個協程物件立即非同步執行 #使用future get_future=asyncio.ensure_future(gethtml("b.com"))#和loop.create_task的方法作用一樣,返回一個future物件,參考submit,把協程物件放進去 loop.run_until_complete(get_future)#傳入協程future並執行,會直接對原先的佇列做操作 print(get_future.result())#列印處理完的返回值 # 使用task,可以添加回調 task = loop.create_task(gethtml("c.com"))# task和future一樣,是future的子類 task.add_done_callback(partial(callback,"[email protected]"))#太難嫁一個回撥,在任務完成時就會呼叫,會自動向callback傳入一個完成了的future loop.run_until_complete(task)# 傳入協程future並執行,會直接對原先的佇列做操作 #partial函式用於把(函式,引數)包裹起來形成一個新的(控制代碼)可供其他函式接收 print(task.result())# 列印處理完的返回值 #使用任務佇列 tasks=[gethtml("a.com") for i in range(10)]#任務列表,全是協程物件 #loop.run_until_complete(asyncio.wait(tasks))#傳入asyncio.wait(任務列表)預設直到全完成才退出 tasks2 = [gethtml("a.com") for i in range(10)] tasks2=asyncio.gather(*tasks2) #wait方法和執行緒中的wait方法一樣,會阻塞執行緒直到滿足returnwhen然後觸發一個事件繼續進行 loop.run_until_complete(asyncio.gather(*tasks,tasks2)) #可以直接task2.cancel()取消佇列中的任務 # gather更加高層,*tasks可以將task傳入,可傳入多個taskgroup,注意task和task2 print(time.time()-start_time)
包含各種特定系統實現的模組化事件迴圈傳輸和協議抽象
對TCP、UDP、SSL、子程序、延時呼叫以及其他的具體支援模仿futures模組但適用於事件迴圈使用的Future類心
基於yield from的協議和任務,可以讓你用順序的方式編寫併發程式碼必須使用一個將產生阻塞IO的呼叫時
有介面可以把這個事件轉移到執行緒池莫仿threading模組中的同步原語、可以用在單執行緒內的協程之間
asyncio生態
asyncio官方只實現了比較底層的協議,比如TCP,UDP。所以諸如HTTP協議之類都需要藉助第三方庫,比如aiohttp。
雖然非同步程式設計的生態不夠同步程式設計的生態那麼強大,但是如果又高併發的需求不妨試試,下面說一下比較成熟的非同步庫
aiohttp
非同步http client/server框架
github地址: ofollow,noindex">https://github.com/aio-libs/aiohttp
sanic
速度更快的類flask web框架。
github地址:
https://github.com/channelcat/sanicuvloop
快速,內嵌於asyncio事件迴圈的庫,使用cython基於libuv實現。
官方效能測試:
nodejs的兩倍,追平golang
github地址: https://github.com/MagicStack/uvloop
為了減少歧義,這裡的效能測試應該只是網路IO高併發方面不是說任何方面追平golang。
總結
Python之所以能夠處理網路IO高併發,是因為藉助了高效的IO模型,能夠最大限度的排程IO,然後事件迴圈使用協程處理IO,協程遇到IO操作就將控制權丟擲,那麼在IO準備好之前的這段事件,事件迴圈就可以使用其他的協程處理其他事情,然後協程在使用者空間,並且是單執行緒的,所以不會像多執行緒,多程序那樣頻繁的上下文切換,因而能夠節省大量的不必要效能損失。
注: 不要再協程裡面使用time.sleep之類的同步操作,因為協程再單執行緒裡面,所以會使得整個執行緒停下來等待,也就沒有協程的優勢了
本文主要講解Python為什麼能夠處理高併發,不是為了講解某個庫怎麼使用,所以使用細節請查閱官方文件或者執行。
無論什麼程式語言,高效能框架,一般由事件迴圈 + 高效能IO模型(也許是epoll)組成
#-*-coding:utf-8-*- #SettingCode here __author__ = "a_little_rubbish" __date__ = "2018/11/22 8:18" #import your model here import asyncio import time #your class&function here async def get(t): print("waiting") await asyncio.sleep(t) print("done after {} seconds".format(t)) if __name__ == "__main__": tasks=[get(1),get(2),get(3)] loop=asyncio.get_event_loop() try: loop.run_until_complete(asyncio.wait(tasks))#開始執行任務 except KeyboardInterrupt as e:#按下Ctrl+c的時候會觸發 all_tasks=asyncio.Task.all_tasks()#獲取所有的協程任務,沒有傳入,同過eventloop獲取的 for i in all_tasks:#迭代 print("cancel it!") print(i.cancel())#取消任務,成功就返回True loop.stop()#停止迴圈 loop.run_forever()#呼叫,否則會出錯 finally: loop.close()#無論怎樣都要關閉loop

巢狀協程的返回機制
#import your model here import asyncio,time from concurrent.futures import ThreadPoolExecutor #your class&function here def s(t): print("sleep ",t) time.sleep(t) #在非同步中使用同步程式碼 if __name__ == "__main__": loop=asyncio.get_event_loop() executor=ThreadPoolExecutor(3)#建立執行緒池 tasks=[] for i in range(3): task=loop.run_in_executor(executor,s,i)#傳入要新增的執行緒池,要執行的同步函式,引數,返回一個task物件 tasks.append(task)#新增任務列表 loop.run_until_complete(asyncio.wait(tasks))
def callback(t): print("sleep {} times".format(t)) def stoploop(loop): loop.stop() if __name__ == "__main__": loop=asyncio.get_event_loop() now=loop.time() loop.call_later(2,callback,5)#過指定的時間就會執行某個函式,引數,根據時間排列先後順序 loop.call_at(now+4,callback,1)#過指定的時間就會執行某個函式,引數,loop中的時間 loop.call_soon(callback,4)#立即執行某個函式,引數,不會按時間排序一起開始執行 loop.call_soon(stoploop,loop)#停止協程 loop.call_soon_threadsafe(callback,3)#執行緒安全的執行某個函式 loop.run_forever()#別忘了這個
鎖
lock=Lock() cache={} async def parse(arg):#請求html的函式 global cache async with lock:#可以acquire也可以直接async with=with await if arg in cache: return cache[arg] await asyncio.sleep(3)#耗時操作 cache["arg"]=2 print(arg) async def use():#使用這個請求,呼叫了這個解析函式 await parse("didi") async def reparse():#重新解析這個請求,再次呼叫這個解析函式,這樣就對一個url呼叫了2次 await parse("didi") #在這樣使用時會導致協程不安全所以使用鎖
爬蟲
#-*-coding:utf-8-*- #SettingCode here __author__ = "a_little_rubbish" __date__ = "2018/11/23 13:27" #import your model here import aiohttp import aiomysql import asyncio import re from asyncio.queues import Queue from pyquery import PyQuery stop=False #your class&function here start_url="http://www.jobbole.com/" waiting_urls=Queue()#未爬取的連線 seen_urls=set()#已爬取過的連線,量大就用布隆過濾器 async def fetch(url,session):#獲取url try: async with session.get(url) as the_response: print("url status:{}".format(the_response.status)) if the_response.status in [200,201]: data =await the_response.text() return data except Exception as e: print(e) async def extract_urls(html): urls=[] pq=PyQuery(html) for links in pq.items("a"): url=links.attr("href") if url and url.startswith("http") and url not in seen_urls: urls.append(url) await waiting_urls.put(url) async def init_urls(url,session):#用作啟動 html= await fetch(url,session) await extract_urls(html) async def article_handler(url,session,pool): html=await fetch(url,session) seen_urls.add(url) await extract_urls(html) pq=PyQuery(html) tittle=pq("tittle").text() async with pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute("INSERT INTO tittle VALUE {}".format(tittle)) print("ok +1") async def consumer(pool): async with aiohttp.ClientSession() as session: while not stop: url= await waiting_urls.get() print("get {}".format(url)) if re.match("http://.*?jobbole.com/\d+/",url): if url not in seen_urls: asyncio.ensure_future(article_handler(url,session,pool)) else: if url not in seen_urls: asyncio.ensure_future(init_urls(url,session)) async def main(loop): pool = await aiomysql.create_pool(host="127.0.0.1", port=3306, user="root", password="1118", db="aiosql", loop="", charset="utf8", autocommit=True) async with aiohttp.ClientSession() as session: html= await fetch(start_url,session) seen_urls.add(start_url) await extract_urls(html) asyncio.ensure_future(consumer(pool)) if __name__ == "__main__": loop=asyncio.get_event_loop() asyncio.ensure_future(main(loop)) loop.run_forever()