1. 程式人生 > ceph 對象存儲 創建api

ceph 對象存儲 創建api

content code keys min ger sta stderr vat 單例模式

# -*- coding:utf-8 -*-
"""Helpers for a Ceph object store.

``Accountinfo`` runs ``radosgw-admin`` on a remote host over SSH (paramiko) to
create or remove radosgw users; ``CephS3`` wraps bucket/object operations on
the S3-compatible gateway through boto.
"""
try:
    from shlex import quote as _shell_quote  # Python 3.3+
except ImportError:
    from pipes import quote as _shell_quote  # Python 2 fallback

import boto
import boto.s3.connection
import paramiko


class Accountinfo():
    """Create and delete radosgw users via ``radosgw-admin`` over SSH.

    Command reference: http://docs.ceph.org.cn/man/8/radosgw-admin/
    """

    def __init__(self, hostname='192.168.44.70', port=22,
                 username='root', passwd='123456'):
        """Store the SSH endpoint and credentials used by ``new_connect``.

        The defaults are the values that were hard-coded originally, so
        ``Accountinfo()`` keeps working unchanged.
        NOTE(review): plain-text root credentials in source are a security
        risk; prefer passing them in from configuration.
        """
        self.hostname = hostname
        self.port = port
        self.username = username
        self.passwd = passwd

    def new_connect(self):
        """Open an SSH session to the configured host.

        :return: a connected ``paramiko.SSHClient``, or the string
                 ``'error'`` if anything fails while connecting.
        """
        ssh = None
        try:
            paramiko.util.log_to_file('paramiko.log')
            ssh = paramiko.SSHClient()
            ssh.load_system_host_keys()
            # NOTE(review): AutoAddPolicy accepts unknown host keys without
            # verification; kept because the original relied on it.
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh.connect(hostname=self.hostname, port=self.port,
                        username=self.username, password=self.passwd)
            return ssh
        except Exception:
            # Do not leave a half-initialised client behind.
            if ssh is not None:
                ssh.close()
            return 'error'

    def user_manager(self, username, flag):
        """Create or delete the radosgw user used for S3 access.

        :param username: uid of the radosgw user; on creation the display
                         name is ``username.title()``.
        :param flag: ``'c'`` to create, ``'d'`` to delete.
        :return: - ``'c'``: tuple ``(stdout, stderr)`` as read from the
                   remote command;
                 - ``'d'``: ``'delete <username> success'`` when the remote
                   command exits with status 0, otherwise
                   ``'delete <username> fail'``;
                 - any other flag: ``('flag == c or d', 'flag error')``;
                 - ``'connect error'`` when the SSH connection fails.
        """
        ssh = self.new_connect()
        if isinstance(ssh, str):
            # new_connect signals failure with a string sentinel.
            return 'connect error'

        try:
            # Quote user-supplied values so they cannot inject shell syntax.
            uid = _shell_quote(username)
            if flag == 'c':
                command = (
                    '/usr/bin/radosgw-admin user create --uid=%s --display-name=%s'
                    % (uid, _shell_quote(username.title()))
                )
                stdin, stdout, stderr = ssh.exec_command(command)
                outstr = stdout.read()
                errstr = stderr.read()  # was stdout.read(): stderr was never returned
                return outstr, errstr
            if flag == 'd':
                command = '/usr/bin/radosgw-admin user rm --uid=%s' % uid
                stdin, stdout, stderr = ssh.exec_command(command)
                # Drain the output and wait for the command to finish before
                # the session is closed; closing early can abort the command.
                stdout.read()
                if stdout.channel.recv_exit_status() != 0:
                    return 'delete %s fail' % username
                return 'delete %s success' % username
            return 'flag == c or d', 'flag error'
        finally:
            ssh.close()


