全部产品

批量删除作业

更新时间:2020-04-28 14:23:02

1 背景

由于历史原因,提交了大量的批量计算 DAG 作业,但是作业没有及时清理;或者单次提交了大量的作业,需要做清理操作。目前批量计算控制台支持单页批量删除,但若存在几十页作业的情况,清理动作也比较麻烦。因此提供了一个批量清理批量计算作业的工具。

2 工具

  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import multiprocessing
  4. import threading
  5. import sys
  6. PY2 = sys.version_info[0] == 2
  7. if PY2:
  8. import Queue as bcQueue
  9. else:
  10. import queue as bcQueue
  11. import json
  12. import time
  13. from batchcompute import Client, ClientError
  14. import argparse
  15. from functools import wraps
  16. def retryWrapper(func):
  17. @wraps(func)
  18. def wrapper(*args,**kwargs):
  19. index = 0
  20. while True:
  21. try:
  22. res = func(*args,**kwargs)
  23. break
  24. except ClientError as e:
  25. status = e.get_status_code()
  26. if status >= 400 and status < 500:
  27. raise Exception(str(e))
  28. if status >= 500 and status < 600:
  29. if index > 6:
  30. raise Exception(str(e))
  31. else:
  32. time.sleep(0.5 * pow(2,index))
  33. index += 1
  34. except Exception as e:
  35. if index > 6:
  36. raise Exception(str(e))
  37. else:
  38. time.sleep(0.5 * pow(2,index))
  39. index += 1
  40. return res
  41. return wrapper
  42. class ShowProcess():
  43. """
  44. 显示处理进度的类
  45. 调用该类相关函数即可实现处理进度的显示
  46. """
  47. i = 0 # 当前的处理进度
  48. max_steps = 0 # 总共需要处理的次数
  49. max_arrow = 50 #进度条的长度
  50. infoDone = 'done'
  51. # 初始化函数,需要知道总共的处理次数
  52. def __init__(self, max_steps, infoDone = 'Done'):
  53. self.max_steps = max_steps
  54. self.i = 0
  55. self.infoDone = infoDone
  56. # 显示函数,根据当前的处理进度i显示进度
  57. # 效果为[>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>]100.00%
  58. def show_process(self, i=None):
  59. if i is not None:
  60. self.i = i
  61. else:
  62. self.i += 1
  63. num_arrow = int(self.i * self.max_arrow / self.max_steps) #计算显示多少个'>'
  64. num_line = self.max_arrow - num_arrow #计算显示多少个'-'
  65. percent = self.i * 100.0 / self.max_steps #计算完成进度,格式为xx.xx%
  66. process_bar = '[' + '>' * num_arrow + '-' * num_line + ']'\
  67. + '%.2f' % percent + '%' + '\r' #带输出的字符串,'\r'表示不换行回到最左边
  68. sys.stdout.write(process_bar) #这两句打印字符到终端
  69. sys.stdout.flush()
  70. if self.i >= self.max_steps:
  71. self.close()
  72. def close(self):
  73. print('')
  74. print(self.infoDone)
  75. self.i = 0
  76. class DeleteJob(threading.Thread):
  77. def __init__(self, client, jobQueue, error_queue):
  78. threading.Thread.__init__(self)
  79. self.jobQueue = jobQueue
  80. self.error_queue = error_queue
  81. self.client = client
  82. @retryWrapper
  83. def delteSpecJob(self, jobId):
  84. self.client.delete_job(jobId)
  85. def run(self):
  86. while True:
  87. try:
  88. jobId = self.jobQueue.get(block=False)
  89. self.delteSpecJob(jobId)
  90. self.jobQueue.task_done()
  91. except bcQueue.Empty:
  92. break
  93. except Exception as e:
  94. self.jobQueue.task_done()
  95. self.error_queue.put('Delte Job(%s) exception %s' % (jobId, str(e)))
  96. class DeleteJobs:
  97. def __init__(self, client, days):
  98. self.client = client
  99. self.days = days
  100. @retryWrapper
  101. def listSpecIdxJob(self, marker, max_item):
  102. response = self.client.list_jobs(marker, max_item)
  103. return response
  104. def listAndDeleteJobs(self):
  105. marker = ""
  106. max_item = 100
  107. index = 0
  108. jobQueue = bcQueue.Queue(0)
  109. error_queue = bcQueue.Queue(0)
  110. print("Begin List Spec Jobs...")
  111. while marker or index == 0:
  112. response = self.listSpecIdxJob(marker, max_item)
  113. marker = response.NextMarker
  114. for job in response.Items:
  115. if job.State == "Running" or job.State == "Waiting":
  116. continue
  117. if self.days == 0:
  118. jobQueue.put(job.Id)
  119. else:
  120. createTime = job.CreationTime.strftime("%Y-%m-%d %H:%M:%S.%f")
  121. timestap = int(time.mktime(time.strptime(createTime, "%Y-%m-%d %H:%M:%S.%f")))
  122. detaTime = self.days * 3600 * 24
  123. if int(time.time()) - timestap >= detaTime:
  124. jobQueue.put(job.Id)
  125. # print "jobId: %s" % job.Id
  126. index += 1
  127. print("List Spec Jobs Finish...")
  128. threadnum = multiprocessing.cpu_count()
  129. size = jobQueue.qsize()
  130. if size < threadnum:
  131. threadnum = size
  132. thread_pool = []
  133. for threadnum in range(threadnum):
  134. current = DeleteJob(self.client, jobQueue, error_queue)
  135. thread_pool.append(current)
  136. current.start()
  137. print("Begin Delete Jobs...")
  138. process_bar = ShowProcess(size, 'OK')
  139. totalSize = size
  140. while size != 0:
  141. size = jobQueue.qsize()
  142. process_bar.show_process(totalSize - size)
  143. time.sleep(0.5)
  144. jobQueue.join()
  145. for thread in thread_pool:
  146. thread.join()
  147. errs = []
  148. while True:
  149. try:
  150. error = error_queue.get(block=False)
  151. errs.append(error)
  152. except bcQueue.Empty:
  153. break
  154. return errs
  155. if __name__ == "__main__":
  156. parser = argparse.ArgumentParser(
  157. formatter_class = argparse.ArgumentDefaultsHelpFormatter,
  158. description = 'python scripyt for Delete Batchcompute jobs',
  159. usage='deleteJobs.py <positional argument> [<args>]',
  160. )
  161. region_surpport = ["cn-beijing", "cn-zhangjiakou", "cn-hangzhou", "cn-huhehaote", "cn-shanghai", "cn-shenzhen"]
  162. parser.add_argument('--region', action='store', type = str, choices=region_surpport, help = 'the region info.')
  163. parser.add_argument('--id', action='store', type = str, required=True, help = 'keyid.')
  164. parser.add_argument('--key', action='store', type = str, required=True, help = 'key secrete.')
  165. parser.add_argument('--day', action='store', type=int, required=True, help = 'delete the jobs which creating time before the spec day.')
  166. args = parser.parse_args()
  167. BatchHost = "batchcompute.%s.aliyuncs.com" % args.region
  168. client = Client(BatchHost, args.id, args.key)
  169. deleteJobs = DeleteJobs(client, args.day)
  170. err = deleteJobs.listAndDeleteJobs()
  171. if len(err) != 0 :
  172. a = ""
  173. print("\n".join(err))
  174. else:
  175. print("delete Jobs success!!!")

3 执行删除动作

3.1 准备工作

复制脚本到执行机器,执行机器必须安装批量计算的SDK:

  1. pip install batchcompute

若执行机器已经安装批量计算的 SDK ,则建议更新到最新版本

  1. pip install --upgrade batchcompute

准备好阿里云 AK 信息,且该 AK 已经开通批量计算相关权限

执行机器安装好 Python,建议 Python 2.7及以上

3.2 执行脚本

  1. python delete.py --region cn-beijing --id xxxx --key xxx --day 10
  • id 表示 AK的 ID 信息
  • key 表示 AK的 KEY 信息
  • day 表示保留最近 N 天的作业。
  • 示例表示:删除北京 region 10天前创建的非等待和运行状态的作业

3.3 执行结果

deletejobs