通过函数计算访问表格存储,对表格存储增量数据进行实时计算。

背景信息

函数计算(Function Compute)是一个事件驱动的服务,通过函数计算,无需管理服务器等运行情况,只需编写代码并上传。函数计算准备计算资源,并以弹性伸缩的方式运行用户代码,而您只需根据实际代码运行所消耗的资源进行付费。更多信息,请参见什么是函数计算函数计算示例请参见表格存储函数计算示例

Tablestore Stream是用于获取Tablestore表中增量数据的一个数据通道,通过创建Tablestore触发器,能够实现Stream和函数计算的自动对接,让计算函数中自定义的程序逻辑自动处理Tablestore表中发生的数据修改。关于Tablestore触发器的更多信息,请参见Tablestore触发器

使用场景

如下图所示,使用函数计算可以实现如下任务。

  • 数据同步:同步表格存储中的实时数据到数据缓存、搜索引擎或者其他数据库实例中。
  • 数据归档:增量归档表格存储中的数据到OSS等做冷备份。
  • 事件驱动:利用触发器触发函数调用IoT套件、云应用API或者做消息通知等。
fig_fuc001

准备工作

注意事项

请确保表格存储数据表与函数计算服务处于同一地域。

步骤一:为数据表开启Stream功能

使用触发器功能需要先在表格存储控制台开启数据表的Stream功能,才能在函数计算中处理写入表格存储中的增量数据。

  1. 登录表格存储控制台
  2. 概览页面,单击实例名称或在操作列单击实例管理
  3. 实例详情页签的数据表列表区域,单击数据表名称后选择实时消费通道页签或单击fig_001后选择实时消费通道
  4. 实时消费通道页签,单击Stream信息对应的开启
  5. 开启Stream功能对话框,设置日志过期时长,单击开启

    日志过期时长取值为非零整数,单位为小时,最长时长为168小时。

    说明 日志过期时长设置后不能修改,请谨慎设置。

步骤二:配置Tablestore触发器

