使用自动化脚本刷新和预热

阿里云CDN为您提供刷新预热自动化脚本,可以帮助您分批进行刷新或预热任务,对文件或目录快速进行刷新和预热,替代手动分批提交的繁琐操作。本文介绍Python自动化脚本的使用说明,并以Windows系统示例为您说明。

功能简介

当您指定刷新或预热URL列表文件后,脚本按照指定的并发刷新或预热数量对URL列表文件进行切割,分批进行刷新或预热。任务运行后会自动进行判断,等待上一个任务完成,脚本自动进行下一个刷新或预热任务的操作。具体的操作逻辑如下:

  1. 分批处理:假设您的URL列表中有100URL,同时您设定了每批次最多处理10URL,那么脚本会将URL列表切割成10个小批次,每个批次包含10URL。而如果设定的并发数量更大或更小,批次的大小会相应调整。例如设定的并发数量是20,那么脚本会将100URL分成5个批次,每个批次包含20URL。

  2. 按批次运行任务:脚本在启动时会按照批次依次提交刷新或预热请求。每个批次的任务是并发执行的。

  3. 等待完成后进行下一批任务:当一个批次的刷新或预热任务完成后,脚本会继续执行下一个批次的任务。这个判断和操作是自动进行的,不需要人工干预。

适用场景

如果您有以下情况,建议您使用刷新预热自动化脚本:

  • 无开发人员,需手动提交刷新预热任务,运维成本高。

  • 刷新或预热URL过多,分批提交导致刷新或预热效率低。

  • 需要人工或程序判断刷新预热任务是否正常进行,费时费力。

使用限制

请确保操作系统的Python版本为3.x版本。您可以通过在命令行输入python --versionpython3 --version来检查Python版本是否符合要求。

前提条件

  1. 由于阿里云账号(主账号)拥有资源的所有权限,其AccessKey一旦泄露风险巨大,所以建议您使用RAM用户的AccessKey。获取方法请参见创建AccessKey

  2. RAM用户授予操作域名资源的权限。本示例选择AliyunDomainFullAccess系统策略。

    1. 使用系统策略。

      • AliyunCDNFullAccess:管理CDN资源的权限。

    2. 使用自定义权限。

      关于如何创建自定义权限,请参见创建自定义权限策略授权信息

步骤一:安装依赖

执行以下命令安装Python CDN SDK模块包,目前使用版本为v20180510。

pip install alibabacloud_cdn20180510

步骤二:准备URL文件

创建一个包含需要刷新或预热的URL列表的文件。例如:urllist.txt,每行一个URL。请确保每个URLhttp://https://开头,并且是合法的URL格式。内容示例如下:

http://example.com/file1.jpg
http://example.com/file2.jpg
http://example.com/file3.jpg
...
http://example.com/fileN.jpg

步骤三:创建脚本

将如下代码保存为自动化脚本,并命名为Refresh.py。您可以自定义脚本名称,此处为举例说明。

脚本示例代码

#!/usr/bin/env python3
# coding=utf-8
# __author__ = 'aliyun.cdn'
# __date__ = '2025-08-15'

# SDK安装命令 pip install alibabacloud_cdn20180510

'''Check Package'''
# 导入所需库
import re, sys, getopt, time, logging, os

try:
    from alibabacloud_cdn20180510.client import Client as Cdn20180510Client
    from alibabacloud_credentials.models import Config as CreConfig
    from alibabacloud_credentials.client import Client as CredentialClient
    from alibabacloud_tea_openapi.models import Config
    from alibabacloud_cdn20180510 import models as cdn_20180510_models
    from alibabacloud_tea_util import models as util_models

# 捕获导入异常
except ImportError as e:
    sys.exit(f"[error] Please pip install alibabacloud_cdn20180510. Details: {e}")

# 初始化日志记录
logging.basicConfig(level=logging.DEBUG, filename='./RefreshAndPredload.log')

