【FC控制台】通过函数计算转存离线日志到对象存储OSS

通过阿里云函数计算服务,将阿里云CDN产生的离线日志自动、定期地转存到对象存储OSS中,以实现日志的长期归档和分析。

背景信息

阿里云CDN为您的加速域名提供详细的访问日志,这些离线日志是进行用户行为分析、服务问题排查和运营数据统计的重要依据。根据阿里云CDN的服务策略,离线日志文件在阿里云CDN服务器上仅保留30,超过该期限后将被自动清理。

为了满足数据合规、长期审计或历史数据回溯等需求,您可能需要对这些日志进行永久性保存。对象存储OSS提供了高可用、低成本且可持久化的存储方案,是日志数据长期归档的理想选择。函数计算用于监听产生阿里云CDN日志的事件,并且调用任务函数将阿里云CDN的离线日志转存到对象存储OSS中。通过本方案,您可以搭建一个自动化的工作流,将阿里云CDN日志无缝转存至您对象存储OSS Bucket中。

执行逻辑

整个自动化转存方案的核心是利用函数计算作为“调度器”和“搬运工”,连接阿里云CDN和对象存储OSS。其工作流程如下:

  1. 事件触发:在函数计算中设置一个触发器,每当阿里云CDN产生日志的时候,都会触发该触发器。

  2. 函数执行:事件触发器被触发后,会自动执行关联的函数代码。

  3. 拉取日志:函数代码会根据当前日期计算出需要拉取的前一天的日志文件名,并生成阿里云CDN离线日志的下载地址。随后,函数向该地址发起请求,下载日志文件到函数计算的临时环境中。

  4. 转存对象存储OSS:函数成功下载日志文件后,会调用对象存储OSSAPI,将该文件上传到您指定的对象存储OSS Bucket中的指定目录下。

整个过程全自动执行,深度集成阿里云CDN、函数计算、对象存储OSS三项阿里云服务,提高云上管理服务的效率。

mermaid-diagram-2025-08-11-113109

计费说明

本方案涉及以下产品的计费,请您关注:

  • 阿里云CDN:阿里云CDN生成和提供离线日志下载的功能免费

  • 函数计算:函数计算会根据函数执行次数、消耗的资源(vCPU和内存)以及执行时长等进行计费。对于每日仅执行少量几次的轻量级日志转存任务,费用通常极低。详情请参见函数计算计费概述

  • 对象存储OSS:对象存储OSS会根据您占用的存储空间、API请求次数以及可能产生的外网流出流量进行计费。详情请参见对象存储OSS计费概述

前提条件

  • 请确保您的阿里云CDN服务、函数计算服务以及对象存储OSS服务均在同一个阿里云账号下开通,保证服务间授权和顺利访问。

  • 参考创建存储空间,提前在对象存储OSS中创建一个用于存放日志文件的Bucket,并记录好Bucket存储空间名称外网访问的Endpoint存放日志文件的目录名称

配置步骤

1. 获取Bucket配置

在函数计算中创建任务函数的时候,需要填写存储日志的对象存储OSS信息,因此,需要提前准备好Bucket存储空间名称外网访问的Endpoint存放日志文件的目录名称信息。请参考以下步骤获取相应的信息:

获取Bucket配置

  1. 点击进入对象存储OSS控制台的Bucket列表页签,选择存储日志的Bucket。

  2. 点击Bucket名称进入Bucket详情页。

  3. Bucket中选择概览页签。从基本信息模块获取存储空间名称,从访问宽口模块获取外网访问Endpoint(地域节点)值。

    image

  4. 点击文件管理的文件列表,在文件列表中点击新建目录并输入目录名称(推荐目录名称为cdn_log)。

    image

2. 创建函数计算任务