在函数计算控制台创建Tablestore触发器来处理Tablestore数据表的实时数据流。

  1. 创建函数计算的服务。
    1. 登录函数计算控制台
    2. 在左侧导航栏,单击服务及函数
    3. 在顶部菜单栏,选择地域。
    4. 服务列表页面,单击创建服务
    5. 创建服务面板,填写服务名称和描述,按需设置日志与链路追踪功能。
      关于参数说明的更多信息,请参见管理服务
    6. 单击确定
      创建完成后,在服务列表页面,您可以查看到已创建的服务及其配置信息。
  2. 创建函数计算的函数。
    说明 系统支持使用标准Runtime从零创建、使用自定义运行时平滑迁移Web Server、使用容器镜像创建、使用模板创建等方式创建函数。此处以使用标准Runtime从零创建方式为例介绍,其他创建方式,请分别参见使用容器镜像创建函数使用模板创建函数
    1. 服务列表页面,单击目标服务名称。
    2. 函数管理页面,单击创建函数
    3. 创建函数页面,选择创建函数的方式为使用标准Runtime从零创建
    4. 基本设置区域,根据下表说明填写函数基本信息。
      参数 是否必填 说明 本文示例
      函数名称 填写自定义的函数名称。
      说明 如果不填写名称,函数计算会自动为您创建。
      Function
      运行环境 选择您熟悉的语言,例如Python、Java、PHP、Node.js等。函数计算支持的运行环境,请参见管理函数 Python 3.6
      代码上传方式 默认选择使用示例代码创建函数,函数创建完成后自带函数计算平台为其配置的示例代码。您也可以选择以下三种方式上传您自己的代码。
      • 通过ZIP包上传代码:选择函数代码ZIP包。
      • 通过文件夹上传代码:选择包含函数代码的文件夹。
      • 通过OSS上传代码:选择上传函数代码的Bucket名称文件名称
      使用示例代码
      启动命令 程序的启动命令。如果不配置启动命令,您需要在代码的根目录手动创建一个名称为bootstrap的启动脚本,您的程序通过此脚本来启动。
      说明 仅当您选择使用自定义运行时平滑迁移Web Server时,需配置此参数。
      npm run start
      监听端口 您的代码中的HTTP Server所监听的端口。
      说明 仅当您选择使用自定义运行时平滑迁移Web Server时,需配置此参数。
      9000
      请求处理程序类型 选择请求处理程序类型。取值范围如下:
      • 处理事件请求:通过定时器、调用API/SDK或其他阿里云服务的触发器来触发函数执行。
      • 处理HTTP请求:用于处理HTTP请求或Websocekt请求的函数。如果您的使用场景是Web场景,建议您使用自定义运行时平滑迁移Web Server。具体信息,请参见函数计算控制台

      使用表格存储触发器时,此参数必须设置为处理事件请求

      处理事件请求
      实例类型 选择适合您的实例类型。取值范围如下:
      • 弹性实例
      • 性能实例

      更多信息,请参见实例类型及使用模式。关于各种实例类型的计费详情,请参见计费概述

      弹性实例
      内存规格 设置函数执行内存。
      • 选择输入:在下拉列表中选择所需内存。
      • 手动输入:仅适用于弹性实例,单击手动输入内存大小,可自定义函数执行内存。取值范围[128, 3072],单位为MB。
        说明 输入的内存必须为64 MB的倍数。
      512 MB
      实例并发度 设置函数实例的并发度,具体信息,请参见设置单实例并发数 1
      请求处理程序 设置请求处理程序,函数计算的运行时会加载并调用您的请求处理程序处理请求。 index.handler

      函数创建完成后,在函数管理页面,即可查看已创建的函数。

    5. 配置触发器区域,根据下表说明填写触发器相关参数。
      参数 操作 本文示例
      触发器类型 选择表格存储Tablestore
      说明请求处理程序类型选择处理HTTP请求时,此参数默认为HTTP触发器
      表格存储Tablestore
      名称 自定义填写触发器名称。 Tablestore-trigger
      实例 在下拉列表中选择已创建的Tablestore实例。 distribute-test
      表格 在下拉列表中选择已创建的数据表。 source_data
      角色名称 选择AliyunTableStoreStreamNotificationRole
      说明 如果您第一次创建该类型的触发器,则需要单击确定后,在弹出的对话框中选择立即授权,并根据系统提示完成角色创建和授权。
      AliyunTableStoreStreamNotificationRole
    6. 单击创建
      创建好的触发器会自动显示在触发器管理页签。
      说明 您也可以在表格存储控制台中数据表的触发器管理页签,查看和创建Tablestore触发器。

步骤三:验证测试

函数计算支持函数的在线调试功能,您可以构建触发的Event,并测试代码逻辑是否符合期望。

由于Tablestore触发函数服务的Event是CBOR格式,该数据格式是一种类似JSON的二进制格式,可以按照如下方法进行在线调试。

  1. 编写代码。
    1. 函数管理页面,单击函数名称。
    2. 在函数详情页面,单击函数代码页签,在代码编辑器中编写代码。
      说明 此处以Python函数代码为例介绍。如果您想使用其他运行环境,更多代码示例,请参见表格存储触发函数计算示例

      在代码中使用records = json.loads(event)处理自定义的测试触发事件。

      import logging
      import cbor
      import json
       def get_attribute_value(record, column):
           attrs = record[u'Columns']
           for x in attrs:
               if x[u'ColumnName'] == column:
                   return x['Value']
       def get_pk_value(record, column):
           attrs = record[u'PrimaryKey']
           for x in attrs:
               if x['ColumnName'] == column:
                   return x['Value']
       def handler(event, context):
           logger = logging.getLogger()
           logger.info("Begin to handle event")
           #records = cbor.loads(event)
           records = json.loads(event)
           for record in records['Records']:
               logger.info("Handle record: %s", record)
               pk_0 = get_pk_value(record, "pk_0")
               attr_0 = get_attribute_value(record, "attr_0")
           return 'OK'
  2. 验证测试。
    1. 函数代码页签,单击测试函数右侧的fig_20220419_downlist图标后,从下拉列表中,选择配置测试参数
    2. 配置测试参数对话框,选择创建新测试事件编辑已有测试事件页签,选择事件模板Tablestore,并填写事件名称和事件内容。单击确定
      关于事件内容数据格式的更多信息,请参见附录:数据处理
    3. 单击测试函数,查看是否符合期望。
      当使用records=json.loads(event)测试OK后,可以修改为records = cbor.loads(event)并单击部署代码,当Tablestore中有数据写入时,相关的函数逻辑就会触发。