# 定义全局变量类,存储AK、SK、FD等信息
class Envariable(object):
    LISTS = []
    # Endpoint 请参考 https://api.aliyun.com/product/Cdn
    ENDPOINT = 'cdn.aliyuncs.com'
    AK = None
    SK = None
    FD = None
    CLI = None
    TASK_TYPE = None
    TASK_AREA = None
    TASK_OTYPE = None

    # 设置AK
    @staticmethod
    def set_ak():
        Envariable.AK = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID')

    # 获取AK
    @staticmethod
    def get_ak():
        return Envariable.AK

    # 设置SK
    @staticmethod
    def set_sk():
        Envariable.SK = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET')

    # 获取SK
    @staticmethod
    def get_sk():
        return Envariable.SK

    # 设置FD
    @staticmethod
    def set_fd(fd):
        Envariable.FD = fd

    # 获取FD
    @staticmethod
    def get_fd():
        return Envariable.FD

    # 设置任务类型
    @staticmethod
    def set_task_type(task_type):
        Envariable.TASK_TYPE = task_type

    # 获取任务类型
    @staticmethod
    def get_task_type():
        return Envariable.TASK_TYPE
    
    # 设置任务区域
    @staticmethod
    def set_task_area(task_area):
        Envariable.TASK_AREA = task_area

    # 获取任务区域
    @staticmethod
    def get_task_area():
        return Envariable.TASK_AREA

    # 设置任务对象类型
    @staticmethod
    def set_task_otype(task_otype):
        Envariable.TASK_OTYPE = task_otype

    # 获取任务对象类型
    @staticmethod
    def get_task_otype():
        return Envariable.TASK_OTYPE

    # 创建新的Client
    @staticmethod
    def set_acs_client():
        try:
            # 使用AK 初始化Credentials Client。
            credentialsConfig = CreConfig(
                # 凭证类型。
                type='access_key',
                # 设置为AccessKey ID值。
                access_key_id=Envariable.get_ak(),
                # 设置为AccessKey Secret值。
                access_key_secret=Envariable.get_sk(),
            )
            credentialClient = CredentialClient(credentialsConfig)

            cdnConfig = Config(credential=credentialClient)
            # 配置云产品服务接入地址(endpoint)。
            cdnConfig.endpoint = Envariable.ENDPOINT
            # 初始化cdn Client。
            Envariable.CLI = Cdn20180510Client(cdnConfig)
        except Exception as e:
            logging.error(f"Failed to create client: {e}")
            raise

    # 获取Client
    @staticmethod
    def get_acs_client():
        return Envariable.CLI


# 模块级别的初始化函数
def initialize_credentials_and_client():
    """在模块加载时初始化AK、SK和Client"""
    try:
        # 从环境变量初始化AK和SK
        Envariable.set_ak()
        Envariable.set_sk()
        
        # 检查AK和SK是否成功获取
        if not Envariable.get_ak() or not Envariable.get_sk():
            logging.warning("AK or SK not found in environment variables")
            return False
            
        # 初始化Client
        Envariable.set_acs_client()
        logging.info("Credentials and client initialized successfully")
        return True
    except Exception as e:
        logging.error(f"Failed to initialize credentials and client: {e}")
        return False


# 在模块加载时执行初始化
_initialization_success = initialize_credentials_and_client()





