使用函数计算可以对表格存储增量数据进行实时计算。

背景信息

函数计算(Function Compute)是一个事件驱动的服务,通过函数计算,无需管理服务器等运行情况,只需编写代码并上传。函数计算准备计算资源,并以弹性伸缩的方式运行用户代码,而您只需根据实际代码运行所消耗的资源进行付费。详细信息参见函数计算帮助文档。函数计算示例请参见表格存储函数计算示例

Tablestore Stream是用于获取Tablestore表中增量数据的一个数据通道,通过创建Tablestore触发器,能够实现Stream和函数计算的自动对接,让计算函数中自定义的程序逻辑自动处理Tablestore表中发生的数据修改。

使用场景

如下图所示,使用函数计算可以实现如下任务。

  • 数据同步:同步表格存储中的实时数据到数据缓存、搜索引擎或者其他数据库实例中。
  • 数据归档:增量归档表格存储中的数据到OSS等做冷备份。
  • 事件驱动:利用触发器触发函数调用IOT套件、云应用API或者做消息通知等。
fig_fuc001

配置Tablestore触发器

通过控制台创建Tablestore触发器来处理Tablestore数据表的实时数据流。

  1. 创建开通了Stream流的数据表。
    1. 在表格存储控制台创建实例
    2. 在该实例下创建数据表,并且开启数据表的Stream功能。
  2. 创建函数计算的函数。
    1. 登录函数计算控制台
    2. 在左侧导航栏,单击服务 / 函数
    3. 服务 / 函数页面,单击新建函数
    4. 新建函数页面,选择事件函数,单击下一步
    5. 根据实际需要配置函数信息,并单击完成
      fig_newfunction
  3. 配置服务。
    1. 在左侧导航栏,单击服务 / 函数
    2. 服务 / 函数页面,单击服务配置
    3. 服务配置页签,可配置服务的角色,用于授权函数进行日志收集,以及在计算函数中继续访问用户的其他资源。详情请参见函数计算权限模型
  4. 创建和测试Tablestore触发器。
    说明 首次通过表格存储控制台使用函数计算时,请使用老版控制台授予表格存储发送事件通知的权限。
    1. 在表格存储控制台数据表的触发器管理页签,单击使用已有函数计算
    2. 创建触发器对话框,选择函数计算服务和函数计算函数,输入触发器名称。
      说明 当使用老版控制台首次创建触发器时,需要选中授予表格存储发送事件通知的权限,授予表格存储发送事件通知的权限。

      授权后,可以在RAM控制台查看到自动创建的授权角色AliyunTableStoreStreamNotificationRole。

    3. 单击确定

数据处理

  • 数据格式

    Tablestore触发器使用CBOR格式对增量数据进行编码构成函数计算的Event。增量数据的具体数据格式如下:

    {
        "Version": "string",
        "Records": [
            {
                "Type": "string",
                "Info": {
                    "Timestamp": int64
                },
                "PrimaryKey": [
                    {
                        "ColumnName": "string",
                        "Value": formated_value
                    }
                ],
                "Columns": [
                    {
                        "Type": "string",
                        "ColumnName": "string",
                        "Value": formated_value,
                        "Timestamp": int64
                    }
                ]
            }
        ]
    }
  • 成员定义

    成员定义请参见下表。

    参数 说明
    Version payload版本号,目前为Sync-v1。类型为string。
    Records 数据表中的增量数据行数组。包含如下内部成员。
    • Type:数据行类型,包含PutRow、UpdateRow和DeleteRow。类型为string。
    • Info:包含Timestamp内部成员。Timestamp表示该行的最后修改UTC时间。类型为int64。
    PrimaryKey 主键列数组。包含如下内部成员。
    • ColumnName:主键列名称。类型为string。
    • Value:主键列内容。类型为formated_value,支持integer、string和blob。
    Colunms 属性列数组。包括如下内部成员。
    • Type:属性列类型,包含Put、DeleteOneVersion和DeleteAllVersions。类型为string。
    • ColumnName:属性列名称。类型为string。
    • Value:属性列内容。类型为formated_value,支持integer、boolean、double、string和blob。
    • Timestamp:属性列最后修改UTC时间。类型为int64。
  • 数据示例
    {
        "Version": "Sync-v1",
        "Records": [
            {
                "Type": "PutRow",
                "Info": {
                    "Timestamp": 1506416585740836
                },
                "PrimaryKey": [
                    {
                        "ColumnName": "pk_0",
                        "Value": 1506416585881590900
                    },
                    {
                        "ColumnName": "pk_1",
                        "Value": "2017-09-26 17:03:05.8815909 +0800 CST"
                    },
                    {
                        "ColumnName": "pk_2",
                        "Value": 1506416585741000
                    }
                ],
                "Columns": [
                    {
                        "Type": "Put",
                        "ColumnName": "attr_0",
                        "Value": "hello_table_store",
                        "Timestamp": 1506416585741
                    },
                    {
                        "Type": "Put",
                        "ColumnName": "attr_1",
                        "Value": 1506416585881590900,
                        "Timestamp": 1506416585741
                    }
                ]
            }
        ]
    }

在线调试

函数计算支持函数的在线调试功能,您可以构建触发的Event,并测试代码逻辑是否符合期望。

由于Tablestore触发函数服务的Event是CBOR格式,该数据格式是一种类似JSON的二进制格式,可以按照如下方法进行在线调试:

  1. 在代码中同时import CBOR和import JSON。
  2. 服务 / 函数页面,单击函数名称。
  3. 在函数详情页面,单击代码执行
  4. 代码执行页签,单击触发事件,选择自定义,将上述数据示例的JSON文件粘贴至编辑框,根据实际情况修改,并单击确定
  5. 验证测试。
    1. 在代码中使用records = json.loads(event)处理自定义的测试触发事件,单击保存并执行,查看是否符合期望。
    2. 当使用 records=json.loads(event) 测试OK后,可以修改为 records = cbor.loads(event) 并单击保存,当Tablestore中有数据写入时,相关的函数逻辑就会触发。
    fig_functionname_001
    示例代码:
    import logging
    import cbor
    import json
    def get_attrbute_value(record, column):
        attrs = record[u'Columns']
        for x in attrs:
            if x[u'ColumnName'] == column:
                return x['Value']
    def get_pk_value(record, column):
        attrs = record[u'PrimaryKey']
        for x in attrs:
            if x['ColumnName'] == column:
                return x['Value']
    def handler(event, context):
        logger = logging.getLogger()
        logger.info("Begin to handle event")
        #records = cbor.loads(event)
        records = json.loads(event)
        for record in records['Records']:
            logger.info("Handle record: %s", record)
            pk_0 = get_pk_value(record, "pk_0")
            attr_0 = get_attrbute_value(record, "attr_0")
        return 'OK'