Python SDK示例程序
更新时间:
Python SDK 简易使用示例
本节以示例的方式展示如何使用SDK高级接口进行开发。用户在阅读本节后,可模仿示例,并参考高级接口一节进行开发。
其中,方括号内的参数用户应根据实际需求进行替换。
- 创建 Vault
from oas.oas_api import OASAPI
from oas.ease.vault import Vault

# Create the OASAPI object
api = OASAPI('[Server Host]', '[Access Key ID]', '[Access Key Secret]')

# Create the Vault
vault = Vault.create_vault(api, '[Vault Name]')
- 查找Vault
# Create the OASAPI object
api = OASAPI('[Server Host]', '[Access Key ID]', '[Access Key Secret]')

# Look up the Vault by name
vault = Vault.get_vault_by_name(api, '[Vault Name]')

# Look up the Vault by ID
vault = Vault.get_vault_by_id(api, '[Vault ID]')
- 上传文件
# Upload the file at '[File Path]' to the vault; the call's return value is kept as archive_id.
archive_id = vault.upload_archive('[File Path]')
- 删除 Archive
# Delete the archive identified by '[Archive ID]' from the vault.
vault.delete_archive('[Archive ID]')
- 续传 Multipart Upload 任务
# Recover the uploader of an unfinished multipart upload task by its upload ID
uploader = vault.recover_uploader('[Upload ID]')

# Resume the upload with the same local file
uploader.resume('[File Path]')
- 获取 Archive 列表
# Submit an inventory retrieval job for the vault
job = vault.retrieve_inventory()

# Save the job output (the archive list) to a local file
job.download_to_file('[File Path]')
- 下载 Archive
# Submit an archive retrieval job for the given archive
job = vault.retrieve_archive('[Archive ID]')

# Save the retrieved archive to a local file
job.download_to_file('[File Path]')
- 从OSS上转储Object到OAS
# NOTE: `conf` is not defined in this section; it is the TestConf instance from the
# complete demo code further down this page. Substitute your own OSS host, bucket
# and object name. The last argument is a free-text description of the job.
job = vault.pull_from_oss(conf.osshost, conf.bucket, conf.object, "test desc")
- 从OAS上转储Archive到OSS
# NOTE: `conf` is not defined in this section; it is the TestConf instance from the
# complete demo code further down this page. Substitute your own OSS host and bucket.
# archive_id is passed twice: presumably the second occurrence is the name of the
# target object in OSS -- confirm against the SDK reference.
job = vault.push_to_oss(archive_id, conf.osshost, conf.bucket, archive_id, "test desc")
Python SDK 完整使用演示代码
下面函数中test_single_archive_upload提供单一文件archive上传;
函数test_multi_upload() 使用sdk低级接口实现分段上传;
函数test_uploader()使用sdk高级接口实现数据上传(当数据大于64MB时,会自动选择分段上传);
函数test_vault_retrieve()实现vault信息查询;
函数test_download_archive(archive_id)实现archive下载;
函数test_delete_archive(archive_id)实现archive删除。
函数test_pull_from_oss() 实现从OSS直接转储到OAS
函数test_push_to_oss() 实现从OAS直接转储到OSS
# -*- coding: utf-8 -*-
"""End-to-end demo of the OAS Python SDK.

Exercises both the low-level proxy API (single-shot and multipart upload)
and the high-level Vault/Uploader/Job API (upload, inventory, download,
delete, OSS <-> OAS transfer). Fill in TestConf before running.
"""
import logging
import logging.handlers
import os
import random
import time

from oas.oas_api import OASAPI
from oas.ease.api import APIProxy
from oas.ease.exceptions import *
from oas.ease.response import *
from oas.ease.utils import *
from oas.ease.vault import *
from oas.ease.uploader import *
from oas.ease.job import *

LOG_FILE = "test.log"

# Rotate the log at 1 MiB and keep up to five old files.
handler = logging.handlers.RotatingFileHandler(
    LOG_FILE, maxBytes=1024 * 1024, backupCount=5)
fmt = '%(asctime)s - %(filename)s:%(lineno)s - %(name)s - %(message)s'
formatter = logging.Formatter(fmt)
handler.setFormatter(formatter)

# NOTE(review): `log` is not defined in this script; presumably it is exported
# by one of the star imports above -- confirm against the installed SDK.
log.addHandler(handler)
log.setLevel(logging.DEBUG)

# Top rule of the banners printed before each demo step; the bottom rule is
# one '#' longer.
_RULE = '########################################################'