class BaseCheck(object):
    def __init__(self):
        self.invalidurl = ''
        self.lines = 0
        self.urllist = Envariable.get_fd()

    # 检查配额
    def printQuota(self):
        try:
            client = Envariable.get_acs_client()
            if not client:
                raise Exception("CDN client not initialized")
            
            # 使用SDK调用方式
            request = cdn_20180510_models.DescribeRefreshQuotaRequest()
            runtime = util_models.RuntimeOptions()
            response = client.describe_refresh_quota_with_options(request, runtime)
            quotaResp = response.body.to_map()
        except Exception as e:
            logging.error(f"\n[error]: initial Cdn20180510Client failed: {e}\n")
            sys.exit(1)

        if Envariable.TASK_TYPE:
            if Envariable.TASK_TYPE == 'push':
                if self.lines > int(quotaResp['PreloadRemain']):
                    sys.exit("\n[error]:PreloadRemain is not enough {0}".format(quotaResp['PreloadRemain']))
                return True
            if Envariable.TASK_TYPE == 'clear':
                if Envariable.get_task_otype() == 'File' and self.lines > int(quotaResp['UrlRemain']):
                    sys.exit("\n[error]:UrlRemain is not enough {0}".format(quotaResp['UrlRemain']))
                elif Envariable.get_task_otype() == 'Directory' and self.lines > int(quotaResp['DirRemain']):
                    sys.exit("\n[error]:DirRemain is not enough {0}".format(quotaResp['DirRemain']))
                else:
                    return True

    # 验证URL格式
    def urlFormat(self):
        try:
            with open(self.urllist, "r") as f:
                for line in f.readlines():
                    self.lines += 1
                    if not re.match(r'^((https)|(http))', line):
                        self.invalidurl = line + '\n' + self.invalidurl
                if self.invalidurl != '':
                    sys.exit("\n[error]: URL format is illegal \n{0}".format(self.invalidurl))
                return True
        except FileNotFoundError:
            sys.exit(f"\n[error]: File not found: {self.urllist}\n")
        except Exception as e:
            sys.exit(f"\n[error]: Failed to read file {self.urllist}: {e}\n")

# 批量处理类,将URL列表按指定数量分成多个批次
class doTask(object):
    @staticmethod
    def urlencode_pl(inputs_str):
        len_str = len(inputs_str)
        if inputs_str == "" or len_str <= 0:
            return ""
        result_end = ""
        for chs in inputs_str:
            if chs.isalnum() or chs in {":", "/", ".", "-", "_", "*"}:
                result_end += chs
            elif chs == ' ':
                result_end += '+'
            else:
                result_end += f'%{ord(chs):02X}'
        return result_end

    # 分批处理URL
    @staticmethod
    def doProd():
        gop = 20  # 这里定义了每个批次的最大URL数量
        mins = 1
        maxs = gop
        current_batch = []  # 使用局部变量而不是全局变量
        
        try:
            with open(Envariable.get_fd(), "r") as f:
                for line in f.readlines():
                    line = doTask.urlencode_pl(line.strip()) + "\n"
                    current_batch.append(line)
                    if mins >= maxs:
                        yield current_batch
                        current_batch = []
                        mins = 1
                    else:
                        mins += 1
            if current_batch:
                yield current_batch
        except FileNotFoundError:
            sys.exit(f"\n[error]: File not found: {Envariable.get_fd()}\n")
        except Exception as e:
            sys.exit(f"\n[error]: Failed to read file {Envariable.get_fd()}: {e}\n")

    # 执行刷新或预热任务
    @staticmethod
    def doRefresh(lists):
        try:
            client = Envariable.get_acs_client()
            if not client:
                raise Exception("CDN client not initialized")

            runtime = util_models.RuntimeOptions()
            taskID = None
            response_data = None

            if Envariable.get_task_type() == 'clear':
                taskID = 'RefreshTaskId'
                request = cdn_20180510_models.RefreshObjectCachesRequest()
                if Envariable.get_task_otype():
                    request.object_type = Envariable.get_task_otype()
                request.object_path = lists
                response = client.refresh_object_caches_with_options(request, runtime)
                response_data = response.body.to_map()
            elif Envariable.get_task_type() == 'push':
                taskID = 'PushTaskId'
                request = cdn_20180510_models.PushObjectCacheRequest()
                if Envariable.get_task_area():
                    request.area = Envariable.get_task_area()
                request.object_path = lists
                response = client.push_object_cache_with_options(request, runtime)
                response_data = response.body.to_map()

            if response_data and taskID:
                print(response_data)

                timeout = 0
                while True:
                    count = 0
                    # 使用SDK调用方式查询任务状态
                    taskreq = cdn_20180510_models.DescribeRefreshTasksRequest()
                    taskreq.task_id = response_data[taskID]
                    taskresp = client.describe_refresh_tasks_with_options(taskreq, runtime)
                    taskresp_data = taskresp.body.to_map()
                    print(f"[{response_data[taskID]}] is doing... ...")
                    
                    for t in taskresp_data['Tasks']['CDNTask']:
                        if t['Status'] != 'Complete':
                            count += 1
                    if count == 0:
                        logging.info(f"[{response_data[taskID]}] is finish")
                        break
                    elif timeout > 5:  # 最多等待50秒 (5 * 10秒)
                        logging.info(f"[{response_data[taskID]}] timeout after 50 seconds")
                        break
                    else:
                        timeout += 1
                        time.sleep(10)  # 每10秒检查一次状态
                        continue
        except Exception as e:
            logging.error(f"\n[error]:{e}")
            sys.exit(1)


