1 背景
由于历史原因,用户可能提交了大量的批量计算 DAG 作业却没有及时清理;或者单次提交了大量作业,事后需要统一清理。目前批量计算控制台只支持按页批量删除,若作业多达几十页,逐页清理比较麻烦。因此本文提供一个批量清理批量计算作业的工具。
2 工具
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing
import threading
import sys
PY2 = sys.version_info[0] == 2
# The queue module is named "Queue" on Python 2 and "queue" on Python 3;
# alias it as bcQueue so the rest of the file can use bcQueue.Queue and
# bcQueue.Empty on either version.
if not PY2:
    import queue as bcQueue
else:
    import Queue as bcQueue
import json
import time
from batchcompute import Client, ClientError
import argparse
from functools import wraps
def retryWrapper(func, max_retries=7, base_delay=0.5):
    """Decorate ``func`` so failed calls are retried with exponential backoff.

    A ``ClientError`` with a 4xx status means the request itself is wrong, so
    it is re-raised immediately. Every other failure is retried: a
    ``ClientError`` with a 5xx status or with any other (or missing) status,
    and any other ``Exception``. After the n-th failure (n counted from 0) the
    wrapper sleeps ``base_delay * 2**n`` seconds. It gives up once
    ``max_retries`` retries have been used.

    Can be used as a bare decorator (``@retryWrapper``). The keyword
    parameters only need to be passed when calling it directly.

    :param func: the callable to wrap.
    :param max_retries: number of retries allowed after the first attempt.
    :param base_delay: sleep in seconds before the first retry.
    :returns: a wrapper that returns whatever ``func`` returns.
    :raises Exception: a plain ``Exception`` carrying the text of the last
        error once the call fails permanently. The original exception type
        is not preserved.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        attempt = 0
        while True:
            try:
                return func(*args, **kwargs)
            except ClientError as e:
                status = e.get_status_code()
                # 4xx: the request is invalid; retrying cannot help.
                if status is not None and 400 <= status < 500:
                    raise Exception(str(e))
                # 5xx, or an unexpected/missing status, is treated as
                # transient. Previously a non-4xx/5xx status looped forever
                # without sleeping or counting attempts.
                error = e
            except Exception as e:
                error = e
            if attempt >= max_retries:
                raise Exception(str(error))
            time.sleep(base_delay * pow(2, attempt))
            attempt += 1
    return wrapper
class ShowProcess():
    """Render a single-line text progress bar on stdout.

    Call ``show_process`` after each unit of work, or with an explicit
    position, to redraw the bar in place. When the position reaches
    ``max_steps``, the bar is finished with a newline followed by
    ``infoDone``, and the position is reset to 0.
    """
    i = 0  # current position
    max_steps = 0  # total number of steps
    max_arrow = 50  # width of the bar in characters
    infoDone = 'done'

    def __init__(self, max_steps, infoDone = 'Done'):
        """Create a bar for ``max_steps`` units of work.

        :param max_steps: total number of steps. 0 (or less) is shown as
            already complete instead of raising ZeroDivisionError.
        :param infoDone: message printed when the bar finishes.
        """
        self.max_steps = max_steps
        self.i = 0
        self.infoDone = infoDone

    def show_process(self, i=None):
        """Redraw the bar, e.g. ``[>>>>>>>>>>----------]50.00%``.

        :param i: absolute position to display. If omitted, the position
            advances by one.
        """
        if i is not None:
            self.i = i
        else:
            self.i += 1
        if self.max_steps > 0:
            num_arrow = int(self.i * self.max_arrow / self.max_steps)  # number of '>'
            percent = self.i * 100.0 / self.max_steps  # shown as xx.xx%
        else:
            # Nothing to process: report completion rather than divide by zero.
            num_arrow = self.max_arrow
            percent = 100.0
        num_line = self.max_arrow - num_arrow  # number of '-'
        # The trailing '\r' returns the cursor to column 0 so the next call
        # overwrites this line instead of starting a new one.
        process_bar = '[' + '>' * num_arrow + '-' * num_line + ']'\
            + '%.2f' % percent + '%' + '\r'
        sys.stdout.write(process_bar)
        sys.stdout.flush()
        if self.i >= self.max_steps:
            self.close()

    def close(self):
        """Finish the bar: newline, completion message, and reset position to 0."""
        print('')
        print(self.infoDone)
        self.i = 0
class DeleteJob(threading.Thread):
    """Worker thread that deletes BatchCompute jobs taken from a shared queue.

    The thread takes job IDs from ``jobQueue`` without blocking, deletes each
    one, and stops as soon as the queue is empty. Every ID it takes is marked
    with ``task_done()``, whether the deletion succeeds or raises an
    ``Exception``. Failures are reported as text messages on ``error_queue``.
    """

    def __init__(self, client, jobQueue, error_queue):
        threading.Thread.__init__(self)
        self.client = client
        self.jobQueue = jobQueue
        self.error_queue = error_queue

    @retryWrapper
    def delteSpecJob(self, jobId):
        """Delete one job through the client; failures are retried by retryWrapper."""
        self.client.delete_job(jobId)

    def run(self):
        """Drain ``jobQueue``, deleting each job, then return."""
        pending = self.jobQueue
        while True:
            try:
                jobId = pending.get(block=False)
            except bcQueue.Empty:
                return
            try:
                self.delteSpecJob(jobId)
                pending.task_done()
            except Exception as err:
                # Account for the item before recording the failure, so a
                # join() on the queue is never left waiting on it.
                pending.task_done()
                self.error_queue.put('Delte Job(%s) exception %s' % (jobId, str(err)))
class DeleteJobs:
    """List BatchCompute jobs and delete the eligible ones with worker threads.

    Jobs in the ``Running`` or ``Waiting`` state are never deleted. When
    ``days`` is 0, every other job is deleted. Otherwise, only jobs created at
    least ``days`` days before the run started are deleted.
    """

    def __init__(self, client, days, workers=None, page_size=100):
        """
        :param client: BatchCompute ``Client`` used to list and delete jobs.
        :param days: keep jobs created within the last ``days`` days (0 keeps none).
        :param workers: number of deletion threads. ``None`` (or 0) uses the
            CPU count, capped at the number of jobs to delete.
        :param page_size: number of jobs requested per ``list_jobs`` call.
        """
        self.client = client
        self.days = days
        self.workers = workers
        self.page_size = page_size

    @retryWrapper
    def listSpecIdxJob(self, marker, max_item):
        """Fetch one page of up to ``max_item`` jobs starting at ``marker`` (retried on failure)."""
        response = self.client.list_jobs(marker, max_item)
        return response

    def _creationTimestamp(self, job):
        """Return the job's creation time as integer seconds since the epoch.

        The time is formatted and re-parsed, then passed to ``time.mktime``,
        so it is interpreted as local time. Sub-second precision is dropped.
        """
        createTime = job.CreationTime.strftime("%Y-%m-%d %H:%M:%S.%f")
        return int(time.mktime(time.strptime(createTime, "%Y-%m-%d %H:%M:%S.%f")))

    def _collectJobIds(self, jobQueue):
        """Page through every job and put the IDs of those to delete on ``jobQueue``."""
        # Compute the threshold once so every job is judged against the same
        # moment, not a clock that drifts while pages are being fetched.
        maxAge = self.days * 3600 * 24
        now = int(time.time())
        marker = ""
        while True:
            response = self.listSpecIdxJob(marker, self.page_size)
            for job in response.Items:
                if job.State in ("Running", "Waiting"):
                    continue
                if self.days == 0 or now - self._creationTimestamp(job) >= maxAge:
                    jobQueue.put(job.Id)
            marker = response.NextMarker
            # An empty NextMarker means the last page has been read.
            if not marker:
                break

    def _deleteAll(self, jobQueue, error_queue):
        """Delete every queued job on worker threads, showing a progress bar."""
        total = jobQueue.qsize()
        # Never start more threads than there are jobs.
        workerCount = min(self.workers or multiprocessing.cpu_count(), total)
        threads = []
        for _ in range(workerCount):
            worker = DeleteJob(self.client, jobQueue, error_queue)
            threads.append(worker)
            worker.start()
        print("Begin Delete Jobs...")
        process_bar = ShowProcess(total, 'OK')
        remaining = total
        # qsize() counts jobs not yet taken by a worker, so the bar may reach
        # 100% slightly before the last deletions finish. join() below waits
        # for those.
        while remaining != 0:
            remaining = jobQueue.qsize()
            process_bar.show_process(total - remaining)
            time.sleep(0.5)
        jobQueue.join()
        for worker in threads:
            worker.join()

    def _drainErrors(self, error_queue):
        """Return every message currently in ``error_queue`` as a list."""
        errs = []
        while True:
            try:
                errs.append(error_queue.get(block=False))
            except bcQueue.Empty:
                return errs

    def listAndDeleteJobs(self):
        """Delete all eligible jobs.

        :returns: a list with one message per job whose deletion failed;
            empty when every deletion succeeded.
        """
        jobQueue = bcQueue.Queue(0)
        error_queue = bcQueue.Queue(0)
        print("Begin List Spec Jobs...")
        self._collectJobIds(jobQueue)
        print("List Spec Jobs Finish...")
        self._deleteAll(jobQueue, error_queue)
        return self._drainErrors(error_queue)
if __name__ == "__main__":
    # Command-line entry point: parse options, connect to the regional
    # BatchCompute endpoint, and delete finished jobs older than --day days.
    parser = argparse.ArgumentParser(
        formatter_class = argparse.ArgumentDefaultsHelpFormatter,
        description = 'python scripyt for Delete Batchcompute jobs',
        usage='deleteJobs.py <positional argument> [<args>]',
    )
    region_surpport = ["cn-beijing", "cn-zhangjiakou", "cn-hangzhou", "cn-huhehaote", "cn-shanghai", "cn-shenzhen"]
    # --region is mandatory: without it the endpoint would become
    # "batchcompute.None.aliyuncs.com".
    parser.add_argument('--region', action='store', type = str, required=True, choices=region_surpport, help = 'the region info.')
    parser.add_argument('--id', action='store', type = str, required=True, help = 'keyid.')
    parser.add_argument('--key', action='store', type = str, required=True, help = 'key secrete.')
    parser.add_argument('--day', action='store', type=int, required=True, help = 'delete the jobs which creating time before the spec day.')
    args = parser.parse_args()
    # A negative day count would put the cutoff in the future and delete
    # every finished job.
    if args.day < 0:
        parser.error('--day must be >= 0')
    BatchHost = "batchcompute.%s.aliyuncs.com" % args.region
    client = Client(BatchHost, args.id, args.key)
    deleteJobs = DeleteJobs(client, args.day)
    err = deleteJobs.listAndDeleteJobs()
    if err:
        print("\n".join(err))
        # Non-zero exit status lets calling scripts detect partial failure.
        sys.exit(1)
    else:
        print("delete Jobs success!!!")
3 执行删除动作
3.1 准备工作
将上述脚本复制到执行机器(下文示例中保存为 delete.py)。执行机器必须安装批量计算的 SDK:
pip install batchcompute
若执行机器已经安装了批量计算的 SDK,建议将其更新到最新版本。
pip install --upgrade batchcompute
准备好阿里云 AK 信息,且该 AK 已经开通批量计算相关权限。
执行机器需已安装 Python,建议使用 Python 2.7 及以上版本。
3.2 执行脚本
python delete.py --region cn-beijing --id xxxx --key xxx --day 10
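region 表示作业所在的地域,取值为 cn-beijing、cn-zhangjiakou、cn-hangzhou、cn-huhehaote、cn-shanghai、cn-shenzhen 之一
id 表示 AK 的 ID(AccessKey ID)
key 表示 AK 的密钥(AccessKey Secret)
day 表示保留最近 N 天内创建的作业,即只删除创建时间在 N 天之前的作业;取值为 0 时删除所有不处于等待或运行状态的作业
以上示例表示:删除北京地域(cn-beijing)中 10 天前创建、且不处于等待(Waiting)或运行(Running)状态的作业。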
3.3 执行结果
文档内容是否对您有帮助?