本自动化转存方案的核心是利用函数计算作为“调度器”和“搬运工”,因此需要在函数计算中配置对应触发器和任务函数。

  1. 进入函数计算3.0控制台,在左侧导航栏选择函数

  2. 函数页签中,点击创建函数,选择事件函数,点击创建事件函数

    image

  3. 创建事件函数时 只需配置 影响函数正常执行的关键参数。

    • 基础配置-函数名称:后续操作需要使用该函数名(推荐使用cdn-log-dump)。

    • 函数代码-运行环境:任务函数是Python代码,所以此处选择内置运行时PythonPython 3.10

    • 更多配置-环境变量:任务函数中需要获取存储的Bucket信息,因此需要在环境变量中传入Bucket的配置信息。创建三个环境变量并填入对应参数:

      • target_oss_bucketBucket存储空间名称

      • target_oss_endpoint外网访问的Endpoint

      • target_oss_prefix存放日志文件的目录名称

      image

  4. 参数配置完成之后,点击创建即可完成函数的创建。

  5. 函数详情中,点击进入触发器页签,点击创建触发器

    image

  6. 按照以下指引,完成对触发器关键参数的配置。点击确定

    • 触发器类型:选择CDN 同步调用

    • 名称:填写触发器名称(推荐使用cdn-logs-triggers)。

    • 触发事件:选择LogFileCreated

    • 域名:此处必须填写同一个阿里云账号下并且正常运行的阿里云CDN加速域名。

    • 描述:填写该触发器的描述信息(推荐使用:CDN离线日志文件生成触发器)。

    • 角色名称:选择AliyunCDNEventNotificationRole

  7. 在完成触发器参数的配置之后,点击确定。如果此时出现尚未创建 CDN 触发器使用的默认角色的提示,点击立即授权,根据指引完成默认角色的创建;如果没有出现该提示,则直接完成触发器的创建。

    image

  8. 函数详情中,点击进入代码页签,在在线编译器中,输入下方的代码(从阿里云CDN拉取离线日志,并且存入对象存储OSS中)。

    转存任务代码

    # coding=utf-8
    
    import os, time, json, requests, traceback, oss2, fc2
    from requests.exceptions import *
    from fc2.fc_exceptions import *
    from oss2.models import PartInfo
    from oss2.exceptions import *
    from multiprocessing import Pool
    from contextlib import closing
    
    MAX_PROCCESSES = 20 # 每个子任务中的工作进程数量
    BLOCK_SIZE = 6 * 1024 * 1024 # 每个分片的大小
    BLOCK_NUM_INTERNAL = 18 # 内部URL情况下每个子任务中的默认块数量
    BLOCK_NUM = 10 # 每个子任务中的默认块数量
    MAX_SUBTASKS = 49 # 执行子任务的工作进程数量
    CHUNK_SIZE = 8 * 1024 # 每个块的大小
    SLEEP_TIME = 0.1 # 重试时的初始等待秒数
    MAX_RETRY_TIME = 10 # 最大重试次数
    
    def retry(func):
        """
        带重试机制的函数执行器。
        :param func: (必需, lambda) 要执行的函数。
        :return: func的执行结果。
        """
        wait_time = SLEEP_TIME
        retry_cnt = 1
        while True:
            if retry_cnt > MAX_RETRY_TIME:
                return func()
            try:
                return func()
            except (ConnectionError, SSLError, ConnectTimeout, Timeout) as e:
                print(traceback.format_exc())
            except (OssError) as e:
                if 500 <= e.status < 600:
                    print(traceback.format_exc())
                else:
                    raise Exception(e)
            except (FcError) as e:
                if (500 <= e.status_code < 600) or (e.status_code == 429):
                    print(traceback.format_exc())
                else:
                    raise Exception(e)
            print('重试第 %d 次...' % retry_cnt)
            time.sleep(wait_time)
            wait_time *= 2
            retry_cnt += 1
    
    def get_info(url):
        """
        获取文件的CRC64和总长度。
        :param url: (必需, string) 文件的URL地址。
        :return: CRC64, 长度
        """
        with retry(lambda : requests.get(url, {}, stream = True)) as r:
            return r.headers['x-oss-hash-crc64ecma'], int(r.headers['content-length'])
    
    class Response(object):
        """
        支持分块读取的响应类。
        """
        def __init__(self, response):
            self.response = response
            self.status = response.status_code
            self.headers = response.headers
    
        def read(self, amt = None):
            if amt is None:
                content = b''
                for chunk in self.response.iter_content(CHUNK_SIZE):
                    content += chunk
                return content
            else:
                try:
                    return next(self.response.iter_content(amt))
                except StopIteration:
                    return b''
    
        def __iter__(self):
            return self.response.iter_content(CHUNK_SIZE)
    
    def migrate_part(args):
        """
        从URL下载一个分片,然后上传到OSS。
        :param args: (bucket, object_name, upload_id, part_number, url, st, en)
        :bucket: (必需, Bucket) 目标OSS存储桶。
        :object_name: (必需, string) 目标对象名。
        :upload_id: (必需, integer) 此上传任务的上传ID。
        :part_number: (integer) 此分片的分片号。
        :url: (必需, string) 文件的URL地址。
        :st, en: (必需, integer) 此分片的字节范围,表示[st, en]。
        :return: (part_number, etag)
        :part_number: (integer) 此分片的分片号。
        :etag: (string) upload_part结果的etag。
        """
        bucket = args[0]
        object_name = args[1]
        upload_id = args[2]
        part_number = args[3]
        url = args[4]
        st = args[5]
        en = args[6]
        try:
            headers = {'Range' : 'bytes=%d-%d' % (st, en)}
            resp = Response(retry(lambda : requests.get(url, headers = headers, stream = True)))
            result = retry(lambda : bucket.upload_part(object_name, upload_id, part_number, resp))
            return (part_number, result.etag)
        except Exception as e:
            print(traceback.format_exc())
            raise Exception(e)
    
    def do_subtask(event, context):
        """
        从URL下载文件的一个范围,然后上传到OSS。
        :param event: (必需, json) 事件的JSON格式。
        :param context: (必需, FCContext) 处理器的上下文。
        :return: parts
        :parts: ([(integer, string)]) 每个进程的分片号和etag。
        """
        oss_endpoint = os.environ.get('target_oss_endpoint')
        oss_bucket_name = os.environ.get('target_oss_bucket')
        access_key_id = context.credentials.access_key_id
        access_key_secret = context.credentials.access_key_secret
        security_token = context.credentials.security_token
        auth = oss2.StsAuth(access_key_id, access_key_secret, security_token)
        bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket_name)
        object_name = event['object_name']
        upload_id = event['upload_id']
        part_number = event['part_number']
        url = event['url']
        st = event['st']
        en = event['en']
        if part_number == 1:
            return [migrate_part((bucket, object_name, upload_id, part_number, url, st, en))]
        pool = Pool(MAX_PROCCESSES)
        tasks = []
        while st <= en:
            nxt = min(en, st + BLOCK_SIZE - 1)
            tasks.append((bucket, object_name, upload_id, part_number, url, st, nxt))
            part_number += 1
            st = nxt + 1
        parts = pool.map(migrate_part, tasks)
        pool.close()
        pool.join()
        return parts
    
    def invoke_subtask(args):
        """
        同步调用同一个函数来启动子任务。
        :param args: (object_name, upload_id, part_number, url, st, en, context)
        :object_name: (必需, string) 目标对象名。
        :upload_id: (必需, integer) 此上传任务的上传ID。
        :part_number: (integer) 此子任务中第一个分片的分片号。
        :url: (必需, string) 文件的URL地址。
        :st, en: (必需, integer) 此子任务的字节范围,表示[st, en]。
        :context: (必需, FCContext) 处理器的上下文。
        :return: 被调用函数的返回值。
        """
        object_name = args[0]
        upload_id = args[1]
        part_number = args[2]
        url = args[3]
        st = args[4]
        en = args[5]
        context = args[6]
        account_id = context.account_id
        access_key_id = context.credentials.access_key_id
        access_key_secret = context.credentials.access_key_secret
        security_token = context.credentials.security_token
        region = context.region
        service_name = context.service.name
        function_name = context.function.name
        endpoint = 'http://%s.%s-internal.fc.aliyuncs.com' % (account_id, region)
        client = fc2.Client(
            endpoint = endpoint,
            accessKeyID = access_key_id,
            accessKeySecret = access_key_secret,
            securityToken = security_token
        )
        payload = {
            'object_name' : object_name,
            'upload_id' : upload_id,
            'part_number' : part_number,
            'url' : url,
            'st' : st,
            'en' : en,
            'is_children' : True
        }
        if part_number == 1:
            return json.dumps(do_subtask(payload, context))
        ret = retry(lambda : client.invoke_function(service_name, function_name, payload = json.dumps(payload)))
        return ret.data
    
    def divide(n, m):
        """
        不使用浮点运算计算ceil(n / m)。
        :param n, m: (integer)
        :return: (integer) ceil(n / m)。
        """
        ret = n // m
        if n % m > 0:
            ret += 1
        return ret
    
    def migrate_file(url, oss_object_name, context):
        """
        从URL下载文件,然后上传到OSS。
        :param url: (必需, string) 文件的URL地址。
        :param oss_object_name: (必需, string) 目标对象名。
        :param context: (必需, FCContext) 处理器的上下文。
        :return: actual_crc64, expect_crc64
        :actual_crc64: (string) 上传的CRC64。
        :expect_crc64: (string) 源文件的CRC64。
        """
        crc64, total_size = get_info(url)
        oss_endpoint = os.environ.get('target_oss_endpoint')
        oss_bucket_name = os.environ.get('target_oss_bucket')
        access_key_id = context.credentials.access_key_id
        access_key_secret = context.credentials.access_key_secret
        security_token = context.credentials.security_token
        auth = oss2.StsAuth(access_key_id, access_key_secret, security_token)
        bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket_name)
        upload_id = retry(lambda : bucket.init_multipart_upload(oss_object_name)).upload_id
        pool = Pool(MAX_SUBTASKS)
        st = 0
        part_number = 1
        tasks = []
        block_num = BLOCK_NUM_INTERNAL if '-internal.aliyuncs.com' in oss_endpoint else BLOCK_NUM
        block_num = min(block_num, divide(divide(total_size, BLOCK_SIZE), MAX_SUBTASKS + 1))
        while st < total_size:
            en = min(total_size - 1, st + block_num * BLOCK_SIZE - 1)
            tasks.append((oss_object_name, upload_id, part_number, url, st, en, context))
            size = en - st + 1
            cnt = divide(size, BLOCK_SIZE)
            part_number += cnt
            st = en + 1
        subtasks = pool.map(invoke_subtask, tasks)
        pool.close()
        pool.join()
        parts = []
        for it in subtasks:
            for part in json.loads(it):
                parts.append(PartInfo(part[0], part[1]))
        res = retry(lambda : bucket.complete_multipart_upload(oss_object_name, upload_id, parts))
        return str(res.crc), str(crc64)
    
    def get_oss_object_name(url):
        """
        获取OSS对象名。
        :param url: (必需, string) 文件的URL地址。
        :return: (string) OSS对象名。
        """
        prefix = os.environ.get('target_oss_prefix')
        tmps = url.split('?')
        if len(tmps) != 2:
            raise Exception('无效的URL: %s' % url)
        urlObject = tmps[0]
        if urlObject.count('/') < 3:
            raise Exception('无效的URL: %s' % url)
        objectParts = urlObject.split('/')
        objectParts = [prefix] + objectParts[len(objectParts) - 3 : len(objectParts)]
        return '/'.join(objectParts)
    
    def handler(event, context):
        evt = json.loads(event)
        if list(evt.keys()).count('is_children'):
            return json.dumps(do_subtask(evt, context))
        url = evt['events'][0]['eventParameter']['filePath']
        if not (url.startswith('http://') or url.startswith('https://')):
            url = 'https://' + url
        oss_object_name = get_oss_object_name(url)
        st_time = int(time.time())
        wait_time = SLEEP_TIME
        retry_cnt = 1
        while True:
            actual_crc64, expect_crc64 = migrate_file(url, oss_object_name, context)
            if actual_crc64 == expect_crc64:
                break
            print('迁移对象CRC64不匹配,期望值: %s,实际值: %s' % (expect_crc64, actual_crc64))
            if retry_cnt > MAX_RETRY_TIME:
                raise Exception('超过最大重试次数。')
            print('重试第 %d 次...' % retry_cnt)
            time.sleep(wait_time)
            wait_time *= 2
            retry_cnt += 1
        print('成功!总耗时: %d 秒。' % (int(time.time()) - st_time))
    
  9. 点击部署代码,即可完成整个函数的配置。

