Python SDK示例程序

更新时间:

Python SDK 简易使用示例

本节以示例的方式展示如何使用SDK高级接口进行开发。用户在阅读本节后,可模仿示例,并参考高级接口一节进行开发。

其中,方括号内的参数用户应根据实际需求进行替换。

  • 创建 Vault
  1. from oas.oas_api import OASAPI
  2. from oas.ease.vault import Vault
  3. # 创建 OASAPI 对象
  4. api = OASAPI('[Server Host]', '[Access Key ID]', '[Access Key Secret]')
  5. # 创建 Vault
  6. vault = Vault.create_vault(api, '[Vault Name]')
  • 查找Vault
  1. # 创建 OASAPI 对象
  2. api = OASAPI('[Server Host]', '[Access Key ID]', '[Access Key Secret]')
  3. # 根据名称获取 Vault
  4. vault = Vault.get_vault_by_name(api, '[Vault Name]')
  5. # 根据 ID 获取 Vault
  6. vault = Vault.get_vault_by_id(api, '[Vault ID]')
  • 上传文件
  1. archive_id = vault.upload_archive('[File Path]')
  • 删除 Archive
  1. vault.delete_archive('[Archive ID]')
  • 续传 Multipart Upload 任务
  1. uploader = vault.recover_uploader('[Upload ID]')
  2. uploader.resume('[File Path]')
  • 获取 Archive 列表
  1. job = vault.retrieve_inventory()
  2. job.download_to_file('[File Path]')
  • 下载 Archive
  1. job = vault.retrieve_archive('[Archive ID]')
  2. job.download_to_file('[File Path]')
  • 从OSS上转储Object到OAS
  1. job = vault.pull_from_oss(conf.osshost, conf.bucket, conf.object, "test desc")
  • 从OAS上转储Archive到OSS
  1. job = vault.push_to_oss(archive_id, conf.osshost, conf.bucket, archive_id, "test desc")

Python SDK 完整使用演示代码

下面的演示代码中,函数test_single_archive_upload()使用sdk低级接口实现单一文件archive上传;

函数test_multi_upload() 使用sdk低级接口实现分段上传;

函数test_uploader()使用sdk高级接口实现数据上传(当数据大于64MB时,会自动选择分段上传);

函数test_vault_retrieve()实现vault信息查询;

函数test_download_archive(archive_id)实现archive下载;

函数test_delete_archive(archive_id)实现archive删除。

函数test_pull_from_oss()实现从OSS直接转储到OAS;