class Refresh(object):
    def main(self, argv):
        if len(argv) < 1:
            sys.exit(f"\n[usage]: {sys.argv[0]} -h ")
        try:
            opts, args = getopt.getopt(argv, "hr:t:a:o:")
        except getopt.GetoptError as e:
            sys.exit(f"\n[usage]: {sys.argv[0]} -h ")

        for opt, arg in opts:
            if opt == '-h':
                self.help()
                sys.exit()
            elif opt == '-r':
                Envariable.set_fd(arg)
            elif opt == '-t':
                Envariable.set_task_type(arg)
            elif opt == '-a':
                Envariable.set_task_area(arg)
            elif opt == '-o':
                Envariable.set_task_otype(arg)
            else:
                sys.exit(f"\n[usage]: {sys.argv[0]} -h ")
        
        # 只有在非帮助命令时才检查初始化状态
        if not _initialization_success:
            sys.exit("\n[error]: Failed to initialize credentials and client. Please check environment variables.\n")

        try:
            if not (Envariable.get_ak() and Envariable.get_sk() and Envariable.get_fd() and Envariable.get_task_type()):
                sys.exit("\n[error]: Must set environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET, and parameters '-r', '-t'\n")
            if Envariable.get_task_type() not in {"push", "clear"}:
                sys.exit("\n[error]: taskType Error, '-t' option in 'push' or 'clear'\n")
            if Envariable.get_task_area() and Envariable.get_task_otype():
                sys.exit("\n[error]: -a and -o cannot exist at same time\n")
            if Envariable.get_task_area():
                if Envariable.get_task_area() not in {"domestic", "overseas"}:
                    sys.exit("\n[error]: Area value Error, '-a' option in 'domestic' or 'overseas'\n")
            if Envariable.get_task_otype():
                if Envariable.get_task_otype() not in {"File", "Directory"}:
                    sys.exit("\n[error]: ObjectType value Error, '-a' options in 'File' or 'Directory'\n")
                if Envariable.get_task_type() == 'push':
                    sys.exit("\n[error]: -t must be clear and 'push' -a use together\n")
        except Exception as e:
            logging.error(f"\n[error]: Parameter {e} error\n")
            sys.exit(1)

        handler = BaseCheck()
        if handler.urlFormat() and handler.printQuota():
            for g in doTask.doProd():
                doTask.doRefresh(''.join(g))
                time.sleep(1)

    def help(self):
        print("\nscript options explain: \
                    \n\t -r <filename>                   filename指“文件所在的路径+文件名称”,自动化脚本运行后将会读取文件内记录的URL;文件内的URL记录方式为每行一条URL,有特殊字符先做URLencode,以http或https开头; \
                    \n\t -t <taskType>                   任务类型,clear:刷新,push:预热; \
                    \n\t -a [String,<domestic|overseas>] 可选项,预热范围,不传默认是全球;\
                    \n\t    domestic                     仅中国内地; \
                    \n\t    overseas                     全球(不包含中国内地); \
                    \n\t -o [String,<File|Directory>]    可选项,刷新的类型; \
                    \n\t    File                         文件刷新(默认值); \
                    \n\t    Directory                    目录刷新")


if __name__ == '__main__':
    fun = Refresh()
    fun.main(sys.argv[1:])

代码执行流程

  1. gop指定的数量(100个)将文件拆分成多个批次。

  2. 顺序处理每个批次的URL。

  3. 等待当前批次任务完成后,再执行下一个批次。

