函数计算(Function Compute)是一个事件驱动的服务,通过函数计算,用户无需管理服务器等运行情况,只需编写代码并上传。

函数计算准备计算资源,并以弹性伸缩的方式运行用户代码,而用户只需根据实际代码运行所消耗的资源进行付费。详细信息请参见函数计算帮助文档

Table Store Stream是用于获取Table Store表中增量数据的一个数据通道,通过创建Table Store触发器,能够实现Table Store Stream和函数计算的自动对接,让计算函数中自定义的程序逻辑自动处理Table Store表中发生的数据修改。详细信息请参见Table Store Stream

本节介绍如何使用函数计算对表格存储增量数据进行实时计算。

配置Table Store触发器

您可以通过控制台创建Table Store触发器来处理Table Store数据表的实时数据流。

  1. 创建开通了Stream流的数据表。
    1. Table Store控制台创建实例。

    2. 在该实例下创建数据表,并且开启数据表的Stream功能。

  2. 创建函数计算的函数。
    1. 函数计算控制台创建服务(Service)。

    2. 在Service的高级配置中,您可以配置服务的角色,用于授权函数进行日志收集,以及在计算函数中继续访问用户的其他资源。详细信息请参见函数计算权限模型

    3. 在新创建的Service下单击新建函数
    4. 函数模板页面选择 空白函数
    5. 触发器配置页面选择不创建任何触发器 ,然后单击下一步
    6. 配置函数信息。

      Table Store触发器会使用CBOR格式将增量数据编码为函数计算的Event,并调用用户函数。下图示例函数就是将编码后的Event重新解码和打印到日志中心,您可以在解码数据后进行任意数据处理。



  3. 创建和测试Table Store触发器。
    1. Table Store控制台新创建的数据表下,选择使用已有函数计算来创建触发器。
    2. 创建过程中需要授权Table Store发送事件通知所需的权限。

      勾选完毕后,实际可以在RAM控制台查看到自动创建的授权角色AliyunTableStoreStreamNotificationRole。

数据处理

  • 数据格式

    Table Store触发器使用CBOR格式对增量数据进行编码构成函数计算的Event。增量数据的具体数据格式如下:

    {
        "Version": "string",
        "Records": [
            {
                "Type": "string",
                "Info": {
                    "Timestamp": int64
                },
                "PrimaryKey": [
                    {
                        "ColumnName": "string",
                        "Value": formated_value
                    }
                ],
                "Columns": [
                    {
                        "Type": "string",
                        "ColumnName": "string",
                        "Value": formated_value,
                        "Timestamp": int64
                    }
                ]
            }
        ]
    }
  • 成员定义
    • Version
      • 含义: payload版本号,目前为Sync-v1
      • 类型: string
    • Records
      • 含义: 数据表中的增量数据行数组
      • 内部成员:
        • Type
          • 含义: 数据行类型,包含PutRow、UpdateRow和DeleteRow
          • 类型: string
        • Info
          • 含义: 数据行基本信息
          • 内部成员:
            • Timestamp
            • 含义: 该行的最后修改UTC时间
            • 类型: int64
        • PrimaryKey
          • 含义: 主键列数组
          • 内部成员
            • ColumnName
              • 含义: 主键列名称
              • 类型: string
            • Value
              • 含义: 主键列内容
              • 类型: formated_value,支持integer、string和blob
        • Colunms
          • 含义: 属性列数组
          • 内部成员
            • Type
              • 含义: 属性列类型,包含Put、DeleteOneVersion和DeleteAllVersions
              • 类型: string
            • ColumnName
              • 含义: 属性列名称
              • 类型: string
            • Value
              • 含义: 属性列内容
              • 类型: formated_value,支持integer、boolean、double、string和blob
            • Timestamp
              • 含义: 属性列最后修改UTC时间
              • 类型: int64
  • 数据示例
    {
        "Version": "Sync-v1",
        "Records": [
            {
                "Type": "PutRow",
                "Info": {
                    "Timestamp": 1506416585740836
                },
                "PrimaryKey": [
                    {
                        "ColumnName": "pk_0",
                        "Value": 1506416585881590900
                    },
                    {
                        "ColumnName": "pk_1",
                        "Value": "2017-09-26 17:03:05.8815909 +0800 CST"
                    },
                    {
                        "ColumnName": "pk_2",
                        "Value": 1506416585741000
                    }
                ],
                "Columns": [
                    {
                        "Type": "Put",
                        "ColumnName": "attr_0",
                        "Value": "hello_table_store",
                        "Timestamp": 1506416585741
                    },
                    {
                        "Type": "Put",
                        "ColumnName": "attr_1",
                        "Value": 1506416585881590900,
                        "Timestamp": 1506416585741
                    }
                ]
            }
        ]
    }

在线调试

函数计算支持函数的在线调试功能,用户能够自己构建触发的Event,并测试代码逻辑是否符合期望。

由于Table Store触发函数服务的Event是CBOR格式,该数据格式是一种类似JSON的二进制格式,可以按照如下方法进行在线调试:

  1. 在代码中同时import cbor和json。
  2. 单击触发事件,选择自定义,将上述数据示例的json文件粘贴至编辑框,根据实际情况修改,并保存。
  3. 代码中先使用 records = json.loads(event) 来处自定义的测试触发事件,单击执行,查看是否符合期望。
  4. 当使用 records=json.loads(event) 测试OK之后,可以修改为 records = cbor.loads(event) 并保存,当Table Store中有数据写入时,相关的函数逻辑就会触发。

示例代码:
import logging
import cbor
import json
def get_attrbute_value(record, column):
    attrs = record[u'Columns']
    for x in attrs:
        if x[u'ColumnName'] == column:
            return x['Value']
def get_pk_value(record, column):
    attrs = record[u'PrimaryKey']
    for x in attrs:
        if x['ColumnName'] == column:
            return x['Value']
def handler(event, context):
    logger = logging.getLogger()
    logger.info("Begin to handle event")
    #records = cbor.loads(event)
    records = json.loads(event)
    for record in records['Records']:
        logger.info("Handle record: %s", record)
        pk_0 = get_pk_value(record, "pk_0")
        attr_0 = get_attrbute_value(record, "attr_0")
    return 'OK'