配置SLS触发器
您可以通过创建日志服务SLS触发器(以下简称SLS触发器)将日志服务SLS与函数计算连接起来,当有新日志产生时触发函数执行,对日志进行处理。本文以日志服务SLS调用函数计算,函数计算获取日志并打印为例,介绍如何在函数计算控制台为目标函数配置SLS触发器,包括创建触发器、配置入口参数以及编写函数代码并测试。
示例场景
您可以配置一个SLS触发器,该触发器将定时获取更新的数据并触发函数执行,增量消费日志服务Logstore中的数据,在函数里完成自定义加工任务(例如数据清洗和加工)以及将数据投递给第三方服务。本示例中只演示如何获取日志数据并打印。
用于数据加工的函数可以是日志服务提供的模板,也可以是您的自定义函数。
前提条件
函数计算
- 说明
在创建服务时,请配置好服务角色,函数会获得该角色所拥有的权限,否则在测试函数代码时会报错。本文示例配置的服务角色为
AliyunFCDefaultRole
,并在权限策略中增加AliyunLogReadOnlyAccess
。关于服务角色的信息,请参见授予函数计算访问其他云服务的权限。
日志服务SLS
说明需要创建一个日志项目和两个日志库。一个日志库用于处理日志及数据源,另一个日志库用于存储函数计算产生的日志。
步骤一:创建SLS触发器
- 登录函数计算控制台。
- 在左侧导航栏,单击服务及函数。
- 在顶部菜单栏,选择地域。
在服务列表页面,找到目标服务,在其右侧操作列单击函数管理。
在函数管理页面,单击目标函数名称。
在函数详情页面,单击触发器管理页签,从版本或别名下拉列表选择要创建触发器的版本或别名,然后单击创建触发器。
在创建触发器面板,填写相关信息。然后单击确定。
配置项
操作
本文示例
触发器类型
选择日志服务 SLS。
日志服务SLS
名称
填写自定义的触发器名称。
log_trigger
版本或别名
默认值为LATEST,如果您需要创建其他版本或别名的触发器,需先在函数详情页的右上角切换到该版本或别名。关于版本和别名的简介,请参见管理版本和管理别名。
LATEST
日志项目
选择已创建的日志项目。
aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457****
日志库
选择已创建的日志库,当前触发器会定时从该日志库中订阅数据到函数服务进行自定义加工。
function-log
触发间隔
填写日志服务触发函数运行的时间间隔。
取值范围:[3,600],单位:秒。默认值:60。
60
重试次数
填写单次触发允许的最大重试次数。
取值范围:[0,100]。默认值:3。
3
触发器日志
选择已创建的日志库,日志服务触发函数执行过程的日志会记录到该日志库中。
function-log2
调用参数
如果您想传入自定义参数,可以在此处配置。该参数将作为event的parameter参数传入函数。该参数取值必须是JSON格式的字符串。
默认值为空。
无
角色名称
选择AliyunLogETLRole。
说明如果您第一次创建该类型的触发器,则需要在单击确定后,在弹出的对话框中选择立即授权。
AliyunLogETLRole
创建完成后,在触发器名称列表中显示已创建的触发器。如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理。
步骤二:配置函数的入口参数
在函数详情页面,单击函数代码页签,然后单击测试函数右侧
图标,从下拉列表中,选择配置测试参数。
在配置测试参数面板,选择创建新测试事件或编辑已有测试事件页签,填写事件名称和事件内容。然后单击确定。
event是函数计算的入口参数。具体格式如下:
{ "parameter": {}, "source": { "endpoint": "http://cn-hangzhou-intranet.log.aliyuncs.com", "projectName": "aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457****", "logstoreName": "function-log", "shardId": 0, "beginCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2Mw==", "endCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2NA==" }, "jobName": "1f7043ced683de1a4e3d8d70b5a412843d81****", "taskId": "c2691505-38da-4d1b-998a-f1d4bb8c****", "cursorTime": 1529486425 }
参数
描述
本文示例
parameter
您配置触发器时填写的调用参数的值。
无。
source
设置函数读取的日志块信息。
endpoint:日志服务Project所属的阿里云地域。
projectName:日志服务Project名称。
logstoreName:Logstore名称。
shardId:Logstore中一个确定的Shard。
beginCursor:开始消费数据的位置。
endCursor:停止消费数据的位置。
{ "endpoint": "http://cn-hangzhou-intranet.log.aliyuncs.com", "projectName": "aliyun-fc-cn-hangzhou-2238f0df-a742-524f-9f90-976ba457****", "logstoreName": "function-log", "shardId": 0, "beginCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2Mw==", "endCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2NA==" }
jobName
日志服务ETL Job名字,函数配置的SLS触发器对应一个日志服务的ETL Job。
1f7043ced683de1a4e3d8d70b5a412843d81****
taskId
对于ETL Job而言,taskId是一个确定性的函数调用标识。
c2691505-38da-4d1b-998a-f1d4bb8c****
cursorTime
最后一条日志到达日志服务端的Unix时间戳,单位:秒。
1529486425
步骤三:编写函数并测试
完成创建日志触发器后,您可以编写函数代码并测试以验证代码的正确性。在实际操作过程中,当日志服务收集增量日志时触发该函数,函数计算获取对应日志,然后打印收集的日志。
在函数详情页面,单击函数代码页签,在代码编辑器中编写代码,然后单击部署代码。
本文以Python函数代码为例。以下示例代码可以作为提取大部分逻辑日志的模板。其中
accessKeyId
和accessKey
可以从context
和creds
中获取。# -*- coding: utf-8 -*- import logging import json from aliyun.log import LogClient from time import time # 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。 # 建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。 # 本示例以从context和creds中获取AccessKeyId/AccessKey为例。 def logClient(endpoint, creds): logger = logging.getLogger() logger.info('creds info') logger.info(creds.access_key_id) logger.info(creds.access_key_secret) logger.info(creds.security_token) accessKeyId = creds.access_key_id accessKey = creds.access_key_secret client = LogClient(endpoint, accessKeyId, accessKey) return client def handler(event, context): logger = logging.getLogger() logger.info('start deal SLS data') logger.info(event.decode().encode()) info_arr = json.loads(event.decode()) fetchdata(info_arr['source'],context) return 'hello world' def fetchdata(event,context): logger = logging.getLogger() endpoint = event['endpoint'] creds = context.credentials client = logClient(endpoint, creds) if client == None : logger.info("client create failed") return False project = event['projectName'] logstore = event['logstoreName'] start_cursor = event['beginCursor'] end_cursor = event['endCursor'] loggroup_count = 10 shard_id = event['shardId'] while True: res = client.pull_logs(project, logstore, shard_id, start_cursor, loggroup_count, end_cursor) res.log_print() next_cursor = res.get_next_cursor() if next_cursor == start_cursor : break start_cursor = next_cursor #log_data = res.get_loggroup_json_list() return True
单击函数代码页签的测试函数。
执行完成后,您可以在函数代码页签的上方查看执行结果。
更多信息
除了函数计算控制台,您还可通过以下方式配置触发器:
通过Serverless Devs工具配置触发器。更多操作,请参见Serverless Devs。
通过SDK配置触发器。更多操作,请参见SDK列表。
如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理。