附录:数据处理

配置测试参数时,需要按照特定的数据格式设置事件内容。

  • 数据格式

    Tablestore触发器使用CBOR格式对增量数据进行编码构成函数计算的Event。增量数据的具体数据格式如下:

    {
        "Version": "string",
        "Records": [
            {
                "Type": "string",
                "Info": {
                    "Timestamp": int64
                },
                "PrimaryKey": [
                    {
                        "ColumnName": "string",
                        "Value": formated_value
                    }
                ],
                "Columns": [
                    {
                        "Type": "string",
                        "ColumnName": "string",
                        "Value": formated_value,
                        "Timestamp": int64
                    }
                ]
            }
        ]
    }
  • 成员定义

    成员定义请参见下表。

    参数 说明
    Version payload版本号,目前为Sync-v1。类型为string。
    Records 数据表中的增量数据行数组。包含如下内部成员:
    • Type:数据行类型,包含PutRow、UpdateRow和DeleteRow。类型为string。
    • Info:包含Timestamp内部成员。Timestamp表示该行的最后修改UTC时间。类型为int64。
    PrimaryKey 主键列数组。包含如下内部成员:
    • ColumnName:主键列名称。类型为string。
    • Value:主键列内容。类型为formated_value,支持integer、string和blob。
    Columns 属性列数组。包括如下内部成员:
    • Type:属性列类型,包含Put、DeleteOneVersion和DeleteAllVersions。类型为string。
    • ColumnName:属性列名称。类型为string。
    • Value:属性列内容。类型为formated_value,支持integer、boolean、double、string和blob。
    • Timestamp:属性列最后修改UTC时间。类型为int64。
  • 数据示例
    {
        "Version": "Sync-v1",
        "Records": [
            {
                "Type": "PutRow",
                "Info": {
                    "Timestamp": 1506416585740836
                },
                "PrimaryKey": [
                    {
                        "ColumnName": "pk_0",
                        "Value": 1506416585881590900
                    },
                    {
                        "ColumnName": "pk_1",
                        "Value": "2017-09-26 17:03:05.8815909 +0800 CST"
                    },
                    {
                        "ColumnName": "pk_2",
                        "Value": 1506416585741000
                    }
                ],
                "Columns": [
                    {
                        "Type": "Put",
                        "ColumnName": "attr_0",
                        "Value": "hello_table_store",
                        "Timestamp": 1506416585741
                    },
                    {
                        "Type": "Put",
                        "ColumnName": "attr_1",
                        "Value": 1506416585881590900,
                        "Timestamp": 1506416585741
                    }
                ]
            }
        ]
    }

附录:使用已有函数计算创建Tablestore触发器

在表格存储控制台,使用已有函数计算的服务以及服务下的函数创建Tablestore触发器。

  1. 登录表格存储控制台
  2. 概览页面,单击实例名称或在操作列单击实例管理
  3. 实例详情页签的数据表列表区域,单击数据表名称。
  4. 触发器管理页签,单击使用已有函数计算
  5. 创建触发器对话框,选择函数计算服务以及服务下的函数,并填写触发器名称,单击确定