全部产品

日志服务触发器

更新时间:2018-08-16 19:32:36

日志服务触发器通过日志服务定时获取更新的数据并触发函数进行日志加工。增量消费日志服务 Logstore 的数据,在函数里完成自定义加工任务。

日志服务触发器适用以下场景:

  • 数据清洗、加工场景。通过日志服务,快速完成日志采集、加工、查询、分析。如下图:数据加工
  • 数据投递场景。为数据的目的端落地提供支撑,构建云上大数据产品间的数据管道。如下图:数据投递

日志服务配置

触发器示例 :slsTrigger.yml

  1. triggerConfig:
  2. sourceConfig:
  3. project: "etl"
  4. logstore: "etl-log"
  5. jobConfig:
  6. maxRetryTime: 3
  7. triggerInterval: 60
  8. functionParameter:
  9. a: "b"
  10. c: "d"
  11. logConfig:
  12. project: "ali-fc-test"
  13. logstore: "test-store"
  14. enable: true

触发器参数说明 :

  • sourceConfig 为数据源配置参数,包含如下属性:
    • Logstore:日志服务日志库(Logstore)名称,亦即数据源。触发器会定时从该 Logstore 订阅数据到函数计算自定义加工。该参数创建后不允许修改。
  • jobConfig 为任务配置参数,包含如下属性:
    • triggerInterval:日志服务触发函数运行的间隔。取值范围:[3, 600],单位:秒。例如,triggerInterval: 60 表示每60秒读取 Logstore 的每个分区(Shard)最近60秒数据位置,并作为函数 event 调用函数执行,在函数内有用户逻辑读取 Shard 数据做计算。如果您的 logstore Shard 流量超过 1 MB/s 或者更高,建议缩短函数的触发间隔,使函数运行所处理的数据量合理大小。
    • maxRetryTime:单次触发允许的最大重试次数。取值范围:[0, 100]。日志服务根据触发间隔每次触发函数执行时,如果遇到错误时,例如权限不足、网络失败或者函数执行异常等,如果超过最大重试次数仍无法成功的,需要等到下一次触发间隔到来时,由日志服务再次触发函数执行。重试对业务造成的影响,因具体的函数代码实现逻辑而异。
  • functionParameter:日志服务将该配置内容作为函数 event 一部分传入函数,如何使用该函数由函数自定义逻辑决定。每一种函数实现所要求的函数配置可能是不同的,绝大部分默认提供的函数模板也需要参考说明填写您的参数。默认值为空({})。
  • logConfig 为触发器的日志文件配置,包含如下属性:
    • project:日志服务项目(Project)名称。
    • logstore:存放日志的 Logstore 名称。
  • enable 为是否启用触发器。取值范围:true|false。

事件格式:

  1. {
  2. "parameter":{
  3. "a":"b",
  4. "c":"d"
  5. },
  6. "source":{
  7. "endpoint":"http://cn-shanghai-intranet.log.aliyuncs.com",
  8. "projectName":"vangie-fc-test",
  9. "logstoreName":"fc-test",
  10. "shardId":0,
  11. "beginCursor":"MTUyMzI2NzI5NDY1NjI4MzgzNg==",
  12. "endCursor":"MTUyMzI2NzI5NDY1NjI4MzgzNw=="
  13. },
  14. "jobName":"05c79f637c6b46eaa85911cae032cf47551af7bb",
  15. "taskId":"d22697c0-2a41-4d35-b27c-dccec8856768",
  16. "cursorTime":1523323454
  17. }
  • parameter : 用户配置触发器时候填写的函数配置。
  • source : 为读取到日志服务日志块信息。
    • endpoint : Project 所属日志服务Region的。
    • projectName : Project 名称。
    • logstoreName : Logstore 名称。
    • shardId : Logstore 下的某一个确定 Shard。
    • beginCursor : 需要从 Shard 的什么位置开始消费数据。
    • endCursor : 需要消费 Shard 数据到什么位置。
  • jobName : 日志服务 ETL Job 名字,函数计算服务上的日志服务触发器对应一个日志服务的 ETL Job。
  • taskId : 对于一个 ETL Job,taskId 是某一次确定性的函数调用的标识。
  • cursorTime : 本次函数调用包括的数据中,最后一条日志到达日志服务服务端的 unix_timestamp。

示例 1. 控制台

本示例展示了使用控制台设置日志服务事件触发器的方式。可以在创建函数的时候设置触发器,也可以在函数创建完成后再设置触发器。

在创建函数时设置触发器步骤:

  1. 单击 【选择全部的语言】,在下拉菜单中选择 python2.7(本示例以python代码进行介绍)。
  2. 选择 【空白函数】。
  3. 按如下图示配置日志服务触发器,单击 【下一步】。

创建触发器其中函数配置可以在在函参数 event 中获取

  1. 创建函数并填写所在服务、函数名称、描述信息和运行环境信息。
  2. 单击【下一步】。
  3. 核对信息无误后,单击【创建】。

示例 2. sdk

fc-python-sdk 为例:

  1. import fc2
  2. client = fc2.Client(
  3. endpoint = '<Your Endpoint>',
  4. accessKeyID = '<Your AccessKeyID>',
  5. accessKeySecret = '<Your AccessKeySecret>')
  6. service_name = '<service_name>'
  7. function_name = '<function_name>'
  8. trigger_name = '<trigger_name>'
  9. # Create log trigger
  10. log_trigger_config = {
  11. 'sourceConfig': {
  12. 'logstore': 'log_store_source'
  13. },
  14. 'jobConfig': {
  15. 'triggerInterval': 60,
  16. 'maxRetryTime': 10
  17. },
  18. 'functionParameter': {},
  19. 'logConfig': {
  20. 'project': 'log_project',
  21. 'logstore': 'log_store'
  22. },
  23. 'enable': False
  24. }
  25. source_arn = 'acs:log:cn-shanghai:12345678:project/log_project'
  26. invocation_role = 'acs:ram::12345678:role/aliyunlogetlrole'
  27. client.create_trigger('service_name', 'function_name', 'trigger_name', 'oss',
  28. log_trigger_config, source_arn, invocation_role)