1. 程式人生 > >七牛雲-python sdk 下載完整實現

七牛雲-python sdk 下載完整實現

目標:

  • 熟悉七牛python sdk 的使用
  • 呼叫七牛的domain/list 介面獲取空間繫結的域名列表
  • 通過七牛python sdk 實現下載

概述:

七牛提供了python sdk, 可以進行上傳、下載等功能; 本文主要是將一些知識點連線起來,在熟悉python 的基礎上,同時實現下載的功能。 涉及到的一些小的知識點有:匯入模組、正則表示式、字串切片、requests包、熟悉七牛sdk、熟悉七牛獲取空間繫結域名列表的介面,以及Python File的一些知識; 雖然在本文的程式碼示例中體現的不是很多,但是在程式編寫的過程中, 對於上述的知識點都有了很多的鞏固。

程式流程圖:

qiniu-python-download-process

程式示例:

from qiniu import Auth, urlsafe_base64_encode
from common.AccountMgr import AccountMgr
import requests, json, array, re, os

'''
author: xuhuanchao
date: 2017-07-11
AccountMgr 是自己定義的模組,存放了AK, SK 等賬號資訊
'''
accountMgr = AccountMgr()
accessKey = accountMgr.getAccessKey()
secretKey = accountMgr.getSecretKey()

#通過AK, SK 建立 auth物件
auth = Auth(accessKey, secretKey) def __getDownloadUrl(bucketName, key): ''' 獲取下載的URL :param bucketName: 空間名稱 :param key: 空間儲存的檔名稱 :return: ''' flag = __isPrivateBucket(bucketName) domainList = getDomainByBucket(bucketName) print(domainList) domain = '' for
i in range(len(domainList)): pattern = re.compile("[a-z0-9.]*clouddn.com]") match = pattern.match(domainList[i]) if match: continue else: domain = domainList[i] #如果空間沒有繫結自定義域名, 則使用測試域名 if(domain == ''): domain = domainList[0] baseUrl = "http://" + domain + "/" + key if(flag): downloadUrl = auth.private_download_url(baseUrl) else: downloadUrl = baseUrl return downloadUrl def __isPrivateBucket(bucketName): ''' 判斷是否是私有空間, 用admin 開頭命名的空間都是 私有空間; tips: 自己的業務控制 :param bucketName: 空間名稱 :return: bool ''' if bucketName == "admin" or bucketName[0:5] == "admin": return True else: return False def getDomainByBucket(bucketName): ''' 通過七牛雲介面, 獲取空間繫結的域名 參考文件:https://developer.qiniu.com/kodo/api/1612/bucket-domainlist :param bucketName: 空間名稱 :return: list of bucket's domain ''' connector = "/v6/domain/list?tbl={0}".format(bucketName) url = "http://api.qiniu.com" + connector try: accessToken = auth.token_of_request(connector) headers = { "Content-Type": "application/x-www-form-urlencoded", "Authorization": "QBox " + accessToken } resp = requests.get(url, headers=headers) resp.raise_for_status() if(resp.status_code == requests.codes.ok): return list(resp.json()) except Exception as e: print("出現異常...資訊如下:\n" + e) def download(bucketName, key, localPath): ''' 開始下載 :param bucketName: 空間名稱 :param key: 下載的檔名稱 :param localPath: 儲存到本地的路徑 :return: None ''' downloadUrl = __getDownloadUrl(bucketName, key) print(downloadUrl) resp = requests.get(downloadUrl) path = localPath + "/" + key with open(path, 'wb+') as f: f.write(resp.content) if(__name__ == "__main__"): bucketName = "test-bucket" key = "stage.jpg" localPath = "/Users/ryanxu/Documents" download(bucketName, key, localPath)