通过CLI工具使用边缘脚本
阿里云为您提供边缘脚本CLI工具,可以将您本地编写的边缘脚本规则发布到模拟环境进行测试,并发布至生产环境,也可以对模拟环境和生产环境的边缘脚本规则进行查询、修改和删除。本文为您介绍边缘脚本CLI工具的使用方法。
在本地新建CLI工具脚本文件
在本地新建一个文件夹,例如命名为
cdn-api
。在文件夹内新建一个命名为
aliyun.ini
的文本文件,用于记录阿里云RAM账号的鉴权信息,文件内容如下:[Credentials] accesskeyid = $accesskeyid accesskeysecret = $accesskeysecret
在文件夹内新建一个命名为
es.py
的文本文件,用于记录CLI工具的python脚本代码,文件内容如下:#!/usr/bin/python # -*- coding:utf-8 -*- import sys, os import urllib, urllib2 import base64 import hmac import hashlib from hashlib import sha1 import time import uuid import json from optparse import OptionParser import ConfigParser import traceback import urllib2 access_key_id = ''; access_key_secret = ''; cdn_server_address = 'https://cdn.aliyuncs.com' CONFIGFILE = os.getcwd() + '/aliyun.ini' CONFIGSECTION = 'Credentials' cmdlist = ''' 1. Publish the ES rule to the simulated environment or production environment ./es.py action=push_test_env domain=<domain> rule='{"pos":"<head|foot>","pri":"0-999","rule_path":"<the es code path>","enable":"<on|off>"}' ./es.py action=push_product_env domain=<domain> rule='{"pos":"<head|foot>","pri":"0-999","rule_path":"<the es code path>","enable":"<on|off>","configid":"<configid>"}' 2. Query the ES rule in the simulated environment or production environment ./es.py action=query_test_env domain=<domain> ./es.py action=query_product_env domain=<domain> 3. Delete the ES rule in the simulated environment or production environment ./es.py action=del_test_env domain=<domain> configid=<configid> ./es.py action=del_product_env domain=<domain> configid=<configid> 4. Publish the ES rule from the simulated to production environment, or Rollback the ES rule in the simulated environment ./es.py action=publish_test_env domain=<domain> ./es.py action=rollback_test_env domain=<domain> ''' def percent_encode(str): res = urllib.quote(str.decode(sys.stdin.encoding).encode('utf8'), '') res = res.replace('+', '%20') res = res.replace('*', '%2A') res = res.replace('%7E', '~') return res def compute_signature(parameters, access_key_secret): sortedParameters = sorted(parameters.items(), key=lambda parameters: parameters[0]) canonicalizedQueryString = '' for (k,v) in sortedParameters: canonicalizedQueryString += '&' + percent_encode(k) + '=' + percent_encode(v) stringToSign = 'GET&%2F&' + percent_encode(canonicalizedQueryString[1:]) h = hmac.new(access_key_secret + "&", stringToSign, sha1) signature = base64.encodestring(h.digest()).strip() return signature def compose_url(user_params): timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) parameters = { \ 'Format' : 'JSON', \ 'Version' : '2018-05-10', \ 'AccessKeyId' : access_key_id, \ 'SignatureVersion' : '1.0', \ 'SignatureMethod' : 'HMAC-SHA1', \ 'SignatureNonce' : str(uuid.uuid1()), \ 'Timestamp' : timestamp, \ } for key in user_params.keys(): parameters[key] = user_params[key] signature = compute_signature(parameters, access_key_secret) parameters['Signature'] = signature url = cdn_server_address + "/?" + urllib.urlencode(parameters) return url def make_request(user_params, quiet=False): url = compose_url(user_params) try: req = urllib2.Request(url) r = urllib2.urlopen(req) except urllib2.HTTPError, err: print "Response Code:\n=============" print err body=err.read() body_json = json.loads(body) body_str =json.dumps(body_json,indent=4) print "\nResponse Info:\n==============" print body_str return if r.getcode() == 200: print "Response Code:\n=============\n200 OK" print "\nResponse Info:\n==============" body=r.read() body_json = json.loads(body) body_str =json.dumps(body_json,indent=4) print body_str def configure_accesskeypair(args, options): if options.accesskeyid is None or options.accesskeysecret is None: print("config miss parameters, use --id=[accesskeyid] --secret=[accesskeysecret]") sys.exit(1) config = ConfigParser.RawConfigParser() config.add_section(CONFIGSECTION) config.set(CONFIGSECTION, 'accesskeyid', options.accesskeyid) config.set(CONFIGSECTION, 'accesskeysecret', options.accesskeysecret) cfgfile = open(CONFIGFILE, 'w+') config.write(cfgfile) cfgfile.close() def setup_credentials(): config = ConfigParser.ConfigParser() try: config.read(CONFIGFILE) global access_key_id global access_key_secret access_key_id = config.get(CONFIGSECTION, 'accesskeyid') access_key_secret = config.get(CONFIGSECTION, 'accesskeysecret') except Exception, e: print traceback.format_exc() print("can't get access key pair, use config --id=[accesskeyid] --secret=[accesskeysecret] to setup") sys.exit(1) def parse_args(user_params): req_args = {} if user_params['action'] == 'push_test_env' or user_params['action'] == 'push_product_env': if not user_params.has_key('domain') or not user_params.has_key('rule'): parser.print_help() sys.exit(0) data = [] for rule in user_params['rule']: rule_cfg = { 'functionId' : 180, 'functionName' : 'edge_function', 'functionArgs' : [] } for k in rule: arg_cfg = {} if k == 'configid': rule_cfg['configId'] = int(rule[k]) elif k == 'rule_path': try: f = open(rule[k], "r") code = f.read() except IOError: print "io error" sys.exit(0) else: f.close() arg_cfg['argName'] = 'rule' arg_cfg['argValue'] = code rule_cfg['functionArgs'].append(arg_cfg) else: arg_cfg['argName'] = k arg_cfg['argValue'] = rule[k] rule_cfg['functionArgs'].append(arg_cfg) data.append(rule_cfg) rule_str = json.dumps(data) if user_params['action'] == 'push_test_env': req_args = {'Action':'SetCdnDomainStagingConfig', 'DomainName':user_params['domain'], 'Functions':rule_str} else: req_args = {'Action':'BatchSetCdnDomainConfig', 'DomainNames':user_params['domain'], 'Functions':rule_str} elif user_params['action'] == 'query_test_env': if not user_params.has_key('domain'): parser.print_help() sys.exit(0) req_args = {'Action':'DescribeCdnDomainStagingConfig', 'DomainName':user_params['domain'], 'FunctionNames':'edge_function'} elif user_params['action'] == 'query_product_env': if not user_params.has_key('domain'): parser.print_help() sys.exit(0) req_args = {'Action':'DescribeCdnDomainConfigs', 'DomainName':user_params['domain'], 'FunctionNames':'edge_function'} elif user_params['action'] == 'del_test_env': if not user_params.has_key('domain') or not user_params.has_key('configid'): parser.print_help() sys.exit(0) req_args = {'Action':'DeleteSpecificStagingConfig', 'DomainName':user_params['domain'], 'ConfigId':user_params['configid']} elif user_params['action'] == 'del_product_env': if not user_params.has_key('domain') or not user_params.has_key('configid'): parser.print_help() sys.exit(0) req_args = {'Action':'DeleteSpecificConfig', 'DomainName':user_params['domain'], 'ConfigId':user_params['configid']} elif user_params['action'] == 'publish_test_env': if not user_params.has_key('domain'): parser.print_help() sys.exit(0) req_args = {'Action':'PublishStagingConfigToProduction', 'DomainName':user_params['domain'], 'FunctionName':'edge_function'} elif user_params['action'] == 'rollback_test_env': if not user_params.has_key('domain'): parser.print_help() sys.exit(0) req_args = {'Action':'RollbackStagingConfig', 'DomainName':user_params['domain'], 'FunctionName':'edge_function'} else: parser.print_help() sys.exit(0) return req_args if __name__ == '__main__': parser = OptionParser("%s Action=action Param1=Value1 Param2=Value2 %s\n" % (sys.argv[0], cmdlist)) parser.add_option("-i", "--id", dest="accesskeyid", help="specify access key id") parser.add_option("-s", "--secret", dest="accesskeysecret", help="specify access key secret") (options, args) = parser.parse_args() if len(args) < 1: parser.print_help() sys.exit(0) if args[0] == 'help': parser.print_help() sys.exit(0) if args[0] != 'config': setup_credentials() else: #it's a configure id/secret command configure_accesskeypair(args, options) sys.exit(0) user_params = {} idx = 1 if sys.argv[1].lower().startswith('action='): _, value = sys.argv[1].split('=') user_params['action'] = value idx = 2 else: parser.print_help() sys.exit(0) for arg in sys.argv[idx:]: try: key, value = arg.split('=', 1) if key == 'rule': # push_test_env / push_product_env if not user_params.has_key('rule'): user_params['rule'] = [] user_params['rule'].append(json.loads(value)) else: user_params[key.strip()] = value except ValueError, e: print(e.read().strip()) raise SystemExit(e) req_args = parse_args(user_params) make_request(req_args)
CLI工具的使用说明
配置AccessKey ID和AccessKey Secret。
$python ./es.py config --id=AK_ID --secret=AK_SECRET $cat aliyun.ini [Credentials] accesskeyid = 更新后AK accesskeysecret = 更新后AK Secret
发布边缘脚本规则至模拟环境或生产环境。
./es.py action=push_test_env domain=<domain> rule='{"pos":"<head|foot>","pri":"0-999","rule_path":"<the es code path>","enable":"<on|off>"}' ./es.py action=push_product_env domain=<domain> rule='{"pos":"<head|foot>","pri":"0-999","rule_path":"<the es code path>","enable":"<on|off>","configid":"<configid>"}'
规则字段说明,请参见下表:
字段名称
是否必选
说明
enable
是
是否生效。取值:
on
off
pos
是
执行位置。取值:
head
foot
pri
是
执行优先级。不同执行位置的优先级各自独立。取值范围:0~999。
0表示优先级最高
999表示优先级最低。
rule
是
规则内容。
brk
否
本规则命中情况下,是否终止本阶段剩余规则的执行。取值:
on
off
testip
否
测试客户IP。默认为空。如果配置了该参数,则只有客户端IP地址可触发本条规则执行。
option
否
扩展项。当前支持扩展。
_es_dbg=signature
用于增加调试响应头。说明为域名添加功能配置时会生成ConfigId(功能配置ID,唯一性),通过指定ConfigId可更新或删除域名配置,通过ConfigId使用说明可以了解到ConfigId的生成、查询和使用方法。
新增规则不需要指定configid。
修改规则需要指定configid,使用查询接口可获取到对应规则的configid。
您可以指定多条rule。
查询模拟环境或生产环境下的边缘脚本规则。
./es.py action=query_test_env domain=<domain> ./es.py action=query_product_env domain=<domain>
删除模拟环境或生产环境下的边缘脚本规则。
./es.py action=del_test_env domain=<domain> configid=<configid> ./es.py action=del_product_env domain=<domain> configid=<configid>
模拟环境的边缘脚本规则执行正式发布或回滚。
./es.py action=publish_test_env domain=<domain> ./es.py action=rollback_test_env domain=<domain>
基于m3u8.es规则拦截所有.m3u8请求做示例,为您详细介绍边缘规则的编写、保存、测试和发布的操作方法。具体操作,请参见示例:通过CLI工具使用边缘脚本。
实时调试方式
配置实时调试。
您可以通过控制台的WebIDE图形化操作页面进行
_es_dbg
配置,也可以使用如下命令进行配置。./es.py action=push_test_env domain=<domain> rule='{"pos":"<head|foot>","pri":"0-999","rule_path":"<the es code path>","enable":"<on|off>","configid":"<configid>", "option":"_es_dbg=123"}'
测试请求。
测试时请求参数中携带
_es_dbg
参数,参数值为配置的option中_es_dbg
的值,关注如下响应头信息。Trace信息:
X-DEBUG-ES-TRACE-RULE-{规则ID}
,查看对应规则的执行流,格式为_行号_函数名(入参):返回值{_执行耗时}
。