自定义函数开发指南

当您通过函数计算消费日志数据时,在不同场景您可以选择使用日志服务提供的函数模板或自定义函数。本文介绍如何构建一个自定义函数。

函数Event

在通过函数计算消费日志数据的过程中,步骤二需要配置函数的入口参数(函数Event),格式为一个JSON Object序列化后字符串。

  • 参数说明

    参数

    说明

    jobName

    日志服务ETL Job名称。函数计算服务上的日志服务触发器对应一个日志服务的ETL Job。

    taskId

    对于一个ETL Job,taskId是某一次确定性的函数调用标识。

    cursorTime

    本次函数调用包括的数据中,最后一条日志到达日志服务的服务器端的unix_timestamp。

    source

    该字段由日志服务生成,日志服务根据ETL Job定义的任务间隔定时触发函数执行,source字段是函数Event中的重要组成部分,定义了本次函数调用的消费范围。

    • endpoint:Project所属地域的服务入口,详情请参见服务入口

    • projectName:Project名称。

    • logstoreName:Logstore名称。

    • shardId:Logstore下的某一个确定Shard。

    • beginCursor:需要从Shard的什么位置开始消费数据。

    • endCursor:需要消费Shard数据到什么位置。

      说明

      Shard对应的[beginCursor,endCursor)是一个左闭右开区间。

    parameter

    JSON Object类型,在您创建触发器函数配置时设置。自定义日志服务ETL函数运行时解析该字段,可以获取到函数所需要的运行参数。详情请参见SLS触发器

  • 示例

    {
        "source": {
            "endpoint": "http://cn-shanghai-intranet.log.aliyuncs.com", 
            "projectName": "fc-****************", 
            "logstoreName": "demo", 
            "shardId": 0, 
            "beginCursor": "MTUwNTM5MDI3NTY1ODcwNzU2Ng==", 
            "endCursor": "MTUwNTM5MDI3NTY1ODcwNzU2OA=="
        }, 
        "parameter": {
            ...
        }, 
        "jobName": "fedad35f51a2a97b466da57fd71f315f539d2234", 
        "taskId": "9bc06c96-e364-4f41-85eb-b6e579214ae4",
        "cursorTime": 1511429883
    }

    在函数调试时候,您可以调用GetCursor - 通过时间查询Cursor接口获取cursor并按上述示例构建一个函数Event用于测试。

函数开发

您可以通过Java、Python、Node.js等多种语言实现函数开发,日志服务提供了相应Runtime的SDK,以便您在函数中进行集成,详情请参见SDK参考

以下内容以Java 8 Runtime为例,介绍如何开发日志服务ETL函数。关于Java 8函数编程细节,详情请参见函数计算服务Java编程指南

  • Java函数模板

    目前,日志服务提供了基于Java8 Runtime的自定义数据模板,您可以在这基础上完成自定义需求的实现。

    模板已实现以下功能:

    • 函数Event中source、taskId、jobName字段的解析。

    • 根据source中定义的数据源,通过Log Service Java SDK获取数据,并对每一批数据调用processData接口进行处理。

    在模板中,您还需要实现以下功能:

    • 函数Event中parameter字段的解析,通过UserDefinedFunctionParameter.java实现。

    • 函数内针对数据的自定义业务逻辑,通过UserDefinedFunction.java的processData接口实现。

    • 为您的函数取一个可以描述功能的名字,替换UserDefinedFunction

  • processData接口实现

    您可以在processData内完成对一批数据的消费、加工、投递。例如在LogstoreReplication中,实现了将数据从一个Logstore中读出后写到另一个Logstore。

    说明
    • processData处理数据成功则返回true,处理数据遇到异常无法重试成功则返回false,但此时函数还会继续运行下去,且日志服务判定这是一次成功的ETL任务,会忽略其中处理异常的数据。

    • 如果遇到致命错误或业务逻辑上认为有异常需要提前终止函数执行,会通过throw Exception方式跳出函数运行,日志服务可以据此判断函数运行异常,并会按照ETL Job设置的规则重新调用函数执行。

    • 如果Shard流量较大,请为函数配置足够的内存规格,以避免函数OOM导致异常终止。

    • 如果在函数内执行耗时操作或者Shard流量较大,请您设置较短的函数触发间隔和较长的函数运行超时时间。

    • 请您为函数服务配置足够的权限,例如在函数内写OSS就需要配置OSS写权限。

ETL日志

  • ETL调度日志

    调度日志记录ETL任务开始时间、结束时间、任务是否成功以及成功返回的信息。如果ETL任务出错会生成ETL出错日志,并向系统管理员发送报警邮件或短信。请您在创建触发器时设置触发器日志Logstore,并为该Logstore开启并配置索引,详情请参见创建索引

    对于函数执行结果的统计,可以通过函数返回,例如在Java8 Runtime函数的outputStream中体现。日志服务提供的函数模板会写出一个JSON Object序列化后的字符串,该字符串将记录在ETL任务调度日志中,方便您进行统计、查询。

  • ETL过程日志

    这一部分日志是在ETL执行过程中每执行一步记录的关键点和错误信息,包括某一步骤的开始和结束时间、初始化动作完成情况、模块出错信息等。ETL过程日志的意义是随时可以感知ETL运行情况,如果发生错误,可以及时通过过程日志查找原因。

    您可以通过context.getLogger()记录过程日志并存放在日志服务指定Project的Logstore中,建议您为该Logstore开启索引查询功能。