您可以通过创建日志服务SLS触发器(以下简称SLS触发器)将日志服务SLS与函数计算连接起来,当有新日志产生时触发函数执行,对日志进行处理。本文以日志服务SLS调用函数计算,函数计算获取日志并打印为例,介绍如何在函数计算控制台上完成SLS触发器的配置,包括创建触发器、编写函数和测试函数。

示例场景

您可以配置一个SLS触发器,该触发器将定时获取更新的数据并触发函数执行,增量消费日志服务Logstore中的数据,在函数里完成自定义加工任务(例如,数据清洗和加工)以及将数据投递给第三方服务。本示例中只演示如何获取日志数据并打印。

说明 用于数据加工的函数可以是日志服务提供的模板,也可以是您的自定义函数。

前提条件

创建触发器

  1. 登录函数计算控制台
  2. 在左侧导航栏,单击服务及函数
  3. 在顶部菜单栏,选择地域。
  4. 服务列表页面,找到目标服务,在其右侧操作列单击函数管理
  5. 函数管理页面,单击目标函数名称。
    hanshuguanli
  6. 在函数详情页面,单击触发器管理页签,从版本或别名下拉列表选择要创建触发器的版本或别名,然后单击创建触发器
    创建触发器
  7. 在创建触发器面板,填写相关信息。然后单击确定
    创建触发器面板-sls
    参数 操作 本文示例
    触发器类型 选择日志服务 SLS 日志服务SLS
    名称 填写自定义的触发器名称。 log-trigger
    版本或别名 默认值为LATEST,如果您需要创建其他版本或别名的触发器,需先在函数详情页的右上角切换到该版本或别名。关于版本简介,请参见版本简介 LATEST
    日志项目 选择已创建的日志项目。 aliyun-fc-cn-chengdu-2238f0df-a742-524f-9f90-976ba457****
    日志仓库 选择已创建的日志仓库,当前触发器会定时从该日志仓库中订阅数据到函数服务进行自定义加工。 function-log
    触发间隔 填写日志服务触发函数运行的时间间隔。

    取值范围:[3,600],单位:秒。默认值:60。

    60
    重试次数 填写单次触发允许的最大重试次数。

    取值范围:[0,100]。默认值:3。

    3
    触发器日志 选择已创建的日志仓库,日志服务触发函数执行过程的日志会记录到该日志仓库中。 function-log2
    调用参数 如果您想传入自定义参数,可以在此处配置。该参数将作为event的parameter参数传入函数。该参数取值必须是JSON格式的字符串。

    默认值为空。

    角色名称 选择AliyunLogETLRole
    说明 如果您第一次创建该类型的触发器,则需要在单击确定后,在弹出的对话框中选择立即授权
    AliyunLogETLRole

    创建完成后,在触发器名称列表中显示已创建的触发器。

    log-trigger

编写函数

完成日志触发器创建后,您可以开始编写函数代码。日志服务收集增量日志时触发该函数,函数计算获取对应日志,然后打印收集的日志。

在函数详情页面,单击函数代码页签,在代码编辑器中编写代码。
本文以Python函数代码为例。以下示例代码可以作为提取大部分逻辑日志的模板。其中accessKeyIdaccessKey可以填写在代码内,也可以从contextcreds中获取。
# -*- coding: utf-8 -*-
import logging
import json
from aliyun.log import LogClient
from time import time

def logClient(endpoint, creds):
  logger = logging.getLogger()
  logger.info('creds info')
  logger.info(creds.access_key_id)
  logger.info(creds.access_key_secret)
  logger.info(creds.security_token)
  accessKeyId = 'your accessKeyId'
  accessKey = 'your accessKeyId scr'
  client = LogClient(endpoint, accessKeyId, accessKey)
  return client

def handler(event, context):
  logger = logging.getLogger()
  logger.info('start deal SLS data')
  logger.info(event.decode().encode())
  info_arr = json.loads(event.decode())
  fetchdata(info_arr['source'],context)
  return 'hello world'

 def fetchdata(event,context):
  logger = logging.getLogger()
  endpoint = event['endpoint']
  creds = context.credentials
  client = logClient(endpoint, creds)
  if client == None :
      logger.info("client create failed")
      return False
  project = event['projectName']
  logstore = event['logstoreName']
  start_cursor = event['beginCursor']
  end_cursor = event['endCursor']
  loggroup_count = 10
  shard_id = event['shardId']
  while True:
      res = client.pull_logs(project, logstore, shard_id, start_cursor, loggroup_count, end_cursor)
      res.log_print()
      next_cursor = res.get_next_cursor()
      if next_cursor == start_cursor :
          break
      start_cursor = next_cursor

    #log_data =  res.get_loggroup_json_list()
  return True                        

测试函数

完成函数编写后,您需要调试函数以验证代码的正确性。在实际操作过程中当日志仓库中有新增日志进入时会自动触发函数的执行。

  1. 在函数详情页面,单击函数代码页签,然后单击xialatubiao图标,从下拉列表中,选择配置测试参数
  2. 配置测试参数对话框,选择创建新测试事件编辑已有测试事件页签,填写事件名称和事件内容。然后单击确定
    event是函数计算的入口参数。具体格式如下:
    {
        "parameter": {},
        "source": {
            "endpoint": "http://cn-shanghai-intranet.log.aliyuncs.com",
            "projectName": "log-com",
            "logstoreName": "log-en",
            "shardId": 0,
            "beginCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2Mw==",
            "endCursor": "MTUyOTQ4MDIwOTY1NTk3ODQ2NA=="
        },
        "jobName": "1f7043ced683de1a4e3d8d70b5a412843d81****",
        "taskId": "c2691505-38da-4d1b-998a-f1d4bb8c****",
        "cursorTime": 1529486425
    }                       
    参数 描述 本文示例
    parameter 您配置触发器时填写的调用参数的值。 无。
    source 设置函数读取的日志块信息。
    • endpoint:日志服务Project所属的阿里云地域。
    • projectName:日志服务Project名称。
    • logstoreName:Logstore名称。
    • shardId:Logstore中一个确定的Shard。
    • beginCursor:开始消费数据的位置。
    • endCursor:停止消费数据的位置。
    {
        "endpoint":"http://cn-shanghai-intranet.log.aliyuncs.com",
        "projectName":"log-com",
        "logstoreName":"log-en",
        "shardId":0,
        "beginCursor":"MTUyOTQ4MDIwOTY1NTk3ODQ2Mw==",
        "endCursor":"MTUyOTQ4MDIwOTY1NTk3ODQ2NA=="
    }
    jobName 日志服务ETL Job名字,函数配置的SLS触发器对应一个日志服务的ETL Job。 1f7043ced683de1a4e3d8d70b5a412843d81****
    taskId 对于ETL Job而言,taskId是一个确定性的函数调用标识。 c2691505-38da-4d1b-998a-f1d4bb8c****
    cursorTime 最后一条日志到达日志服务端的unix_timestamp。 1529486425
  3. 在函数详情页面,单击函数代码页签,然后单击测试函数

验证结果

函数代码页签,您可以看到执行成功的指示。

result

更多信息

除了函数计算控制台,您还可通过以下方式配置触发器:
  • 通过Serverless Devs工具配置触发器。更多操作,请参见Serverless Devs
  • 通过SDK配置触发器。更多操作,请参见SDK列表

如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理