依托阿里云函数计算服务,日志服务提供流式数据加工服务。您可以通过配置一个函数计算触发器任务,定时获取更新的数据并触发函数的执行,进而增量消费日志服务Logstore的数据,并完成自定义加工任务。日志服务提供的函数模板或者用户自定义函数均可作为数据加工函数。
前提条件
已为日志服务触发函数执行授权。更多信息,请参见云资源访问授权。
已创建Project和Logstore。更多信息,请参见创建Project和Logstore。
使用限制
单个日志项目(Project)关联的SLS触发器数量最大不得超过该Project下已有的Logstore数量的5倍。
建议每个Logstore配置的SLS触发器数量不超过5个,否则可能会影响数据投递到函数计算的效率。
适用场景
数据清洗、加工场景
通过日志服务,快速完成日志采集、加工、查询及分析。
数据投递场景
为数据的目的端提供支撑,构建云上大数据产品之间的数据管道。
数据加工函数
函数类型
模板函数
更多信息,请参见aliyun-log-fc-functions。
用户自定义函数
函数配置的格式与函数的具体实现有关。更多信息,请参见ETL函数开发指南。
函数计算触发机制
函数计算触发器任务对应于函数计算的一个触发器,当创建函数计算触发器任务后,日志服务会根据该触发器任务的配置启动定时器,定时器轮询Logstore中的Shard信息,当发现有新的数据写入时,即生成
<shard_id,begin_cursor,end_cursor >
三元组信息作为函数Event,并触发函数执行。说明当存储系统升级时,即使没有新数据写入,也可能发生Cursor变化,在这种情况下,每个Shard会额外空触发一次。针对这种情况,您可以在函数内通过Cursor尝试获取Shard的数据,如果获取不到数据说明是一次空触发,可以在函数内做忽略处理。更多信息,请参见自定义函数开发指南。
函数计算触发器任务触发机制是时间触发。例如:您设置的函数计算触发器任务触发间隔为60秒,Logstore的Shard0一直有数据写入,那么Shard0每60秒就会触发一次函数执行(如果Shard没有新的数据写入则不会触发函数执行),函数执行的输入为最近60秒的Cursor区间。在函数内,可以根据Cursor读取Shard0数据进行下一步处理。
步骤一:创建SLS触发器
登录函数计算控制台,在左侧导航栏,单击函数。
在顶部菜单栏,选择地域,然后在函数页面,单击目标函数。
在函数详情页面,选择配置页签,在左侧导航栏,单击触发器,然后单击创建触发器。
在创建触发器面板,填写相关信息,然后单击确定。
配置项
操作
本文示例
触发器类型
选择日志服务 SLS。
日志服务SLS
名称
填写自定义的触发器名称。
log_trigger
版本或别名
默认值为LATEST,如果您需要创建其他版本或别名的触发器,需先在函数详情页的右上角切换到该版本或别名。关于版本和别名的简介,请参见版本管理和别名管理。
LATEST
日志项目
选择已创建的日志项目。
aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457****
日志库
选择已创建的日志库,当前触发器会定时从该日志库中订阅数据到函数服务进行自定义加工。
function-log
触发间隔
填写日志服务触发函数运行的时间间隔。
取值范围:[3,600],单位:秒。默认值:60。
60
重试次数
填写单次触发允许的最大重试次数。
取值范围:[0,100]。默认值:3。
说明执行成功的情况为status=200并且header中参数
X-Fc-Error-Type
的值不是UnhandledInvocationError
和HandledInvocationError
的错误。其他情况表示执行失败,会触发重试。关于参数X-Fc-Error-Type
请参见返回数据。如果函数执行失败,会一直重试当前请求,直到函数执行成功。首先会按照配置的重试次数进行重试,超过最大重试次数仍然无法成功的,会增加时间间隔进入退避重试。
3
触发器日志
选择已创建的日志库,日志服务触发函数执行过程的日志会记录到该日志库中。
function-log2
调用参数
如果您想传入自定义参数,可以在此处配置。该参数将作为event的parameter参数传入函数。该参数取值必须是JSON格式的字符串。
默认值为空。
无
角色名称
选择AliyunLogETLRole。
说明如果您第一次创建该类型的触发器,则需要在单击确定后,在弹出的对话框中选择立即授权。
AliyunLogETLRole
创建完成后,在触发器名称列表中显示已创建的触发器。如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理。
步骤二:配置函数的入口参数
在函数详情页面的代码页签,单击测试函数右侧的图标,从下拉列表中,选择配置测试参数。
在配置测试参数面板,选择创建新测试事件或编辑已有测试事件,填写事件名称和事件内容,然后单击确定。
event是函数计算的入口参数。具体格式如下:
{ "parameter": {}, "source": { "endpoint": "http://cn-hangzhou-intranet.log.aliyuncs.com", "projectName": "aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457****", "logstoreName": "function-log", "shardId": 0, "beginCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2Mw==", "endCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2NA==" }, "jobName": "1f7043ced683de1a4e3d8d70b5a412843d81****", "taskId": "c2691505-38da-4d1b-998a-f1d4bb8c****", "cursorTime": 1529486425 }
参数
描述
本文示例
parameter
您配置触发器时填写的调用参数的值。
无。
source
设置函数读取的日志块信息。
endpoint:日志服务Project所属的阿里云地域。
projectName:日志服务Project名称。
logstoreName:Logstore名称。
shardId:Logstore中一个确定的Shard。
beginCursor:开始消费数据的位置。
endCursor:停止消费数据的位置。
{ "endpoint": "http://cn-hangzhou-intranet.log.aliyuncs.com", "projectName": "aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457****", "logstoreName": "function-log", "shardId": 0, "beginCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2Mw==", "endCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2NA==" }
jobName
日志服务ETL Job名字,函数配置的SLS触发器对应一个日志服务的ETL Job。
1f7043ced683de1a4e3d8d70b5a412843d81****
taskId
对于ETL Job而言,taskId是一个确定性的函数调用标识。
c2691505-38da-4d1b-998a-f1d4bb8c****
cursorTime
最后一条日志到达日志服务端的Unix时间戳,单位:秒。
1529486425
步骤三:编写函数并测试
完成创建日志触发器后,您可以编写函数代码并测试以验证代码的正确性。在实际操作过程中,当日志服务收集增量日志时触发该函数,函数计算获取对应日志,然后打印收集的日志。
在函数详情页面的代码页签,在代码编辑器中编写代码,然后单击部署代码。
本文以Python函数代码为例。以下示例代码可以作为提取大部分逻辑日志的模板。其中
accessKeyId
和accessKey
可以从context
和creds
中获取。""" 本代码样例主要实现以下功能: * 从 event 中解析出 SLS 事件触发相关信息 * 根据以上获取的信息,初始化 SLS 客户端 * 从源 log store 获取实时日志数据 This sample code is mainly doing the following things: * Get SLS processing related information from event * Initiate SLS client * Pull logs from source log store """ #!/usr/bin/env python # -*- coding: utf-8 -*- import logging import json import os from aliyun.log import LogClient logger = logging.getLogger() def handler(event, context): # 可以通过 context.credentials 获取密钥信息 # Access keys can be fetched through context.credentials print("The content in context entity is: \n") print(context) creds = context.credentials access_key_id = creds.access_key_id access_key_secret = creds.access_key_secret security_token = creds.security_token # 解析 event 参数至 object 格式 # parse event in object event_obj = json.loads(event.decode()) print("The content in event entity is: \n") print(event_obj) # 从 event.source 中获取日志项目名称、日志仓库名称、日志服务访问 endpoint、日志起始游标、日志终点游标以及分区 id # Get the name of log project, the name of log store, the endpoint of sls, begin cursor, end cursor and shardId from event.source source = event_obj['source'] log_project = source['projectName'] log_store = source['logstoreName'] endpoint = source['endpoint'] begin_cursor = source['beginCursor'] end_cursor = source['endCursor'] shard_id = source['shardId'] # 初始化 sls 客户端 # Initialize client of sls client = LogClient(endpoint=endpoint, accessKeyId=access_key_id, accessKey=access_key_secret, securityToken=security_token) # 基于日志的游标从源日志库中读取日志,本示例中的游标范围包含了触发本次执行的所有日志内容 # Read data from source logstore within cursor: [begin_cursor, end_cursor) in the example, which contains all the logs trigger the invocation while True: response = client.pull_logs(project_name=log_project, logstore_name=log_store, shard_id=shard_id, cursor=begin_cursor, count=100, end_cursor=end_cursor, compress=False) log_group_cnt = response.get_loggroup_count() if log_group_cnt == 0: break logger.info("get %d log group from %s" % (log_group_cnt, log_store)) logger.info(response.get_loggroup_list()) begin_cursor = response.get_next_cursor() return 'success'
单击测试函数。
执行完成后,您可以在函数代码页签的上方查看执行结果。
相关操作
常见问题
当您创建触发器后但未触发函数执行,如何解决?
您可以从以下两个方面排查。
确认函数计算触发器任务配置的Logstore是否有数据增量修改,当Shard数据有变化时会触发函数执行。
查看触发器日志、函数运行日志查看是否有异常。
为什么函数触发频次有时会高于预期的触发频次?
每个Shard是单独触发的,您看到的可能是一个Logstore整体触发次数很多,但每个Shard实时触发时间是符合间隔的。
单个Shard的触发间隔和每次处理的数据范围相同(时间区间)。触发间隔在函数执行时分如下两种情况,假设触发间隔为60秒。
触发没有延迟:按照设定周期触发,每60秒触发一次,处理的数据范围为
[now -60s, now)
。说明函数触发是分Shard独立进行的, 假设Logstore有10个Shard,在实时处理数据时(触发无延迟),每60秒对应10次函数触发执行。
触发发生延迟(当前处理到的日志服务Shard位置落后于最新写入数据超过10秒):触发器会进行追赶,可能缩短到2秒触发一次,每次处理的数据范围仍是60秒窗口。