class CephS3():
    """Bucket and object operations against the Ceph S3 gateway.

    One boto connection is cached at class level and shared by all methods
    through ``get_connect``.
    NOTE(review): the access/secret keys are hard-coded; move them to
    configuration before using this outside a test environment.
    """

    # Shared boto connection, created lazily by get_connect().
    __instance = None

    def __init__(self):
        """Build a boto S3 connection to the hard-coded gateway endpoint."""
        self.access_key = "BKOLF8C5319QK2UIMQ09"
        self.secret_key = "jBiFwY3LeHh78tM9W6Y8oQUM2VNIbieGVViB3wEB"
        self.host = '192.168.44.70'
        self.port = 7480
        self.conn = boto.connect_s3(
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key,
            host=self.host,
            port=self.port,
            is_secure=False,
            calling_format=boto.s3.connection.OrdinaryCallingFormat()
        )

    @staticmethod
    def get_connect():
        """Return the shared boto connection, creating it on first use."""
        if CephS3.__instance is None:
            CephS3.__instance = CephS3().conn
        return CephS3.__instance

    def list_all_buckets(self):
        """Print the name and creation date of every bucket."""
        con = CephS3.get_connect()
        for bucket in con.get_all_buckets():
            print("{name}\t{created}".format(name=bucket.name, created=bucket.creation_date))

    def create_bucket(self, bucketname):
        """Create ``bucketname`` unless it is already listed.

        :return: ``'ok'`` when created, ``'fail'`` when a bucket with that
                 name is already listed, or the exception text on error.
        """
        con = CephS3.get_connect()
        existing = [b.name for b in con.get_all_buckets()]
        try:
            if bucketname in existing:
                return 'fail'
            con.create_bucket(bucketname)
            return 'ok'
        except Exception as e:
            return str(e)

    def delete_bucket(self, bucketname):
        """Delete ``bucketname`` if it is listed.

        :return: ``'ok'`` when deleted, ``'fail'`` when no such bucket is
                 listed, or the exception text on error.
        """
        con = CephS3.get_connect()
        existing = [b.name for b in con.get_all_buckets()]
        try:
            if bucketname not in existing:
                return 'fail'
            con.delete_bucket(bucketname)
            return 'ok'
        except Exception as e:
            # Return the message like the sibling methods do (the original
            # printed it and implicitly returned None).
            return str(e)

    def list_bucket_object(self, bucketname):
        """Print name, size and last-modified time of each key in the bucket."""
        con = CephS3.get_connect()
        bucket = con.get_bucket(bucketname)
        print("%-10s\t%-10s\t%-10s\n" % ('Name', 'Size', 'Modified'))
        for key in bucket.list():
            print("{name:<10}\t{size:<10}\t{modified:<10}".format(
                name=key.name, size=key.size, modified=key.last_modified))

    def create_object(self, bucketname, obj_key, obj_value):
        """Create object ``obj_key`` whose content is the string ``obj_value``.

        :return: ``'ok'`` when written, ``'exists'`` when the key is already
                 present, or the exception text if writing fails.
        """
        con = CephS3.get_connect()
        bucket = con.get_bucket(bucketname)
        # One lookup instead of listing every key in the bucket.
        if bucket.get_key(obj_key) is not None:
            return 'exists'
        try:
            bucket.new_key(obj_key).set_contents_from_string(obj_value)
            return 'ok'
        except Exception as e:
            return str(e)

    def modifi_object_acl(self, bucketname, obj_key, acl='public-read'):
        """Change the canned ACL of an object.

        :param bucketname: bucket that holds the object
        :param obj_key: key of the object
        :param acl: ``'public-read'`` (publicly readable) or ``'private'``
        :return: ``'ok'`` on success, otherwise the exception text
        """
        con = CephS3.get_connect()
        bucket = con.get_bucket(bucketname)
        try:
            bucket_key = bucket.get_key(obj_key)
            bucket_key.set_canned_acl(acl)
            return 'ok'
        except Exception as e:
            return str(e)

    def make_obj_url(self, bucketname, obj_key, sign=False):
        """Build an HTTP URL for an object.

        :param sign: when false, return an unsigned URL; when true, return a
                     signed URL generated with an expiry of 3600 seconds.
        :return: the URL, or the exception text on error
        """
        try:
            con = CephS3.get_connect()
            bucket = con.get_bucket(bucketname)
            key = bucket.get_key(obj_key)
            if not sign:
                return key.generate_url(0, query_auth=False, force_http=True)
            return key.generate_url(3600, query_auth=True, force_http=True)
        except Exception as e:
            return str(e)

    def download_to_file(self, bucketname, obj_key, dest_file):
        """Download an object to the local path ``dest_file``.

        :return: ``'ok'`` on success, otherwise the exception text
        """
        try:
            con = CephS3.get_connect()
            bucket = con.get_bucket(bucketname)
            key = bucket.get_key(obj_key)
            key.get_contents_to_filename(dest_file)
            return 'ok'
        except Exception as e:
            return str(e)

    def delete_bucket_key(self, bucketname, obj_key):
        """Delete object ``obj_key`` from the bucket.

        :return: ``'ok'`` on success, otherwise the exception text
        """
        try:
            con = CephS3.get_connect()
            bucket = con.get_bucket(bucketname)
            bucket.delete_key(obj_key)
            return 'ok'
        except Exception as e:
            return str(e)


if __name__ == '__main__':
    cephapi = CephS3()
    url = cephapi.make_obj_url('bucket_bamboo', 'haproxy.txt', True)
    print(url)
    # cephapi.list_bucket_object('bucket_bamboo')

    # Example: create a user and show its key information
    # (requires ``import json``).
    # um = Accountinfo()
    # out, err = um.user_manager('wuye', 'c')
    #
    # if type(out) == str:
    #     res = json.loads(out)
    #     print(json.dumps(res["keys"][0], indent=2))
    # else:
    #     print(out, err)

ceph 對象存儲 創建api