3. 创建专属角色和权限策略

函数计算使用对象存储OSS的时候,需要具有访问对象存储OSS的权限。为了简化授权流程,函数计算支持关联角色。按照以下步骤,给这个离线日志转存函数配置一个可以使用对象存储OSS的角色。

  1. 打开RAM 访问控制控制台,选择权限管理权限策略页签。

  2. 点击创建权限策略,选择脚本编辑

  3. 将下边策略中的BucketName改为自己的Bucket存储空间名称,将三处FC-NAME都替换成配置步骤2中的函数名称(推荐使用cdn-log-dump)。

    {
      "Version": "1",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": "oss:PutObject",
          "Resource": "acs:oss:*:*:BucketName/*"
        },
        {
          "Effect": "Allow",
          "Action": "fc:InvokeFunction",
          "Resource": [
            "acs:fc:*:*:services/FC-NAME/functions/FC-NAME",
            "acs:fc:*:*:services/FC-NAME.*/functions/*"
          ]
        }
      ]
    }
  4. 点击确定,填写策略名称备注。然后再次点击确定,完成创建权限策略(策略名称推荐使用:AliyunCDNLogDumpAccess;备注推荐使用:管理CDN离线日志转存的权限)。

  5. 点击身份管理中的角色页签,点击创建角色

  6. 信任主体类型选择云账号信任主体名称选择当前云账号xxxxxxx,然后点击确定

  7. 创建角色弹出框中输入角色名称(推荐使用AliyunCDNLogDumpRole)然后点击确定,完成角色的创建。

  8. 在创建角色的基本信息中,选择权限管理页签,点击精确授权策略类型选择自定义策略策略名称填写第四步创建的权限策略名称(推荐使用AliyunCDNLogDumpAccess)。然后点击确定

  9. 信任策略页签中,点击编辑信任策略。在脚本编辑中,输入下方的信任策略,然后点击确定

    {
      "Statement": [
        {
          "Action": "sts:AssumeRole",
          "Effect": "Allow",
          "Principal": {
            "Service": [
              "fc.aliyuncs.com"
            ]
          }
        }
      ],
      "Version": "1"
    }

