日志服务自定义ETL功能的数据消费端运行在阿里云函数计算服务上,根据不同的ETL用途,您可以选择使用日志服务提供的函数模板,或是自定义实现个性化函数。

本文介绍如何实现一个自定义日志服务ETL函数。

函数Event

函数Event即运行函数的输入参数,格式为一个JSON Object序列化后字符串。

字段说明
  • jobName字段

    日志服务ETL Job名字,函数计算服务上的日志服务触发器对应一个日志服务的ETL Job。

  • taskId字段

    对于一个ETL Job,taskId是某一次确定性的函数调用的标识。

  • cursorTime字段

    本次函数调用包括的数据中,最后一条日志到达日志服务服务端的unix_timestamp。

  • source字段

    该字段由日志服务生成,日志服务根据ETL Job定义的任务间隔定时触发函数执行,source字段是函数Event中很重要的一部分,定义了本次函数调用应该消费哪些数据。

    该数据源范围由如下字段(相关字段定义请参考日志服务概念介绍)组成:

    字段 说明
    endpoint Project所属日志服务Region的服务入口
    projectName Project名称
    logstoreName Logstore名称
    shardId Logstore下的某一个确定Shard
    beginCursor 需要从Shard的什么位置开始消费数据
    endCursor 需要消费Shard数据到什么位置
    说明
    Shard对应的[beginCursor, endCursor)是一个左闭右开区间。
  • parameter字段

    JSON Object类型,由用户在创建ETL Job(函数计算的LOG触发器)时设定。自定义函数运行时解析该字段,可以获取到函数所需要的运行参数。

    在函数计算控制台创建日志服务触发器时,通过填写 函数配置进行设置,如下图:
    图 1. 函数配置


函数Event示例

{
    "source": {
        "endpoint": "http://cn-shanghai-intranet.log.aliyuncs.com", 
        "projectName": "fc-1584293594287572", 
        "logstoreName": "demo", 
        "shardId": 0, 
        "beginCursor": "MTUwNTM5MDI3NTY1ODcwNzU2Ng==", 
        "endCursor": "MTUwNTM5MDI3NTY1ODcwNzU2OA=="
    }, 
    "parameter": {
        ...
    }, 
    "jobName": "fedad35f51a2a97b466da57fd71f315f539d2234", 
    "taskId": "9bc06c96-e364-4f41-85eb-b6e579214ae4",
    "cursorTime": 1511429883
}

在函数调试时候,你也可以根据GetCursor接口获取cursor并按上述格式自行组装一个函数Event用于测试。

函数开发

您可以使用Java/Python/Node.js等多种语言实现函数功能,日志服务提供了相应runtime的各种语言SDK以便您在函数中进行集成。

本节以Java8 runtime为例,介绍如何开发日志服务ETL函数。由于涉及Java8函数编程细节,请先阅读函数计算服务Java编程指南

Java函数模板

目前,日志服务提供了基于Java8执行环境的自定义ETL函数模板,您可以在这基础上完成自定义需求的实现。

模板已实现以下功能:

  • 函数Event中source、taskId、jobName字段的解析。
  • 根据source中定义的数据源,通过日志服务Java SDK拉取数据,并对每一批数据调用processData接口进行处理。

在模板中,您还需要实现以下功能:

  • 函数Event中parameter字段的解析,UserDefinedFunctionParameter.java实现。
  • 函数内针对数据的自定义业务逻辑,UserDefinedFunction.java的processData接口实现。
  • 为您的函数取一个可以恰当描述功能的名字,替换UserDefinedFunction
processData方法实现

您需要在processData内完成对一批数据的消费、加工、投递,视具体需求而定。

可以参考LogstoreReplication,它实现了将数据从一个Logstore中读出后写到另一个日志服务Logstore。

note
说明
  1. processData处理数据成功请返回true,处理数据遇到异常无法重试成功的请返回false,但此时函数还会继续运行下去,且日志服务判定这是一次成功的ETL任务,只是忽略了一些处理异常的数据。
  2. 如果遇到致命错误或业务逻辑上认为有异常需要提前终止函数执行,请通过throw Exception方式跳出函数运行,日志服务可以据此判断函数运行异常,并会按照ETL Job设置的规则重新调用函数执行。

注意事项

  • 如果Shard流量较大,请注意为函数配置足够的内存规格,以避免函数OOM导致异常终止。
  • 如果函数内执行耗时操作,或者Shard流量较大,请注意设置较短的函数触发间隔和较长的函数运行超时时间。
  • 请为函数服务配置足够的权限,例如在函数内写OSS就需要配置OSS写权限。

ETL日志

  • ETL调度日志

    调度日志只记录ETL任务开始的时间、结束时间,任务是否成功以及成功返回的信息。如果ETL任务出错了,不仅要生成ETL出错日志,而且要向系统管理员发送报警邮件或短信。在创建触发器时设置触发器日志Logstore,并为该Logstore开通索引查询功能。

    对于函数执行的统计,可以通过函数写出返回,例如Java8函数的outputStream。日志服务默认提供的模板会写出一个JSON Object序列化后的字符串,该字符串将一并记录在ETL任务调度日志中,方便您进行统计、查询。

  • ETL过程日志

    这一部分日志是在ETL执行过程中每执行一步的记录关键点和错误,包括某一步骤的开始、结束时间、初始化动作完成情况,模块出错信息等。ETL过程日志的意义是随时可以感知ETL运行情况,如果发生错误,可以及时通过过程日志查找原因。

    您可以通过context.getLogger()记录过程日志到日志服务指定的Project、Logstore,建议为该Logstore开启索引查询功能。