Tablestore触发器

阿里云表格存储Tablestore是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务。您可以通过创建Tablestore触发器,将Tablestore作为事件源接入函数计算,当Tablestore中的数据更新时会自动触发函数执行,从而完成对Tablestore变更数据的自定义处理。

使用场景

Tablestore触发器典型的使用场景如下图所示。

image

原始信号源数据存储到原始Table A,当Table A中的数据发生变更时会触发函数自定义清洗数据,将清洗后的数据存入Table B,您可以直接读取Table B的数据统计展示,完成一个弹性可伸缩的Serverless Web应用。

前提条件

使用限制

  • 目前支持Tablestore触发器的地域为:华北2(北京)、华东1(杭州)、华东2(上海)、华南1(深圳)、日本(东京)、新加坡、澳大利亚(悉尼)关停中、德国(法兰克福)、中国香港。

  • 表格存储数据表与函数计算服务要处于同一地域。

  • 若您需要使用内网访问Tablestore触发器对应的函数,可以使用VPC Endpoint:{instance}.{region}.vpc.tablestore.aliyuncs.com,不可以使用Tablestore内网Endpoint。详细信息,请参见开启VPC功能对可用性的影响

  • 触发的函数执行时间不能超过一分钟。

注意事项

  • 编写函数的时候,请注意不要出现以下逻辑:表格存储Table A触发函数B,函数B又更新Table A的数据,从而造成函数无限循环调用。

  • 若函数执行出现异常,函数将无限重试直到Tablestore中的日志数据过期。

步骤一:为数据表开启Stream功能

使用触发器功能需要先在表格存储控制台开启数据表的Stream功能,才能在函数计算中处理写入表格存储中的增量数据。

  1. 登录表格存储控制台

  2. 在页面上方,选择地域。

  3. 概览页面,单击实例名称或在实例操作列单击实例管理

  4. 实例详情页签的数据表列表页签,单击数据表名称后选择实时消费通道页签或单击fig_001后选择实时消费通道

  5. 实时消费通道页签,单击Stream信息对应的开启

  6. 开启Stream功能对话框,设置日志过期时长,单击开启

    日志过期时长取值为非零整数,单位为小时,最长时长为168小时。

    重要

    日志过期时长设置后不能修改,请谨慎设置。

步骤二:创建Tablestore触发器

  1. 登录函数计算控制台

  2. 在左侧导航栏,单击服务及函数
  3. 在顶部菜单栏,选择地域。

  4. 服务列表页面,找到目标服务,在其右侧操作列单击函数管理

  5. 函数管理页面,单击目标函数名称。

  6. 在函数详情页面,单击触发器管理页签,从版本或别名下拉列表选择要创建触发器的版本或别名,然后单击创建触发器

  7. 创建触发器面板,填写相关信息,然后单击确定

    参数

    操作

    本文示例

    触发器类型

    选择表格存储 Tablestore

    表格存储Tablestore

    名称

    自定义填写触发器名称。

    Tablestore-trigger

    版本或别名

    默认值为LATEST,如果您需要创建其他版本或别名的触发器,需先在函数详情页的版本或别名下拉列表选择该版本或别名。关于版本和别名的简介,请参见管理版本管理别名

    LATEST

    实例

    在列表中选择已创建的Tablestore实例。

    d00dd8xm****

    表格

    在列表中选择已创建的表格。

    mytable

    角色名称

    选择AliyunTableStoreStreamNotificationRole

    说明

    如果您第一次创建该类型的触发器,则需要在单击确定后,在弹出的对话框中选择立即授权

    AliyunTableStoreStreamNotificationRole

    创建完成后,在触发器名称列表中显示已创建的触发器。如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理