至此,已经完成了整个角色和权限的配置,接下来需要把这个角色绑定到函数计算任务上。

4. 函数计算任务绑定角色

  1. 在函数计算控制台的函数页签,选择步骤2创建的函数,点击配置

  2. 配置页签中,选择高级配置,点击对应的配置按钮。

    image

  3. 在高级配置中,找到权限-函数角色选项,选择步骤3创建的角色(推荐使用AliyunCDNLogDumpRole)。然后点击部署,完成函数计算任务绑定角色的操作。

    image

5. 测试函数计算任务(可选)

在完成前四个步骤之后,整个阿里云CDN离线日志转存到对象存储OSS的操作已经全部完成。但是由于离线日志的生成存在24小时左右的延迟,因此无法及时看到配置的函数计算任务是否能够正常执行。您可以按照以下步骤测试配置的函数计算任务。

  1. 在函数计算控制台的函数页签,选择步骤2创建的函数,点击配置

  2. 测试页签中,测试请求操作选择创建新测试事件事件模板选择CDN(LogFileCreated)事件名称填写测试cdn_log_dump

    image

  3. 用下方获取的参数,替换修改事件模板中的filePath参数。

    如何获取测试的filePath参数

    1. 进入阿里云CDN离线日志下载控制台。

    2. 选择触发器配置的加速域名,日期选择当前日期前一天,点击查询。

    3. 选择其中一个文件,复制文件的下载链接(鼠标放置在下载按钮上,右键选择复制链接)。

  4. 点击测试函数,待执行完成之后,可以看到返回结果为null,并且执行成功的状态。

    image

  5. 在对象存储Bucket控制台,选择用作存储阿里云CDN日志的Bucket。

  6. 选择文件列表,进入到配置的存储阿里云CDN日志的目录中,可以看到以加速域名命名的文件夹,文件夹内有以日期命名的子文件夹,里面包含了 5.3 步骤中配置的文件。说明函数计算任务已成功处理阿里云CDN离线日志转存。

    image