说明

您可以通过调整gop变量调整每个批次的大小。

查看帮助信息

脚本创建完成后,您可以在命令行(CMD,PowerShell或终端)中运行python $script -h,用于请求并显示Python脚本的命令行帮助信息。

说明

$script通常是指一个变量,这个变量是Python脚本的文件名。例如,如果您的脚本文件名是Refresh.py,您可以运行python Refresh.py -h

在命令行(CMD,PowerShell或终端)运行以下命令,脚本会显示帮助信息,告诉您如何正确使用该脚本及其所有参数。

python Refresh.py -h

运行命令后可能会输出以下内容:

script options explain:
              -r <filename>                    //filename指“文件所在的路径+文件名称”,自动化脚本运行后将会读取文件内记录的URL;文件内的URL记录方式为每行一条URL,有特殊字符先做URLencode,以http或https开头;
              -t <taskType>                    //任务类型,clear:刷新,push:预热;
              -a [String,<domestic|overseas>   //可选项,预热范围,不传默认是全球;            
                   domestic                    //仅中国内地;             
                   overseas                    //全球(不包含中国内地);             
              -o [String,<File|Directory>]     //可选项,刷新的类型;             
                   File                        //文件刷新(默认值);             
                   Directory                   //目录刷新;

步骤四:在环境变量中设置阿里云AccessKey

使用AccessKey调用API时,不推荐在代码中明文传入阿里云AccessKey。步骤三的脚本代码会从环境变量中来读取AccessKey IDAccessKey Secret,请参考Linux、macOSWindows系统配置环境变量,在执行代码前配置阿里云AccessKey。

重要

LinuxmacOS系统中,使用export命令配置的阿里云AccessKey仅当前会话有效,当会话退出之后所设置的阿里云AccessKey将会丢失。

若需长期保留阿里云AccessKey,可将export命令配置到对应操作系统的启动配置文件中。

步骤五:运行脚本

在命令行(CMD,PowerShell或终端)使用以下命令行运行脚本:

python Refresh.py -r <PathToUrlFile> -t <TaskType>
说明

<PathToUrlFile>:包含URL列表的文件路径,如urllist.txt

<TaskType>:任务类型,clear(刷新)或push(预热)。

资源刷新命令

  • 假设URL文件为urllist.txt,且文件和Refresh.py脚本在相同目录下,任务类型为clear(刷新),在命令行(CMD,PowerShell或终端)执行以下命令。

    python Refresh.py -r urllist.txt -t clear
  • 如果文件在不同目录,例如D:\example\filename\urllist.txt,在命令行(CMD,PowerShell或终端)执行以下命令。

    python Refresh.py -r D:\example\filename\urllist.txt -t clear

运行示例如下:

python Refresh.py -r urllist.txt -t clear
{'RequestId': 'C1686DCA-F3B5-5575-ADD1-05F96617D770', 'RefreshTaskId': '18392588710'}
[18392588710] is doing... ...

如果出现Failed to initialize credentials and client. Please check environment variables.,请先按步骤四在环境变量中配置阿里云AccessKey,然后在同一个终端窗口中执行命令。

资源预热命令

  • 假设URL文件为urllist.txt,且文件和Refresh.py脚本在相同目录下,任务类型为push(预热),在命令行(CMD,PowerShell或终端)执行以下命令。

    python Refresh.py -r urllist.txt -t push
  • 如果文件在不同目录,例如D:\example\filename\urllist.txt,在命令行(CMD,PowerShell或终端)执行以下命令。

    python Refresh.py -r D:\example\filename\urllist.txt -t push

运行示例如下:

python Refresh.py -r urllist.txt -t push
{'RequestId': 'C1686DCA-F3B5-5575-ADD1-05F96617D771', 'RefreshTaskId': '18392588711'}
[18392588710] is doing... ...

如果出现Failed to initialize credentials and client. Please check environment variables.,请先按步骤四在环境变量中配置阿里云AccessKey,然后在同一个终端窗口中执行命令。