函数test_push_to_oss()实现从OAS直接转储到OSS。

  1. import random
  2. import time
  3. import logging
  4. import logging.handlers
  5. from oas.oas_api import OASAPI
  6. from oas.ease.api import APIProxy
  7. from oas.ease.exceptions import *
  8. from oas.ease.response import *
  9. from oas.ease.utils import *
  10. from oas.ease.vault import *
  11. from oas.ease.uploader import *
  12. from oas.ease.job import *
  13. import os
  14. LOG_FILE="test.log"
  15. handler = logging.handlers.RotatingFileHandler(LOG_FILE, maxBytes = 1024*1024, backupCount = 5)
  16. fmt = '%(asctime)s - %(filename)s:%(lineno)s - %(name)s - %(message)s'
  17. formatter = logging.Formatter(fmt)
  18. handler.setFormatter(formatter)
  19. log.addHandler(handler)
  20. log.setLevel(logging.DEBUG)
  21. class TestConf(object):
  22. def __init__(self):
  23. self.host = 'oas域名'
  24. self.accesskey_id = "您的access key Id"
  25. self.accesskey_secret = "您的access key secret"
  26. self.vault_name = "normal"
  27. self.vault_name_test = "test"
  28. self.osshost = "您要转储的oss域名"
  29. self.bucket = "您要转储的bucket"
  30. self.object = "您要转储的Object"
  31. conf = TestConf()
  32. class TestDemo():
  33. _MEGABYTE = 1024 * 1024
  34. def __init__(self):
  35. self.api = OASAPI(conf.host, conf.accesskey_id, conf.accesskey_secret)
  36. self.api_proxy = APIProxy(self.api)
  37. self.vault_name = conf.vault_name
  38. self.vault_name_test = conf.vault_name_test
  39. self.file_big = "random100M.bin"
  40. self.file_big_size = 200*self._MEGABYTE
  41. with open(self.file_big, 'wb+') as f:
  42. self.write_random_data(f, self.file_big_size)
  43. self.file_small = "random30M.bin"
  44. self.file_small_size = 30*self._MEGABYTE
  45. with open(self.file_small, 'wb+') as f:
  46. self.write_random_data(f, self.file_small_size)
  47. def write_random_data(self, file, size):
  48. remaining = size
  49. while remaining > 0:
  50. n = min(remaining, 1024)
  51. random_data = os.urandom(n)
  52. file.write(random_data)
  53. remaining = remaining - n
  54. n = min(remaining, 1024 * random.randint(256, 1024))
  55. file.write('\0' * n)
  56. remaining = remaining - n
  57. def test_single_archive_upload(self):
  58. print '''
  59. ########################################################
  60. Using the low level api method to upload a small archive
  61. #########################################################'''
  62. res = self.api_proxy.create_vault(self.vault_name)
  63. vault_id = res['x-oas-vault-id']
  64. etag = compute_etag_from_file(self.file_small)
  65. tree_etag = compute_tree_etag_from_file(self.file_small)
  66. with open(self.file_small, 'rb') as f:
  67. content = f.read()
  68. res = self.api_proxy.post_archive(vault_id, content, etag, tree_etag)
  69. archive_id = res['x-oas-archive-id']
  70. print "archive_id",archive_id
  71. #test the multipart upload by the proxy api, and this is low level api
  72. def test_multi_upload(self):
  73. print '''\n\n\n
  74. ########################################################
  75. Using the low level api to invoke the multipart api and implement the archive uplod
  76. #########################################################'''
  77. res = self.api_proxy.create_vault(self.vault_name)
  78. vault_id = res['x-oas-vault-id']
  79. part_size = 1024 * 1024 * 64
  80. etag_array= []
  81. tree_etag_array= []
  82. offset = 0
  83. cur_part_num=0
  84. cur_start_pos = 0
  85. cur_end_pos = 0
  86. print "1. comput the etag,tree-etag"
  87. print "1.1 comput the etag , tree_etag of part"
  88. while True:
  89. tmpsize = part_size
  90. if(cur_start_pos + tmpsize > self.file_big_size):
  91. tmpsize = self.file_big_size - cur_start_pos;
  92. cur_end_pos += tmpsize -1
  93. etag_array.append(compute_etag_from_file(self.file_big, cur_start_pos, tmpsize) )
  94. tree_etag_array.append(compute_tree_etag_from_file(self.file_big, cur_start_pos, tmpsize))
  95. cur_start_pos += tmpsize
  96. cur_part_num += 1
  97. if(cur_start_pos >= self.file_big_size-1):
  98. break;
  99. print "1.2 comput the total tree_etag of the archive"
  100. tree_etag_total = compute_combine_tree_etag_from_list( tree_etag_array) ;
  101. print "2.1 init the upload task, and get the uploadId"
  102. res = self.api_proxy.create_multipart_upload(vault_id, part_size)
  103. upload_id = res['x-oas-multipart-upload-id']
  104. print "upload_id",upload_id
  105. f = open(self.file_big, 'rb')
  106. cur_part_num = 0
  107. cur_start_pos = 0
  108. cur_end_pos = 0
  109. while True:
  110. tmpsize = part_size
  111. if(cur_start_pos + tmpsize > self.file_big_size):
  112. tmpsize = self.file_big_size - cur_start_pos;
  113. cur_end_pos += tmpsize
  114. print "2.2 upload every part to oas server, and the etag of the part will be used. current part is:", cur_part_num+1
  115. self.api_proxy.post_multipart_from_reader(vault_id, upload_id, f, tmpsize,('%d-%d' %(cur_start_pos, cur_end_pos-1)), etag_array[cur_part_num],tree_etag_array[cur_part_num])
  116. cur_start_pos += tmpsize
  117. cur_part_num += 1
  118. if(cur_end_pos>= self.file_big_size -1):
  119. break;
  120. print "2.3 complete the multipart task, and the total etag will be used"
  121. res = self.api_proxy.complete_multipart_upload(
  122. vault_id, upload_id, self.file_big_size, tree_etag_total)
  123. print "output the archive id"
  124. archive_id = res['x-oas-archive-id']
  125. print "archive_id",archive_id
  126. return archive_id
  127. #test the uploader to upload big file, and this is high level api
  128. def test_uploader(self):
  129. print '''\n\n\n
  130. ########################################################
  131. Using the High level api to invoke the multipart api
  132. #########################################################'''
  133. vault = Vault.create_vault(self.api, self.vault_name)
  134. print "initial a uploadId"
  135. uploader = vault.initiate_uploader(self.file_big)
  136. uploader_id = uploader.id
  137. print "uploader_id",uploader_id
  138. print "start the multipart"
  139. archive_id = uploader.start()
  140. print "finish the upload, and output the archive_id"
  141. print "archive_id",archive_id
  142. return archive_id
  143. #to inquire the archive list of vault
  144. def test_vault_retrieve(self):
  145. print '''\n\n\n
  146. ########################################################
  147. Retrieve the vault info, and inquire the archive list of the vault
  148. #########################################################'''
  149. vault = Vault.create_vault(self.api, self.vault_name)
  150. job = vault.retrieve_inventory()
  151. job.download_to_file(job.id)
  152. #to test download archive
  153. def test_download_archive(self, archive_id):
  154. print '''\n\n\n
  155. ########################################################
  156. Submit the archive job and download the job to local
  157. #########################################################'''
  158. vault = Vault.create_vault(self.api, self.vault_name)
  159. job = vault.retrieve_archive(archive_id)
  160. job.update_status()
  161. if not job.completed:
  162. job.download_to_file(file_path = job.id, block = True)
  163. #test delete archive
  164. def test_delete_archive(self, archive_id):
  165. print '''\n\n\n
  166. ########################################################
  167. Delete the archive
  168. #########################################################'''
  169. vault = Vault.create_vault(self.api, self.vault_name)
  170. vault.delete_archive(archive_id)
  171. def test_pull_from_oss(self):
  172. print '''\n\n\n
  173. ########################################################
  174. submit a pull-from-oss job and OAS will finish the pull of OSS object to OAS
  175. #########################################################'''
  176. vault = Vault.create_vault(self.api, self.vault_name)
  177. # create vault
  178. job = vault.pull_from_oss(conf.osshost, conf.bucket, conf.object, "test desc")
  179. job._check_status(block=True)
  180. # delete archive
  181. print "bucket:%s object:%s finish pull from oss,\n archiveId:%s" % (conf.bucket, conf.object, job.archive_id)
  182. def test_push_to_oss(self):
  183. print '''\n\n\n
  184. ########################################################
  185. submit a push-to-oss job and OAS will finish the push of OAS archive to OSS
  186. #########################################################'''
  187. vault = Vault.create_vault(self.api, self.vault_name)
  188. archive_id = vault.upload_archive(self.file_big)
  189. job = vault.push_to_oss(archive_id, conf.osshost, conf.bucket, archive_id, "test desc")
  190. job._check_status(block=True)
  191. print archive_id + " finish push to oss"
  192. if __name__ == '__main__':
  193. t = TestDemo()
  194. t.test_single_archive_upload()
  195. t.test_multi_upload();
  196. archive_id = t.test_uploader();
  197. t.test_vault_retrieve();
  198. t.test_download_archive(archive_id)
  199. t.test_delete_archive(archive_id)
  200. t.test_pull_from_oss()
  201. t.test_push_to_oss()