def _print_banner(title, leading_blank_lines=0):
    """Print *title* framed by two rules of '#', optionally preceded by blank lines."""
    print('\n' * leading_blank_lines + _RULE + '\n' + title + '\n' + _RULE + '#')


class TestConf(object):
    """Connection settings for the demo; replace every placeholder value."""

    def __init__(self):
        self.host = 'oas域名'
        self.accesskey_id = "您的access key Id"
        self.accesskey_secret = "您的access key secret"
        self.vault_name = "normal"
        self.vault_name_test = "test"
        self.osshost = "您要转储的oss域名"
        self.bucket = "您要转储的bucket"
        self.object = "您要转储的Object"


conf = TestConf()


class TestDemo(object):
    """Runs each SDK feature once against the vault named in ``conf``."""

    _MEGABYTE = 1024 * 1024

    def __init__(self):
        """Create the API clients and generate the two local test files."""
        self.api = OASAPI(conf.host, conf.accesskey_id, conf.accesskey_secret)
        self.api_proxy = APIProxy(self.api)
        self.vault_name = conf.vault_name
        self.vault_name_test = conf.vault_name_test

        # NOTE: despite the "100M" in its name, this file is written with 200 MB.
        self.file_big = "random100M.bin"
        self.file_big_size = 200 * self._MEGABYTE
        with open(self.file_big, 'wb+') as f:
            self.write_random_data(f, self.file_big_size)

        self.file_small = "random30M.bin"
        self.file_small_size = 30 * self._MEGABYTE
        with open(self.file_small, 'wb+') as f:
            self.write_random_data(f, self.file_small_size)

    def write_random_data(self, file, size):
        """Write exactly *size* bytes to the binary file object *file*.

        The data alternates between up to 1 KiB of random bytes and a run
        of zero bytes of random length (256 KiB to 1 MiB).
        """
        remaining = size
        while remaining > 0:
            n = min(remaining, 1024)
            file.write(os.urandom(n))
            remaining -= n

            n = min(remaining, 1024 * random.randint(256, 1024))
            # Bytes literal: the file is opened in binary mode.
            file.write(b'\0' * n)
            remaining -= n

    @staticmethod
    def _part_ranges(total_size, part_size):
        """Return the (start, size) pairs that split *total_size* bytes into parts.

        Every part is *part_size* bytes except possibly the last one. The
        loop runs until every byte is covered, so a trailing 1-byte part is
        not dropped.
        """
        ranges = []
        start = 0
        while start < total_size:
            size = min(part_size, total_size - start)
            ranges.append((start, size))
            start += size
        return ranges

    def test_single_archive_upload(self):
        """Upload the small file in one request through the low-level API."""
        _print_banner('Using the low level api method to upload a small archive')
        res = self.api_proxy.create_vault(self.vault_name)
        vault_id = res['x-oas-vault-id']

        etag = compute_etag_from_file(self.file_small)
        tree_etag = compute_tree_etag_from_file(self.file_small)
        with open(self.file_small, 'rb') as f:
            content = f.read()

        res = self.api_proxy.post_archive(vault_id, content, etag, tree_etag)
        archive_id = res['x-oas-archive-id']
        print("archive_id %s" % archive_id)

    # test the multipart upload by the proxy api, and this is low level api
    def test_multi_upload(self):
        """Upload the big file part by part through the low-level API.

        Returns the archive id reported when the multipart upload completes.
        """
        _print_banner(
            'Using the low level api to invoke the multipart api and '
            'implement the archive uplod',
            leading_blank_lines=3)
        res = self.api_proxy.create_vault(self.vault_name)
        vault_id = res['x-oas-vault-id']

        part_size = 1024 * 1024 * 64
        parts = self._part_ranges(self.file_big_size, part_size)

        print("1. comput the etag,tree-etag")
        print("1.1 comput the etag , tree_etag of part")
        etag_array = []
        tree_etag_array = []
        for start, size in parts:
            etag_array.append(
                compute_etag_from_file(self.file_big, start, size))
            tree_etag_array.append(
                compute_tree_etag_from_file(self.file_big, start, size))

        print("1.2 comput the total tree_etag of the archive")
        tree_etag_total = compute_combine_tree_etag_from_list(tree_etag_array)

        print("2.1 init the upload task, and get the uploadId")
        res = self.api_proxy.create_multipart_upload(vault_id, part_size)
        upload_id = res['x-oas-multipart-upload-id']
        print("upload_id %s" % upload_id)

        # The parts are read sequentially from one handle; `with` makes sure
        # it is closed even if an upload request fails.
        with open(self.file_big, 'rb') as f:
            for part_num, (start, size) in enumerate(parts):
                print("2.2 upload every part to oas server, and the etag of "
                      "the part will be used. current part is: %d"
                      % (part_num + 1))
                # The byte range is inclusive on both ends.
                byte_range = '%d-%d' % (start, start + size - 1)
                self.api_proxy.post_multipart_from_reader(
                    vault_id, upload_id, f, size, byte_range,
                    etag_array[part_num], tree_etag_array[part_num])

        print("2.3 complete the multipart task, and the total etag will be used")
        res = self.api_proxy.complete_multipart_upload(
            vault_id, upload_id, self.file_big_size, tree_etag_total)

        print("output the archive id")
        archive_id = res['x-oas-archive-id']
        print("archive_id %s" % archive_id)
        return archive_id

    # test the uploader to upload big file, and this is high level api
    def test_uploader(self):
        """Upload the big file with the high-level Uploader; return the archive id."""
        _print_banner('Using the High level api to invoke the multipart api',
                      leading_blank_lines=3)
        vault = Vault.create_vault(self.api, self.vault_name)

        print("initial a uploadId")
        uploader = vault.initiate_uploader(self.file_big)
        uploader_id = uploader.id
        print("uploader_id %s" % uploader_id)

        print("start the multipart")
        archive_id = uploader.start()

        print("finish the upload, and output the archive_id")
        print("archive_id %s" % archive_id)
        return archive_id

    # to inquire the archive list of vault
    def test_vault_retrieve(self):
        """Submit an inventory job and save its output to a file named after the job id."""
        _print_banner(
            'Retrieve the vault info, and inquire the archive list of the vault',
            leading_blank_lines=3)
        vault = Vault.create_vault(self.api, self.vault_name)
        job = vault.retrieve_inventory()
        job.download_to_file(job.id)

    # to test download archive
    def test_download_archive(self, archive_id):
        """Submit a retrieval job for *archive_id* and download the result.

        The output is written to a local file named after the job id.
        """
        _print_banner('Submit the archive job and download the job to local',
                      leading_blank_lines=3)
        vault = Vault.create_vault(self.api, self.vault_name)
        job = vault.retrieve_archive(archive_id)
        job.update_status()
        # Download unconditionally: block=True is passed so the call can wait
        # for a pending job, and an already-completed job must be downloaded
        # too (the earlier `if not job.completed` guard skipped that case).
        job.download_to_file(file_path=job.id, block=True)

    # test delete archive
    def test_delete_archive(self, archive_id):
        """Delete the archive *archive_id* from the demo vault."""
        _print_banner('Delete the archive', leading_blank_lines=3)
        vault = Vault.create_vault(self.api, self.vault_name)
        vault.delete_archive(archive_id)

    def test_pull_from_oss(self):
        """Submit a pull-from-oss job for the object configured in ``conf``."""
        _print_banner(
            'submit a pull-from-oss job and OAS will finish the pull of '
            'OSS object to OAS',
            leading_blank_lines=3)
        # create (or fetch) the vault
        vault = Vault.create_vault(self.api, self.vault_name)
        # submit the transfer job
        job = vault.pull_from_oss(
            conf.osshost, conf.bucket, conf.object, "test desc")
        # NOTE(review): _check_status is a private SDK method; presumably it
        # blocks until the job finishes -- confirm against the SDK.
        job._check_status(block=True)
        print("bucket:%s object:%s finish pull from oss,\n archiveId:%s"
              % (conf.bucket, conf.object, job.archive_id))

    def test_push_to_oss(self):
        """Upload the big file, then submit a push-to-oss job for that archive."""
        _print_banner(
            'submit a push-to-oss job and OAS will finish the push of '
            'OAS archive to OSS',
            leading_blank_lines=3)
        vault = Vault.create_vault(self.api, self.vault_name)
        archive_id = vault.upload_archive(self.file_big)
        # archive_id is passed twice; presumably the second one names the
        # target OSS object -- confirm against the SDK reference.
        job = vault.push_to_oss(
            archive_id, conf.osshost, conf.bucket, archive_id, "test desc")
        job._check_status(block=True)
        print(archive_id + " finish push to oss")


if __name__ == '__main__':
    t = TestDemo()
    t.test_single_archive_upload()
    t.test_multi_upload()
    archive_id = t.test_uploader()
    t.test_vault_retrieve()
    t.test_download_archive(archive_id)
    t.test_delete_archive(archive_id)
    t.test_pull_from_oss()
    t.test_push_to_oss()
该文章对您有帮助吗?