批量删除作业

1 背景

由于历史原因提交了大量批量计算 DAG 作业却没有及时清理,或者单次提交了大量作业,都需要做清理操作。目前批量计算控制台仅支持单页批量删除,若作业多达几十页,逐页清理非常繁琐。因此这里提供一个批量清理批量计算作业的工具。

2 工具

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import threading
import sys
PY2 = sys.version_info[0] == 2
if PY2:
    import Queue as bcQueue
else:
    import queue as bcQueue
import json
import time
from batchcompute import Client, ClientError
import argparse
from functools import wraps

def retryWrapper(func):
    """Decorate *func* so that failed calls are retried with exponential backoff.

    A ``ClientError`` with an HTTP 4xx status is a client-side error that
    cannot succeed on retry, so it is re-raised immediately (as a plain
    ``Exception`` carrying the original message). Every other failure -- a
    5xx ``ClientError``, a ``ClientError`` with any other (or missing) status,
    or any other ``Exception`` -- is retried after sleeping
    ``0.5 * 2**attempt`` seconds. After 7 retries (8 calls in total) the last
    error is re-raised as a plain ``Exception``.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        index = 0
        while True:
            try:
                return func(*args, **kwargs)
            except ClientError as e:
                status = e.get_status_code()
                if status is not None and 400 <= status < 500:
                    # Client errors (bad request, not found, forbidden...) are
                    # permanent: retrying would only waste time.
                    raise Exception(str(e))
                # Any other status is treated as transient. Previously a
                # status outside 4xx/5xx looped forever without backoff.
                error = e
            except Exception as e:
                error = e
            if index > 6:
                raise Exception(str(error))
            time.sleep(0.5 * pow(2, index))
            index += 1
    return wrapper

class ShowProcess(object):
    """Display a single-line text progress bar on stdout.

    Create it with the total number of steps and call ``show_process`` as
    work progresses. When the current position reaches ``max_steps``, the bar
    line is ended and ``infoDone`` is printed.
    """
    i = 0  # current position (overwritten per instance)
    max_steps = 0  # total number of steps (overwritten per instance)
    max_arrow = 50  # width of the bar in characters
    infoDone = 'done'  # completion message (overwritten per instance)

    def __init__(self, max_steps, infoDone = 'Done'):
        """
        Args:
            max_steps: total number of steps. A value of 0 or less means there
                is nothing to do, and the bar is drawn as complete.
            infoDone: message printed once the bar completes.
        """
        self.max_steps = max_steps
        self.i = 0
        self.infoDone = infoDone

    def show_process(self, i=None):
        """Redraw the bar, e.g. ``[>>>>>>>>>>>>>>>>>>>>>>>>>-------------------------]50.00%``.

        Args:
            i: absolute position to display. When None, the position is
                advanced by one step.
        """
        if i is not None:
            self.i = i
        else:
            self.i += 1
        if self.max_steps > 0:
            # Clamp so the bar never grows wider than max_arrow.
            num_arrow = min(int(self.i * self.max_arrow / self.max_steps), self.max_arrow)
            percent = self.i * 100.0 / self.max_steps
        else:
            # No steps at all: show as finished instead of dividing by zero.
            num_arrow = self.max_arrow
            percent = 100.0
        num_line = self.max_arrow - num_arrow
        # The trailing '\r' moves the cursor back to column 0, so the next
        # redraw overwrites this line instead of starting a new one.
        process_bar = '[' + '>' * num_arrow + '-' * num_line + ']'\
                      + '%.2f' % percent + '%' + '\r'
        sys.stdout.write(process_bar)
        sys.stdout.flush()
        if self.i >= self.max_steps:
            self.close()

    def close(self):
        """Finish the bar: end the line, print infoDone and reset the position."""
        print('')
        print(self.infoDone)
        self.i = 0

class DeleteJob(threading.Thread):
    """Worker thread that deletes jobs taken from a shared queue.

    The thread pulls job ids from ``jobQueue`` without blocking and exits as
    soon as the queue is empty. A failed deletion does not stop the worker.
    Instead, a message is put on ``error_queue`` and the next job is
    processed.
    """

    def __init__(self, client, jobQueue, error_queue):
        threading.Thread.__init__(self)
        self.jobQueue = jobQueue  # queue of job ids still to delete
        self.error_queue = error_queue  # receives one message per failed deletion
        self.client = client  # client whose delete_job() is called

    @retryWrapper
    def delteSpecJob(self, jobId):
        """Delete one job via the client (wrapped in retryWrapper)."""
        self.client.delete_job(jobId)

    def run(self):
        """Drain the queue, deleting every job id it yields."""
        while True:
            try:
                jobId = self.jobQueue.get(block=False)
            except bcQueue.Empty:
                break
            try:
                self.delteSpecJob(jobId)
            except Exception as e:
                self.error_queue.put('Delete Job(%s) exception %s' % (jobId, str(e)))
            finally:
                # Acknowledge the item only after any error has been recorded,
                # so that once jobQueue.join() returns every error is queued.
                self.jobQueue.task_done()

class DeleteJobs(object):
    """List BatchCompute jobs and delete the non-active ones in parallel.

    Jobs in the ``Running`` or ``Waiting`` state are never deleted. With
    ``days == 0`` every other job is deleted. Otherwise, only jobs created at
    least ``days`` days before the listing started are deleted.
    """

    def __init__(self, client, days):
        if days < 0:
            # A negative window would put the cutoff in the future and select
            # every finished job for deletion.
            raise ValueError("days must be >= 0, got %r" % (days,))
        self.client = client  # client providing list_jobs() / delete_job()
        self.days = days  # keep jobs created within the last `days` days

    @retryWrapper
    def listSpecIdxJob(self, marker, max_item):
        """Fetch one page of up to ``max_item`` jobs starting at ``marker``."""
        response = self.client.list_jobs(marker, max_item)
        return response

    def listAndDeleteJobs(self):
        """List all deletable jobs, delete them with worker threads, report failures.

        Returns:
            list of str: one message per job whose deletion failed. The list
            is empty when every deletion succeeded.
        """
        jobQueue = self._collectJobIds()
        error_queue = bcQueue.Queue(0)
        self._deleteQueuedJobs(jobQueue, error_queue)
        return self._drainQueue(error_queue)

    def _collectJobIds(self):
        """Page through list_jobs and return a queue of deletable job ids."""
        jobQueue = bcQueue.Queue(0)
        max_item = 100
        # Fix "now" once, so the cutoff does not drift while paging.
        cutoff = int(time.time()) - self.days * 3600 * 24

        print("Begin List Spec Jobs...")
        marker = ""
        first_page = True
        # An empty NextMarker after the first page means there are no more pages.
        while first_page or marker:
            first_page = False
            response = self.listSpecIdxJob(marker, max_item)
            marker = response.NextMarker
            for job in response.Items:
                if job.State in ("Running", "Waiting"):
                    continue  # never touch jobs that are still active
                if self.days == 0 or self._creationTimestamp(job) <= cutoff:
                    jobQueue.put(job.Id)
        print("List Spec Jobs Finish...")
        return jobQueue

    @staticmethod
    def _creationTimestamp(job):
        """Return job.CreationTime as integer seconds since the epoch.

        The time is formatted and re-parsed, then interpreted in the local
        time zone by time.mktime.
        """
        createTime = job.CreationTime.strftime("%Y-%m-%d %H:%M:%S.%f")
        return int(time.mktime(time.strptime(createTime, "%Y-%m-%d %H:%M:%S.%f")))

    def _deleteQueuedJobs(self, jobQueue, error_queue):
        """Delete every queued job using at most cpu_count() worker threads."""
        total = jobQueue.qsize()
        # One worker per CPU, but never more workers than jobs.
        thread_count = min(multiprocessing.cpu_count(), total)
        workers = []
        for _ in range(thread_count):
            worker = DeleteJob(self.client, jobQueue, error_queue)
            workers.append(worker)
            worker.start()

        print("Begin Delete Jobs...")
        process_bar = ShowProcess(total, 'OK')
        remaining = total
        while remaining != 0:
            # qsize() counts jobs not yet picked up by a worker, so the bar
            # tracks dispatched jobs. The join() calls below wait for the rest.
            remaining = jobQueue.qsize()
            process_bar.show_process(total - remaining)
            time.sleep(0.5)

        jobQueue.join()
        for worker in workers:
            worker.join()

    @staticmethod
    def _drainQueue(q):
        """Return every item currently in ``q`` as a list, in FIFO order."""
        items = []
        while True:
            try:
                items.append(q.get(block=False))
            except bcQueue.Empty:
                return items

if __name__ == "__main__":
    # Command-line entry point: parse credentials/region/age, then delete jobs.
    # argparse builds the usage line itself. The old hand-written one wrongly
    # advertised a positional argument.
    parser = argparse.ArgumentParser(
        formatter_class = argparse.ArgumentDefaultsHelpFormatter,
        description = 'python script for Delete Batchcompute jobs',
    )
    region_surpport = ["cn-beijing", "cn-zhangjiakou", "cn-hangzhou", "cn-huhehaote", "cn-shanghai", "cn-shenzhen"]

    # --region is required: without it the endpoint would be built as
    # "batchcompute.None.aliyuncs.com".
    parser.add_argument('--region', action='store', type = str, required=True, choices=region_surpport, help = 'the region info.')
    parser.add_argument('--id', action='store', type = str, required=True, help = 'keyid.')
    parser.add_argument('--key', action='store', type = str, required=True, help = 'key secret.')
    parser.add_argument('--day', action='store', type=int, required=True, help = 'delete the jobs created more than DAY days ago (0 deletes all non-running, non-waiting jobs).')
    args = parser.parse_args()
    if args.day < 0:
        # A negative window would select every finished job; refuse it.
        parser.error('--day must be a non-negative number of days')

    BatchHost = "batchcompute.%s.aliyuncs.com" % args.region
    client = Client(BatchHost, args.id, args.key)

    deleteJobs = DeleteJobs(client, args.day)
    err = deleteJobs.listAndDeleteJobs()
    if err:
        print("\n".join(err))
        # Non-zero exit status lets callers/scripts detect partial failure.
        sys.exit(1)
    else:
        print("delete Jobs success!!!")

3 执行删除动作

3.1 准备工作

将上述脚本保存为 delete.py 并复制到执行机器,执行机器必须安装批量计算的 SDK:

pip install batchcompute

若执行机器已经安装批量计算的 SDK,则建议更新到最新版本。

pip install --upgrade batchcompute

准备好阿里云 AK 信息,且该 AK 已经开通批量计算相关权限。

执行机器需安装 Python,建议使用 Python 2.7 及以上版本。

3.2 执行脚本

python delete.py --region cn-beijing --id xxxx --key xxx --day 10
  • region 表示作业所在的地域,如 cn-beijing(可选值见脚本中的 region_surpport 列表)

  • id 表示 AK 的 ID 信息

  • key 表示 AK的 KEY 信息

  • day 表示保留最近 N 天内创建的作业,只删除 N 天前创建的作业;设为 0 时删除所有非等待、非运行状态的作业

以上示例表示:删除北京 region 中 10 天前创建、且不处于等待(Waiting)或运行(Running)状态的作业。

3.3 执行结果

deletejobs