python之路——作業:高級FTP(僅供參考)
阿新 • • 發佈:2017-07-15
ice 靜態 enc lose 自己的 創建目錄 返回 msg 組成
一、作業需求
1. 用戶加密認證
2. 多用戶同時登陸
3. 每個用戶有自己的家目錄且只能訪問自己的家目錄
4. 對用戶進行磁盤配額、不同用戶配額可不同
5. 用戶可以登陸server後,可切換目錄
6. 查看當前目錄下文件
7. 上傳下載文件,保證文件一致性
8. 傳輸過程中現實進度條
9.支持斷點續傳
二、實現功能
1、多用戶同時登錄註冊(已有用戶:japhi、alex;密碼都是123)
2、上傳/下載文件(已有示例文件)
3、查看不同用戶自己家得目錄下文件,且只能訪問自己的家目錄
4、對用戶進行磁盤配額,不同用戶配額不同(使用random函數,隨機給用戶一個內存大小(10m-20m))
5、用戶登錄後,可對用戶目錄下文件目錄進行操作,包含:ls(查看當前操作目錄下文件)、cd(切換當前操作目錄)、rm(刪除文件)、mkdir(創建目錄)
6、上傳下載文件,保證文件一致性,且在傳輸過程中實現進度條
7、支持斷點續傳(*******暫未實現*******)
三、目錄說明
FTP/ |-- FTPClient/ #客戶端文件夾 | |-- 示例文件夾/ #客戶端上傳/下載示例文件夾 | |-- Client_start.py #客戶端啟動程序 | |-- FTPServer/ #服務端文件夾 | |-- bin/ | | |-- __init__.py | | |-- Server_start.py #程序啟動的主入口 | | | |-- conf/ | | |-- setting.py #配置文件 | | | |-- db/ #用戶數據 | | |-- alex #用戶名alex的數據文件夾 | | |-- japhi #用戶名japhi的數據文件夾 | | | |-- home/ | | |-- alex/ #用戶alex用戶家目錄 | | |-- japhi/ #用戶japhi用戶家目錄 | | |-- |-- log/ | |-- log_sys.log #日誌文件(未啟用) | |-- |-- src/ | | |-- __init__.py | | |-- common.py #公共功能 | | |-- Server_start.py #程序啟動的主入口 | | |-- user.py #用戶類及方法 |-- db/ #用戶數據 | | |-- alex #用戶名alex的數據文件夾 | | |-- japhi #用戶名japhi的數據文件夾 |-- FTP.png #流程圖 |-- README.txt
四、流程圖
五、代碼說明
1、FTPClient/Client_start.py
from __future__ import division import socket,os,sys,time,hashlib,math updir = os.path.join(os.path.dirname(os.path.abspath(__file__)),"示例文件夾") HOST = "localhost" PORT = 9998 def upload(client,user_info,name): ‘‘‘ 客戶端上傳文件的函數 :param client:scoket客戶端標誌 :param user_info:客戶端登陸用戶的信息 :param name:客戶端登陸用戶的名字 :return:none ‘‘‘ print("\033[1;37m當前可選上傳\033[0m".center(40,"*")) dic = {} for root, dirs, files in os.walk(updir): for i,j in enumerate(files): k = i+1 dic[k] = j print("\033[1;37m%s:%s\033[0m"%(k,j)) choice = input("請輸入要上傳的文件序號:>>>").strip() if choice.isdigit() and 0 < int(choice) <= len(dic): command = "upload+"+user_info+"+"+dic[int(choice)] client.sendall(bytes(command,encoding="utf-8")) res = client.recv(1024) if str(res,encoding="utf-8") == "True": dir = os.path.join(updir,dic[int(choice)]) f = open(dir,"rb") md5 = hashlib.md5() length = os.stat(dir).st_size client.send(str(length).encode()) sign = client.recv(1024).decode() if sign == "ok": data = f.read() md5.update(data) client.sendall(data) f.close() client.send(md5.hexdigest().encode()) res_sign = client.recv(1024) if res_sign == b‘True‘: print("\033[1;37m文件上傳成功\033[0m") elif res_sign == b‘False‘: print("\033[1;37m文件上傳失敗\033[0m") exit() elif sign == "no": print("\033[1;37m磁盤空間不足\033[0m") exit() else: print("\033[1;37m輸入有誤\033[0m") def download(client,user_info,name): ‘‘‘ 客戶端下載文件的函數 :param client: scoket客戶端標誌 :param user_info: 客戶端登陸的用戶信息 :param name:客戶端登陸的用戶名字 :return: none ‘‘‘ dic = {} command = "download+"+user_info client.sendall(bytes(command, encoding="utf-8")) data = client.recv(4069) res = eval(str(data, encoding="utf-8")) if len(res) == 0: print("\033[1;31m當前目錄下暫無文件\033[0m".center(40, "-")) else: for i,j in enumerate(res): k = i + 1 dic[k] = j print("\033[1;37m%s:%s\033[0m" % (k, j)) choice = input("請選擇要下載的文件序號:>>>") cm = dic[int(choice)] client.sendall(bytes(cm, encoding="utf-8")) print("\033[1;37m準備開始下載文件\033[0m") dir = os.path.join(updir, dic[int(choice)]) res = str(client.recv(1024).decode()).split("+") res_length = res[0] or_md5 = res[1] # print(or_md5) length = 0 f = open(dir, "wb") m = hashlib.md5() while length < int(res_length): if int(res_length) - length > 1024: # 要收不止一次 size = 1024 else: # 最後一次了,剩多少收多少 size = int(res_length) - length # print("最後一次剩余的:", size) data = client.recv(size) length += len(data) m.update(data) f.write(data) progressbar(length, int(res_length)) else: new_md5 = m.hexdigest() # print(new_md5) f.close() if new_md5 == or_md5: print("\033[1;37m文件下載成功\033[0m") return True else: print("\033[1;37m文件下載失敗\033[0m") return False def switch(client,user_info,name): ‘‘‘ 切換目錄操作函數,包括“ls”,“cd”,“rm”,“mkdir” :param client: 客戶端 :param user_info: 用戶信息 :param name: 用戶名 :return: none ‘‘‘ command = ‘‘‘ ls cd rm mkdir 目錄名 ‘‘‘ # while True: print("\033[1;33m%s\033[0m" % command) c = input("請輸入您要操作的命令:>>>").strip() if c == "ls": view_file(client, user_info, name) elif c == "cd": cm = "cd+" + user_info client.sendall(cm.encode("utf-8")) dirs = eval(client.recv(1024).decode()) if len(dirs) != 0: for j in dirs: print("\033[1;37m目錄:%s\033[0m" % (j)) choice = input("請輸入“cd”的目錄名稱:>>").strip() if choice in dirs: client.sendall(choice.encode("utf-8")) sign = client.recv(1024).decode() # print(len(res[0]),len(res[1])) if sign == "True": print("\033[1;31m%s目錄切換成功\033[0m" % (choice)) else: print("\033[1;31m%s目錄切換失敗\033[0m" % (choice)) else: print("\033[1;31m輸入有誤\033[0m") client.sendall("error".encode("utf-8")) exit() else: print("\033[1;31m無其它目錄\033[0m") elif c.split(" ")[0] == "mkdir": cm = "mkdir+" + user_info + "+" + c.split(" ")[1] # print(cm) client.sendall(cm.encode("utf-8")) res = client.recv(1024).decode() if res == "True": print("\033[1;37m目錄創建成功\033[0m") else: print("\033[1;37m目錄創建失敗\033[0m") elif c == "rm": cm = "rm+" + user_info # print(cm) client.sendall(cm.encode("utf-8")) res_list = eval(client.recv(1024)) if len(res_list) == 0: print("\033[1;37m無其它文件\033[0m") client.sendall("error".encode("utf-8")) else: for i in res_list: print("\033[1;37m文件:%s\033[0m" % i) choice = input("請輸入“rm”的文件名稱:>>").strip() if choice in res_list: client.sendall(choice.encode("utf-8")) if client.recv(1024).decode() == "True": print("\033[1;37m文件刪除成功\033[0m") else: print("\033[1;37m文件刪除失敗\033[0m") else: print("\033[1;37m輸入有誤\033[0m") client.sendall("error".encode("utf-8")) exit() else: print("\033[1;37m輸入有誤\033[0m") exit() def view_file(client,user_info,name): ‘‘‘ 客戶端查看當前目錄下文件的函數 :param client: scoket客戶端標誌 :param user_info: 客戶端登陸的用戶信息 :param name: 客戶端登陸的用戶名字 :return: none ‘‘‘ command = "view+"+user_info client.sendall(bytes(command,encoding="utf-8")) dirs = client.recv(1024) # print(dirs,"1111111111111111") if dirs.decode() == "False": dir = [] else: dir = eval(str(dirs,encoding="utf-8")) files = client.recv(1024) file = eval(str(files, encoding="utf-8")) client.sendall("true".encode("utf-8")) storage = str(client.recv(1024).decode()) # print(storage) if len(file) == 0 and len(dir) == 0: print("\033[1;31m當前目錄下暫無文件\033[0m".center(40, "-")) else: print("\033[1;33m當前目錄包含以下文件內容\033[0m".center(30,"*")) print("\033[1;35m磁盤大小:%skb\033[0m" % storage) for j in dir: print("\033[1;35m目錄:%s\033[0m"%j) for i in file: print("\033[1;35m文件:%s\033[0m"%i) print("".center(33,"*")) def operate(client,user_info,name): ‘‘‘ 客戶端操作主函數 :param client: scoket客戶端標誌 :param user_info: 客戶端登陸的用戶信息 :param name: 客戶端登陸的用戶名字 :return: none ‘‘‘ dic = {"1":upload,"2":download,"4":view_file,"3":switch} info = ‘‘‘------操作指令------ 1、上傳文件 2、下載文件 3、切換目錄操作 4、查看目錄下文件 5、退出 ‘‘‘ while True: print("\033[1;33m%s\033[0m" % info) choice = input("請輸入你要操作的命令:>>>").strip() if choice.isdigit() and 0 < int(choice) <= len(dic): dic.get(choice)(client,user_info,name) elif choice.isdigit() and int(choice) == 5: break else: print("\033[1;31m輸出錯誤\033[0m".center(40, "-")) def com_parse(client,com): ‘‘‘ 客戶端用戶登陸註冊命中解析函數 :param client: 客戶端scoket標誌 :param com: 命令 :return: 登陸成功返回True,否則False ‘‘‘ # print(com) client.sendall(bytes(com,encoding="utf-8")) re = client.recv(4096) if str(re,encoding="utf-8") == "Success": return True elif str(re, encoding="utf-8") == "Success": return False def login(client,data): ‘‘‘ 客戶端用戶登陸函數 :param client: 客戶端scoket標誌 :param data: 數據 :return: none ‘‘‘ name = input("請輸入您的名字:>>>").strip() psd = input("請輸入密碼:>>>").strip() user_info = name+"+"+psd com = "login+"+user_info # com_parse(client, com) if com_parse(client,com): print("\033[1;31m登陸成功\033[0m") operate(client,user_info,name) else: print("\033[1;31m登陸出現異常\033[0m") def register(client,data): ‘‘‘ 客戶端用戶註冊函數 :param client: 客戶端scoket標誌 :param data: 數據 :return: none ‘‘‘ name = input("請輸入您的名字:>>>").strip() psd = input("請輸入密碼:>>>").strip() com = "register+" + name + "+" + psd if com_parse(client,com): print("\033[1;31m註冊成功\033[0m") user_info = name + "+" + psd operate(client, user_info, name) else: print("\033[1;31m註冊出現異常\033[0m") def quit(client,data): ‘‘‘ 程序退出函數 :param client: 客戶端scoket標誌 :param data: 用戶數據 :return: none ‘‘‘ exit() def main_func(client,data): ‘‘‘ 客戶端主菜單函數 :param client: 客戶端scoket標誌 :param data: 數據 :return: none ‘‘‘ dic = {"1":login,"2":register,"3":quit} info = ‘‘‘------用戶登錄界面------*{0}* 1、登陸 2、註冊 3、退出 ‘‘‘.format(data) print("\033[1;33m%s\033[0m"%info) what = input("你要幹嘛?>>>").strip() if what.isdigit() and 0 < int(what) <= len(dic): dic.get(what)(client,data) else: print("\033[1;31m輸出錯誤\033[0m".center(40,"-")) def progressbar(cur, total): ‘‘‘ 進度條處理函數 :param cur: 當前文件內容長度 :param total: 總長度 :return: none ‘‘‘ percent = ‘{:.2%}‘.format(cur / total) sys.stdout.write(‘\r‘) sys.stdout.write(‘[%-50s] %s‘ % (‘=‘ * int(math.floor(cur * 50 / total)), percent)) # print(‘[%-50s] %s\r‘ % (‘=‘ * int(math.floor(cur * 50 / total)), percent),) sys.stdout.flush() time.sleep(0.01) if cur == total: sys.stdout.write(‘\n‘) if __name__ == ‘__main__‘: client = socket.socket(socket.AF_INET,socket.SOCK_STREAM) client.connect(("localhost",PORT)) client.send("True".encode("utf-8")) # print("True 發送成功") # main_func(client,client.recv(1024)) main_func(client,"connect") client.close()
2、FTPServer/conf/settings.py
import os basedir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) user_home = "%s/FTPServer/home"%basedir user_info = "%s/db"%basedir # print(user_home) # print(user_info) HOST = "0.0.0.0" PORT = 9998
3、FTPServer/src/common.py
from __future__ import division import logging,os,pickle,sys,uuid,math,time frame = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(frame) # from conf import setting # # def sys_logging(content,levelname): # ‘‘‘ # 程序記錄日誌函數 # :param content: 日誌的內容 # :param levelname: 日誌的等級 # :return: none # ‘‘‘ # _filename = os.path.join(setting.log_dir,"log_sys.log") # log = logging.getLogger(_filename) # logging.basicConfig(filename=_filename,level=logging.INFO,format=‘%(asctime)s-%(levelname)s-%(message)s‘, datefmt=‘%m/%d/%Y %I:%M:%S %p‘) # if levelname == ‘debug‘: # logging.debug(content) # elif levelname == ‘info‘: # logging.info(content) # elif levelname == ‘warning‘: # logging.warning(content) # elif levelname == ‘error‘: # logging.error(content) # elif levelname == ‘critical‘: # logging.critical(content) def show(msg,msg_type): ‘‘‘ 程序不同信息打印的字體顏色 :param msg: 打印信息 :param msg_type: 打印信息的類型 :return: none ‘‘‘ if msg_type == "info": show_msg = "\033[1;35m%s\033[0m"%msg elif msg_type == "error": show_msg = "\033[1;31m%s\033[0m"%msg elif msg_type == "msg": show_msg = "\033[1;37m%s\033[0m"%msg else: show_msg = "\033[1;32m%s\033[0m"%msg print(show_msg) def progressbar(cur, total): ‘‘‘ 進度條輸出函數 :param cur: 目前的長度 :param total: 總共的長度 :return: none ‘‘‘ percent = ‘{:.2%}‘.format(cur / total) sys.stdout.write(‘\r‘) sys.stdout.write(‘[%-50s] %s‘ % (‘=‘ * int(math.floor(cur * 50 / total)), percent)) # print(‘[%-50s] %s\r‘ % (‘=‘ * int(math.floor(cur * 50 / total)), percent),) sys.stdout.flush() time.sleep(0.01) if cur == total: sys.stdout.write(‘\n‘)
4、FTPServer/src/Server_start.py
import socketserver,os,sys Base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(Base_dir) from conf import settings from common import show from user import User class FTPserver(socketserver.BaseRequestHandler): ‘‘‘ 服務端類 ‘‘‘ def handle(self): ‘‘‘ 重構handle :return: none ‘‘‘ if self.request.recv(1024) == b‘True‘: show("收到{0}的連接請求,正在通信中。。。".format(self.client_address),"info") # try: while True: self.cmd = self.request.recv(4069) # print(self.cmd) if not self.cmd: break elif self.cmd == b‘‘: break else: data = str(self.cmd.decode(encoding="utf-8")) res = data.split("+") if hasattr(self,res[0]): func = getattr(self,res[0]) func(res) else: show("wrong action","error") # except Exception as e: # print(e) # show("客戶端發生錯誤","erroe") def login(self,res): ‘‘‘ 登陸函數 :param res: 命令結果 :return: none ‘‘‘ show("收到客戶端登陸的請求,正在登陸。。。", "msg") name = res[1] psd = res[2] user = User(name, psd) sign = user.login() if sign: self.request.sendall(bytes("Success", encoding="utf-8")) else: self.request.sendall(bytes("Failure", encoding="utf-8")) def register(self,res): ‘‘‘ 註冊 :param res: 命令結果 :return: none ‘‘‘ show("收到客戶端註冊的請求,正在註冊。。。", "msg") name = res[1] psd = res[2] user = User(name, psd) if user.register(): self.request.sendall(bytes("Success", encoding="utf-8")) else: self.request.sendall(bytes("Failure", encoding="utf-8")) def view(self,res): ‘‘‘ 查看當前目錄下文件函數 :param res: 命令結果 :return: none ‘‘‘ show("收到客戶端查看當前目錄文件的請求。。。", "msg") name = res[1] psd = res[2] user = User(name, psd) dirs,files = user.view_file() # print(dirs,files) dir = str(dirs) file = str(files) if len(dirs) == 0: self.request.sendall("False".encode("utf-8")) else: self.request.sendall(bytes(dir, encoding="utf-8")) self.request.sendall(bytes(file, encoding="utf-8")) self.request.recv(1024) dic = User.info_read(name) storage = str(dic["storage"]) # print(storage,type(storage)) self.request.sendall(bytes(storage, encoding="utf-8")) show("當前目錄文件查看或創建成功", "info") def upload(self,res): ‘‘‘ 上傳文件函數 :param res: 命令結果 :return: none ‘‘‘ show("收到客戶端上傳文件的請求。。。", "msg") name = res[1] filename = res[3] self.request.sendall(bytes("True", encoding="utf-8")) res = int(self.request.recv(1024).decode()) # print(res,"nice") if User.receive(filename, name, res,self.request): self.request.sendall(bytes("True", encoding="utf-8")) else: self.request.sendall(bytes("False", encoding="utf-8")) def download(self,res): ‘‘‘ 下載文件函數 :param res: 命令結果 :return: none ‘‘‘ show("收到客戶端下載文件的請求。。。", "msg") name = res[1] psd = res[2] user = User(name, psd) dirs,files = user.view_file() file = str(files) self.request.sendall(bytes(file, encoding="utf-8")) res = self.request.recv(1024).decode() # print(str(res)) if User.download_file(res,name,self.request): show("文件下載成功", "info") else: show("文件下載失敗", "error") def cd(self,res): ‘‘‘ “cd”函數 :param res: 命令結果 :return: none ‘‘‘ show("收到客戶端“cd”的請求。。。", "msg") name = res[1] psd = res[2] user = User(name,psd) res = user.cd_command(self.request) dirs = str(res) self.request.sendall(str(dirs).encode("utf-8")) dir1 = self.request.recv(1024).decode() if dir1 == "error": show("客戶端輸入錯誤","error") self.request.close() else: sign = user.cd_dir(self.request,dir1,name) # print(sign) self.request.sendall(str(sign).encode("utf-8")) def mkdir(self,res): show("收到客戶端“mkdir”的請求。。。", "msg") name = res[1] psd = res[2] user = User(name,psd) dir_name = res[3] # print(dir_name) sign = user.mkdir(self.request,dir_name) # print(sign) if sign: self.request.sendall("True".encode("utf-8")) else: self.request.sendall("False".encode("utf-8")) def rm(self,res): show("收到客戶端“rm”的請求。。。", "msg") name = res[1] psd = res[2] user = User(name,psd) files = user.rm(self.request) # print(files) self.request.sendall(str(files).encode("utf-8")) file_name = self.request.recv(1024).decode() if file_name == "error": show("客戶端不穩定","error") else: sign = user.rm_file(self.request,file_name) if sign: show("文件刪除成功", "info") self.request.sendall("True".encode("utf-8")) else: self.request.sendall("False".encode("utf-8")) if __name__ == ‘__main__‘: show("等待客戶端連接。。。", "info") server = socketserver.ThreadingTCPServer(("localhost",settings.PORT),FTPserver) server.serve_forever()
5、FTPServer/src/user.py
import os,sys,pickle,socket,time,random,hashlib Base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(Base_dir) from conf import settings from common import show,progressbar class User(object): ‘‘‘ 用戶類 ‘‘‘ def __init__(self,username,psd): self.name = username self.password = psd self.home_path = settings.user_home + "/" +self.name def login(self): ‘‘‘ 用戶登陸方法 :return: ‘‘‘ user_dic = User.info_read(self.name) print(user_dic,"qqqq") if user_dic.get(self.name) == self.password: show("登陸成功","info") user_dic["dir"] = self.home_path User.info_write(self.name,user_dic) return True else: show("登陸失敗,用戶名或密碼錯誤","error") return False def register(self): ‘‘‘ 用戶註冊方法 :return: ‘‘‘ dic = {} dic[self.name] = self.password dic["storage"] = random.randint(10240,20480) dic["dir"] = self.home_path if User.info_write(self.name,dic): show("註冊成功","info") os.mkdir(self.home_path) os.mkdir("%s/others" % self.home_path) with open("%s\空白文件" % self.home_path, "w") as f: f.write("空白文件") return True else: show("註冊失敗","error") return False def view_file(self): ‘‘‘ 查看當前目錄下文件 :return: 目錄下文件名組成的列表 ‘‘‘ if not os.path.exists(self.home_path): os.mkdir(self.home_path) os.mkdir("%s/others"%self.home_path) with open("%s\空白文件"%self.home_path,"w") as f: f.write("空白文件") user_dic = User.info_read(self.name) if user_dic["dir"] == os.path.join(os.path.join(Base_dir, "home"), self.name): dir = os.path.join(os.path.join(Base_dir, "home"), self.name) else: dir = user_dic["dir"] for root, dirs, files in os.walk(dir): return dirs,files def cd_command(self,con): for root,dirs,files in os.walk(self.home_path): return dirs def cd_dir(self,con,dir,name): next_dir = self.home_path+"/" +dir user_dic = User.info_read(name) # print(user_dic) user_dic["dir"] = next_dir User.info_write(name,user_dic) return True def mkdir(self,con,res_dir): user_dic = User.info_read(self.name) if user_dic["dir"] == os.path.join(os.path.join(Base_dir, "home"), self.name): dir = os.path.join(os.path.join(Base_dir, "home"), self.name) else: dir = user_dic["dir"] next_dir = dir+"/" +res_dir if os.path.exists(next_dir): show("該目錄已存在", "error") return False else: os.mkdir(next_dir) show("目錄創建成功","info") return True def rm(self,con): user_dic = User.info_read(self.name) if user_dic["dir"] == os.path.join(os.path.join(Base_dir, "home"), self.name): dir = os.path.join(os.path.join(Base_dir, "home"), self.name) else: dir = user_dic["dir"] for root,dirs,files in os.walk(dir): return files def rm_file(self,con,file_name): user_dic = User.info_read(self.name) if user_dic["dir"] == os.path.join(os.path.join(Base_dir, "home"), self.name): dir = os.path.join(os.path.join(Base_dir, "home"), self.name) else: dir = user_dic["dir"] os.remove(dir+"/" +file_name) return True @staticmethod def download_file(filename,name,con): ‘‘‘ 下載文件靜態方法 :param filename: 文件名 :param name: 用戶名 :param con: 標誌 :return: none ‘‘‘ user_dic = User.info_read(name) if user_dic["dir"] == os.path.join(os.path.join(Base_dir, "home"), name): user_dir = os.path.join(os.path.join(Base_dir, "home"), name) else: user_dir = user_dic["dir"] dir = os.path.join(user_dir, filename) f = open(dir,"rb") m = hashlib.md5() data = f.read() m.update(data) a = str(len(data))+"+"+ m.hexdigest() # print(a) con.sendall(bytes(a, encoding="utf-8")) con.sendall(data) f.close() return True @staticmethod def receive(filename,name,res,con): ‘‘‘ 接收文件靜態方法 :param filename: 文件名 :param name: 用戶名 :param con: 標誌 :return: none ‘‘‘ user_dic = User.info_read(name) if user_dic["dir"] == os.path.join(os.path.join(Base_dir, "home"), name): dir = os.path.join(os.path.join(os.path.join(Base_dir, "home"), name), filename) else: dir_name = user_dic["dir"] dir = os.path.join(dir_name, filename) # print(res,user_dic["storage"]) if res/1024 < user_dic["storage"]: con.sendall("ok".encode("utf-8")) length = 0 f = open(dir, "wb") md5 = hashlib.md5() while length < res: if res - length > 1024: # 要收不止一次 size = 1024 else: # 最後一次了,剩多少收多少 size = res - length # print("最後一次剩余的:", size) data = con.recv(size) length += len(data) md5.update(data) f.write(data) progressbar(length, res) else: new_md5 = md5.hexdigest() f.close() # Num = User.download_Progress(res, length, Num) or_md5 = con.recv(1024) # print(new_md5) # print(or_md5) if new_md5 == or_md5.decode(): show("文件下載成功", "info") return True else: show("文件不一致", "error") return False elif res/1024 > user_dic["storage"]: con.sendall("no".encode("utf-8")) show("磁盤空間不足", "error") return False @staticmethod def info_read(name): ‘‘‘ 讀取用戶數據的靜態方法 :param name: 用戶名 :return: 字典 ‘‘‘ user_dir = os.path.join(settings.user_info,name) if os.path.exists(user_dir): with open(user_dir,"rb") as f: dic = pickle.load(f) return dic else: print("用戶數據為空") @staticmethod def info_write(name,dic): ‘‘‘ 寫入用戶數據的靜態方法 :param name:用戶名 :param dic:用戶信息字典 :return:True ‘‘‘ user_dir = os.path.join(settings.user_info, name) with open(user_dir,"wb") as f: pickle.dump(dic,f) return True
六、界面顯示
python之路——作業:高級FTP(僅供參考)