步骤三:配置函数的入口参数

  1. 在函数详情页面,单击函数代码页签,然后单击测试函数右侧xialatubiao图标,从下拉列表中,选择配置测试参数

  2. 配置测试参数面板,选择创建新测试事件编辑已有测试事件页签,填写事件名称和事件内容。然后单击确定

    表格存储触发器使用CBOR格式对增量数据进行编码,构成函数计算的event。具体格式如下所示。

    {
        "Version": "Sync-v1",
        "Records": [
            {
                "Type": "PutRow",
                "Info": {
                    "Timestamp": 1506416585740836
                },
                "PrimaryKey": [
                    {
                        "ColumnName": "pk_0",
                        "Value": 1506416585881590900
                    },
                    {
                        "ColumnName": "pk_1",
                        "Value": "2017-09-26 17:03:05.8815909 +0800 CST"
                    },
                    {
                        "ColumnName": "pk_2",
                        "Value": 1506416585741000
                    }
                ],
                "Columns": [
                    {
                        "Type": "Put",
                        "ColumnName": "attr_0",
                        "Value": "hello_table_store",
                        "Timestamp": 1506416585741
                    },
                    {
                        "Type": "Put",
                        "ColumnName": "attr_1",
                        "Value": 1506416585881590900,
                        "Timestamp": 1506416585741
                    }
                ]
            }
        ]
    }

    event参数中不同属性字段的解释如下表所示。

    参数

    描述

    Version

    Payload版本号。示例如Sync-v1。类型为String。

    Records

    数据表中的增量数据行数组。包含如下内部成员:

    • Type:数据行类型,包含PutRow、UpdateRow和DeleteRow。类型为String。

    • Info:包含Timestamp内部成员。Timestamp表示该行的最后修改UTC时间。类型为Int64。

    PrimaryKey

    主键列数组。包含如下内部成员:

    • ColumnName:主键列名称。类型为String。

    • Value:主键列内容。类型为formated_value,支持Integer、String和Blob。

    Columns

    属性列数组。包括如下内部成员:

    • Type:属性列类型,包含Put、DeleteOneVersion和DeleteAllVersions。类型为String。

    • ColumnName:属性列名称。类型为String。

    • Value:属性列内容。类型为formated_value,支持Integer、Boolean、Double、String和Blob。

    • Timestamp:属性列最后修改UTC时间。类型为Int64。

步骤四:编写函数代码并测试

完成Tablestore触发器创建后,您可以开始编写函数代码并测试,以验证代码的正确性。在实际操作过程中当Tablestore中有数据更新时会自动触发函数执行。

  1. 在函数详情页面,单击函数代码页签,在代码编辑器中编写代码,然后单击部署代码

    本文以Python函数代码为例。如果您想使用其他运行环境,更多代码示例,请参见表格存储触发函数计算示例

    import logging
    import cbor
    import json
     def get_attribute_value(record, column):
         attrs = record[u'Columns']
         for x in attrs:
             if x[u'ColumnName'] == column:
                 return x['Value']
     def get_pk_value(record, column):
         attrs = record[u'PrimaryKey']
         for x in attrs:
             if x['ColumnName'] == column:
                 return x['Value']
     def handler(event, context):
         logger = logging.getLogger()
         logger.info("Begin to handle event")
         #records = cbor.loads(event)
         records = json.loads(event)
         for record in records['Records']:
             logger.info("Handle record: %s", record)
             pk_0 = get_pk_value(record, "pk_0")
             attr_0 = get_attribute_value(record, "attr_0")
         return 'OK'
  2. 单击函数代码页签的测试函数

    执行完成后,您可以在函数代码页签的上方查看执行结果。

常见问题

  • 如果您无法在某一地域创建Tablestore触发器,请确认支持创建Tablestore触发器的地域,具体请查看使用限制

  • 如果您在创建Tablestore触发器时无法找到已经创建好的表格存储数据表,请确认表格存储数据表与函数计算服务是否处于同一地域。

  • 使用Tablestore触发器时,总是会报客户端取消的报错,一般是由于客户端调用函数时设置的超时时间小于函数执行时间。建议您将客户端超时时间调大,具体请参见客户端断开连接,报错Invocation canceled by client怎么办?

  • 如果Tablestore数据表中有新增的数据,但是Tablestore触发器没有被触发,您可以从以下方面进行排查。关于触发器不能正常触发的详细排查方案可参见触发器不能正常触发函数执行怎么办?

    • 确认数据表是否开启了Stream功能,具体请参见步骤一:为数据表开启Stream功能

    • 确认在创建触发器时配置的角色是否正确,您可以使用默认的触发器角色AliyunTableStoreStreamNotificationRole,具体请参见步骤二:创建Tablestore触发器

    • 查看是否有函数运行日志,可以根据日志确认是否是函数执行失败。函数执行失败后,会一直重试直到Tablestore中的日志数据过期。