封裝,多態,反射,異常處理,網絡編程
接口與歸一化設計(python中使用繼承的方式)
抽象類
import abc class Interface(metaclass=abc.ABCMeta): #定義接口Interface類來模仿接口的概念,python中 #沒有interface關鍵字來定義一個接口 all_type=‘file‘ #只定義功能集合而不實現具體的功能 @abc.abcstractmethod def read(self): # 定義接口函數read pass @abc.abcstractmethod def write(self): # 定義接口函數write pass class Txt(Interface): #文本,具體實現read和write def read(self): print(‘文本數據的讀取方法‘) def write(self): print(‘文本數據的寫方法‘) t=Txt() print(t.all_type)
多態
一種接口,多種實現(接口重用)
class Animal: def __init__(self,name): self.name=name def talk(self): pass @staticmethod def animal_talk(obj): obj.talk() class Cat(Animal): def talk(self): print(‘Meow!‘) class Dog(Animal): def talk(self): print(‘Woof!Woof!‘) d = Dog(‘chen‘) #d.talk() c = Cat(‘xu‘) #c.talk() 使用統一的接口 Animal.animal_talk(c) Animal.animal_talk(d)
封裝
1.如何隱藏屬性(類的屬性和對象的屬性)
class Foo: __N=1111 # 類的屬性被隱藏 _Foo__N:1111 def __init__(self,name): self.__Name=name # 給對象屬性隱藏 self._Foo__Name=namedef __f1(self): #_Foo__f1 print(‘f1‘) def f2(self): self.__f1() # self._Foo__f1() f=Foo(‘egon‘) #print(f.__N) # 報錯,無法訪問 #f.__f1() # 也無法訪問 #f.__Name #也無法訪問 f.f2() # f1 內部調用,可以訪問
#以上隱藏需要註意的問題:
#1.這種隱藏只是一種語法上變形操作,並不會將屬性真正隱藏起來
#print(Foo.__dict__)類的名稱空間 => ‘-Foo__N‘:1111
#print(f.__dict__)對象的名稱空間 => {‘_foo_Name‘:‘egon‘}
#print(f._foo_Name) # egon
#print(f._Foo__N) # 1111
#2.這種語法級別的變形是在類定義階段發生,並且只在類定義階段發生
Foo.__x=2222
print(Foo.__dict__) # ‘__x‘:2222
print(Foo.__x) # 2222
f.__x=3333
print(f.__dict__) # ‘__x‘:3333 並沒有發生變形
print(f.__x) # 3333
#3.在子類定義的__x不會覆蓋在父類定義的__x,因為子類中變形成了:_子類名__x,
#而父類中變形成了:_父類名__x,即雙下劃線開頭的屬性在繼承給子類時,子類是無法覆蓋的
class Foo: __N=1111 # _Foo__N def __init__(self,name): self.__Name=name # self._Foo__name=name def __f1(self): # _Foo__f1 print(‘f1‘) def f2(self): self.__f1() # self._Foo__f1() f=Foo(‘egon‘) #print(f.__N) #f.__f1() #f.__Name #f.f2()
封裝的真實目的
#封裝不是單純意義上的隱藏
#1.封裝數據屬性:將屬性隱藏起來,然後對外提供訪問屬性的接口,關鍵是我們在接口內定制一些控制邏輯
#從而嚴格控制使用對數據屬性的使用
class People: def __init__(self,name,age): self.__Name=name self.__Age=age def tell_info(self): print(‘<名字:%s 年齡:%s>‘ % (self.__Name,self.__Age)) def set_info(self,x,y): if not isinstance(x,str): raise TypeError(‘%s must be str‘ %x) if not isinstance(y,int): raise TypeError(‘%s must be int‘ %y) self.__Name=x self.__Age=y p=People(‘egon‘,18) p.tell_info() #p.set_info(‘Egon‘,‘19‘) p.set_info(‘Egon‘,19) p.tell_info()
#2.封裝函數屬性:為了隔離復雜度
class ATM: def __card(self): print(‘插卡‘) def __auth(self): print(‘用戶認證‘) def __input(self): print(‘輸入取款金額‘) def __print_bill(self): print(‘打印賬單‘) def __take_money(self): print(‘取款‘) def withdraw(self): self.__card() self.__auth() self.__input() self.__print_bill() self.__take_money() a=ATM() a.withdraw()
靜態屬性property
# bmi 體脂指數 class People: def __init__(self,name,weight,height): self.name=name self.weight=weight self.height=height @property def bmi(self): return self.weight / (self.height**2) p=People(‘egon‘,75,1.80) #print(p.bmi()) #bmi是一個名詞,像訪問數據屬性一樣訪問bmi就可以了 print(p.bmi) #訪問,設置,刪除 class Foo: def __init__(self,x): self.__Name=x @property def name(self): return self.__Name @name.setter # 前提是name已經被property修飾過一次了,才能調用.setter def name(self,val): if not isinstance(val,str): raise TypeError self.__Name=val @name.deleter def name(self): #print(‘===‘) #del self.__Name raise PermissionError # 不允許刪除 f=Foo(‘egon‘) print(f.name) f.name=‘Egon‘ print(f.name) del f.name print(f.name)
反射
#通過字符串找到屬性對應的值
class Foo: x=1 def __init__(self,name): self.name=name def f1(self): print(‘from f1‘) f=Foo(‘egon‘) #hasattr print(hasattr(f,‘name‘)) # f.name print(hasattr(f,‘f1‘)) # f.f1 print(hasattr(f,‘x‘)) # f.x #setattr setattr(f,‘age‘,18) # f.age=18 #getattr print(getattr(f,‘name‘)) # f.name print(getattr(f,‘abc‘,None)) # f.abc print(getattr(f,‘name‘,None)) # f.abc func=getattr(f,‘f1‘) # f.f1 print(func) func() #delattr delattr(f,‘name‘) # del f.name print(f.__dict__)
class FtpServer:
def __init__(self,host,port):
self.host=host
self.port=port
def run(self):
while True:
cmd=input(‘>>: ‘).strip()
if not cmd:continue
if hasattr(self,cmd):
func=getattr(self,cmd)
func()
def get(self):
print(‘get func‘)
def put(self):
print(‘put func‘)
f=FtpServer(‘192.168.1.2‘,21)
f.run()
----------------------------------------------
item系列
#把對象變成類似字典對象,可以像字典一樣進行增刪改查
#也是一種歸一化的思想
#__setitem__,__getitem__,__delitem__
class Foo:
def __getitem__(self,item):
print(‘getitem--->‘,self,item)
return self.__dict__[item]
def __setitem__(self,key,value):
print(‘setitem--->‘,self,key,value)
self.__dict__[key]=value
# setattr(self,key,value) 效果一樣
def __delitem__(self,key):
print(‘delitem--->‘,self,key)
self.__dict__.pop(key)
f=Foo()
f[‘x‘]=1111
del f[‘x‘]
print(f[‘x‘])
----------------------------------------------
打印對象信息__str__
class People:
def __init__(self,name,age,sex):
self.name=name
self.age=age
self.sex=sex
def __str__(self): #在對象被打印時觸發執行
return ‘<name:%s age:%s sex:%s>‘ % (self.name,self.age,self.sex) #只能返回字符串類型
p=People(‘egon‘,18,‘male‘)
print(p)
----------------------------------------------
析構方法__del__
class Foo:
def __init__(self,x):
self.x=x
def __del__(self): # 在對象資源被釋放時出發
print(‘---del---‘)
f=Foo()
print(‘===>‘)
----------------------------------------------
異常處理
#什麽時候用try...except
#錯誤一定會發生,但是無法預知錯誤發生條件
#CS結構的軟件能夠用到,客戶的操作無法控制,客戶單方面把軟件終止掉,
#因為服務端客戶端都是基於TCP鏈接的,但客戶端單方面把鏈接斷開,服務端也會受影響
#這種錯誤有可能發生,服務端不能受影響,這就用到了異常處理
class EgonException(BaseException):
def __init__(self,msg):
self.msg=msg
def __str__(self):
return ‘<%s>‘ % self.msg
raise EgonException(‘egon的異常‘)
----------------------------------------------
基於tcp協議的socket實現簡單通信
簡單的套接字
1.服務端
# socket.AF_INET 用什麽類型的套接字,地址家族
# socket.SOCK_STREAM 流式套接字,就是調用tcp
import socket
#買手機
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM) # 基於tcp的套接字
#插卡
phone.bind((‘127.0.0.1‘,8080))
#開機
phone.listen(5) #最大掛起的鏈接數
#等電話鏈接
print(‘server start...‘)
conn,client_addr=phone.accept() # (建立好的tcp鏈接,客戶端的ip和端口)
print(‘鏈接‘,conn)
print(client_addr)
#基於建立的鏈接,收發消息
client_data=conn.recv(1024)
print(‘客戶端的消息‘,client_data)
conn.send(client_data.upper())
#掛電話
conn.close()
#關機
phone.close()
2.客戶端
import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.connect((‘127.0.0.1‘,8080))
phone.send(‘hello‘.encode(‘utf-8‘)) # 需要二進制類型,轉為bytes類型
server_data=phone.recv(1024) #服務端回復的消息
print(‘服務端回應的消息‘,server_data)
phone.close()
-------------------------------------------------------
加上通信循環
#服務端
import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) # 用來解決Address already in use 問題
phone.bind((‘127.0.0.1‘,8080))
phone.listen(5)
print(‘server start...‘)
conn,client_addr=phone.accept()
#基於建立的鏈接,收發消息
while True: #通訊循環
client_data=conn.recv(1024)
conn.send(client_data.upper())
conn.close()
phone.close()
#客戶端
import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.connect((‘127.0.0.1‘,8080))
while True:
msg=input(‘>>: ‘).strip()
if not msg:continue
phone.send(msg.encode(‘utf-8‘))
server_data=phone.recv(1024)
print(server_data.decode(‘utf-8‘))
phone.close()
-----------------------------------------------------
加上鏈接循環的套接字
#服務端
import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
phone.bind((‘127.0.0.1‘,8080))
phone.listen(5)
print(‘server start...‘)
#鏈接循環
while True: # 這個循環是用來解決當有Exception異常時,是客戶端斷了,把這個斷掉的資源回收掉
# 重新等待其他鏈接鏈接進來
conn,client_addr=phone.accept()
while True:
#此處用try..except是由於和多個客戶端建立了鏈接,一個客戶端斷掉鏈接後會服務端拋出異常,
#這樣就導致其余的客戶端也會發生錯誤,拋異常,為了使服務端能夠繼續和沒有主動斷掉鏈接的客戶端
#繼續通信,不崩潰,需要用到這個try..except..,但是只能解決不拋出異常的問題,不能解決讓其他客戶端
#進來和服務端建立通信
try: # 用於解決windows方面的問題
client_data=conn.recv(1024)
if not client_data:break #主要用於linux系統和mac,客戶端單方面斷掉後,recv不會阻塞掉,
#而是不斷的收空,一直send,對方沒有收,一直死循環這個try內的
#代碼,所以需要加上 if not client_data:break ,如果收到的消息
#為空,肯定是客戶端斷掉了鏈接
conn.send(client_data.upper())
except Exception:
break # 把這個通信循環直接斷掉
conn.close()
phone.close()
#客戶端1
import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.connect((‘127.0.0.1‘,8080))
while True:
msg=input(‘>>: ‘).strip()
if not msg:continue
phone.send(msg.encode(‘utf-8‘))
server_data=phone.recv(1024)
print(server_data.decode(‘utf-8‘))
phone.close()
#客戶端2
import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.connect((‘127.0.0.1‘,8080))
while True:
msg=input(‘>>: ‘).strip()
if not msg:continue
phone.send(msg.encode(‘utf-8‘))
server_data=phone.recv(1024)
print(server_data.decode(‘utf-8‘))
phone.close()
#客戶端3
import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.connect((‘127.0.0.1‘,8080))
while True:
msg=input(‘>>: ‘).strip()
if not msg:continue
phone.send(msg.encode(‘utf-8‘))
server_data=phone.recv(1024)
print(server_data.decode(‘utf-8‘))
phone.close()
---------------------------------------------------------
模擬ssh遠程執行命令
#客戶端
import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.connect((‘127.0.0.1‘,8080))
while True:
cmd=input(‘>>: ‘).strip()
if not cmd:continue
#發命令
phone.send(cmd.encode(‘utf-8‘))
#收命令的執行結果
cmd_res=phone.recv(1024)
#打印結果
print(cmd_res.decode(‘gbk‘)) #linux系統的話改為utf-8
phone.close()
#服務端
import socket
import subprocess
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
phone.bind((‘127.0.0.1‘,8080))
phone.listen(5)
print(‘server start...‘)
while True:
conn,client_addr=phone.accept()
while True:
try:
cmd=conn.recv(1024) #bytes格式
if not cmd:break
#執行命令,拿到結果(subprocess命令的執行結果是bytes類型)
res=subprocess.Popen(cmd.decode(‘utf-8‘), # 需要字符串格式,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout=res.stdout.read() #從正確的管道裏面讀,但是gbk編碼
stderr=res.stderr.read() #從錯誤管道裏面讀,但是gbk編碼
conn.send(stdout+stderr)
except Exception:
break
conn.close()
phone.close()
---------------------------------------------------
解決粘包問題
只有TCP有粘包現象,UDP永遠不會粘包
由於TCP是流式的,沒有開頭沒有結尾,就會發生粘包現象
UDP是數據報協議,它不是基於流的
粘包現象
服務端
from socket import *
s=socket(AF_INET,SOCK_STREAM)
s.bind((‘127.0.0.1‘,8080))
s.listen(5)
conn,addr=s.accept()
#收發消息
data1=conn.recv(1024)
print(‘data1‘,data1)
data2=conn.recv(1024)
print(‘data2‘,data2)
conn.close()
s.close()
#data1: b‘helloworld‘
#data2 b‘‘
上面的結果不一定會發生,取決於算法要不要合並到一起發送過去,
多嘗試幾次,就會發生
客戶端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect((‘127.0.0.1‘,8080))
c.send(‘hello‘.encode(‘utf-8‘))
c.send(‘world‘.encode(‘utf-8‘))
c.close()
----------------------------------------------------
ssh遠程執行命令+定制報頭
客戶端
import socket
import struct
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.connect((‘127.0.0.1‘,8080))
while True:
cmd=input(‘>>: ‘).strip()
if not cmd:continue
#發命令
phone.send(cmd.encode(‘utf-8‘))
#先收報頭
header=phone.recv(4)
total_size=struct.unpack(‘i‘,header)[0]
#再收命令的執行結果
recv_size=0
data=b‘‘ #結果
while recv_size < total_size:
recv_data=phone.recv(1024)
recv_size+=len(recv_data)
data+=recv_data
print(data.decode(‘gbk‘))
phone.close()
服務端
import socket
import struct
import subprocess
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
phone.bind((‘127.0.0.1‘,8080))
phone.listen(5)
print(‘server start...‘)
while True:
conn,client_addr=phone.accept()
print(conn,client_addr)
while True:
try:
cmd=conn.recv(1024)
if not cmd:break
res=subprocess.Popen(cmd.decode(‘utf-8‘),
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout=res.stdout.read()
stderr=res.stderr.read()
#制作報頭
header=struct.pack(‘i‘,len(stdout)+len(stderr))
#先發報頭(固定長度,這裏一直是4)
conn.send(header)
#再發真實數據
conn.send(stdout)
conn.send(stderr)
except Exception:
break
conn.close()
phone.close()
#此方法的問題是最大只能發小於2G的內容
--------------------------------------------------------------
定制報頭的正確方式
客戶端
import socket
import struct
import json
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.connect((‘127.0.0.1‘,8080))
while True:
cmd=input(‘>>: ‘).strip()
if not cmd:continue
#發命令
phone.send(cmd.encode(‘utf-8‘))
#先收報頭的長度
struct_res=phone.recv(4)
header_size=struct.unpack(‘i‘,struct_res)[0]
#再收報頭
header_bytes=phone.recv(header_size)
head_json=header_bytes.decode(‘utf-8‘)
head_dic=json.loads(head_json)
total_size=head_dic[‘total_size‘]
#再收命令的執行結果
recv_size=0
data=b‘‘ #結果
while recv_size < total_size:
recv_data=phone.recv(1024)
recv_size+=len(recv_data)
data+=recv_data
print(data.decode(‘gbk‘))
phone.close()
服務端
import socket
import struct
import subprocess
import json
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
phone.bind((‘127.0.0.1‘,8080))
phone.listen(5)
print(‘server start...‘)
while True:
conn,client_addr=phone.accept()
print(conn,client_addr)
while True:
try:
cmd=conn.recv(1024)
if not cmd:break
res=subprocess.Popen(cmd.decode(‘utf-8‘),
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout=res.stdout.read()
stderr=res.stderr.read()
#制作報頭
header_dic={‘total_size‘:len(stdout)+len(stderr),‘md5‘:None}
header_json=json.dumps(header_dic)
header_bytes=header_json.encode(‘utf-8‘)
#先發報頭的長度(固定4個bytes)
conn.send(struct.pack(‘i‘,len(header_bytes)))
#再發報頭
conn.send(header_bytes)
#再發真實數據
conn.send(stdout)
conn.send(stderr)
except Exception:
break
conn.close()
phone.close()
------------------------------------------------------------------
socketserver實現並發
服務端
import socketserver
class MyTcphandler(socketserver.BaseRequestHandler):
def handle(self):
while True: #通信循環
data=self.request.recv(1024) # 這個self.request就是conn
self.request.send(data.upper())
if __name__ == ‘__main__‘:
#取代鏈接循環
server=socketserver.ThreadingTCPServer((‘127.0.0.1‘,8080),MyTcphandler)
server.serve_forever()
客戶端1
import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.connect((‘127.0.0.1‘,8080))
while True:
msg=input(‘>>: ‘).strip()
if not msg:continue
phone.send(msg.encode(‘utf-8‘))
server_data=phone.recv(1024)
print(server_data.decode(‘utf-8‘))
phone.close()
客戶端2
import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.connect((‘127.0.0.1‘,8080))
while True:
msg=input(‘>>: ‘).strip()
if not msg:continue
phone.send(msg.encode(‘utf-8‘))
server_data=phone.recv(1024)
print(server_data.decode(‘utf-8‘))
phone.close()
客戶端3
import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.connect((‘127.0.0.1‘,8080))
while True:
msg=input(‘>>: ‘).strip()
if not msg:continue
phone.send(msg.encode(‘utf-8‘))
server_data=phone.recv(1024)
print(server_data.decode(‘utf-8‘))
phone.close()
封裝,多態,反射,異常處理,網絡編程