全部产品
阿里云办公

使用函数计算

更新时间:2017-11-08 14:40:37

本节介绍如何使用函数计算对表格存储增量数据进行实时计算。

函数计算(Function Compute)是一个事件驱动的服务,通过函数计算,用户无需管理服务器等运行情况,只需编写代码并上传。函数计算准备计算资源,并以弹性伸缩的方式运行用户代码,而用户只需根据实际代码运行所消耗的资源进行付费。详细信息请参见函数计算帮助文档

Table Store Stream是用于获取Table Store表中增量数据的一个数据通道,通过创建Table Store触发器,能够实现Table Store Stream和函数计算的自动对接,让计算函数中自定义的程序逻辑自动处理Table Store表中发生的数据修改。详细信息请参见Table Store Stream

配置Table Store触发器

您可以通过控制台创建Table Store触发器来处理Table Store数据表的实时数据流。

  1. 创建开通了Stream流的数据表。

    1. Table Store控制台创建实例。

      instance

    2. 在该实例下创建数据表,并且开启数据表的Stream功能。

      table

  2. 创建函数计算的函数。

    1. 函数计算控制台创建服务(Service)。

      service

    2. 在Service的高级配置中,您可以配置服务的角色,用于授权函数进行日志收集,以及在计算函数中继续访问用户的其他资源。详细信息请参见函数计算权限模型

      role

    3. 在新创建的Service下单击新建函数

    4. 函数模板页面选择 空白函数

    5. 触发器配置页面选择 不创建任何触发器 ,然后单击下一步

    6. 配置函数信息。

      Table Store触发器会使用CBOR格式将增量数据编码为函数计算的Event,并调用用户函数。下图示例函数就是将编码后的Event重新解码和打印到日志中心,您可以在解码数据后进行任意数据处理。

      fucntion1

  3. 创建和测试Table Store触发器。

    1. Table Store控制台新创建的数据表下,选择使用已有函数计算来创建触发器。

    2. 创建过程中需要授权Table Store发送事件通知所需的权限。

      勾选完毕后,实际可以在RAM控制台查看到自动创建的授权角色AliyunTableStoreStreamNotificationRole

      trigger

数据处理

数据格式

Table Store触发器使用CBOR格式对增量数据进行编码构成函数计算的Event。增量数据的具体数据格式如下:

  1. {
  2. "Version": "string",
  3. "Records": [
  4. {
  5. "Type": "string",
  6. "Info": {
  7. "Timestamp": int64
  8. },
  9. "PrimaryKey": [
  10. {
  11. "ColumnName": "string",
  12. "Value": formated_value
  13. }
  14. ],
  15. "Columns": [
  16. {
  17. "Type": "string",
  18. "ColumnName": "string",
  19. "Value": formated_value,
  20. "Timestamp": int64
  21. }
  22. ]
  23. }
  24. ]
  25. }

成员定义

  • Version
    • 含义: payload版本号,目前为Sync-v1
    • 类型: string
  • Records
    • 含义: 数据表中的增量数据行数组
    • 内部成员:
      • Type
        • 含义: 数据行类型,包含PutRow、UpdateRow和DeleteRow
        • 类型: string
      • Info
        • 含义: 数据行基本信息
        • 内部成员:
          • Timestamp
          • 含义: 该行的最后修改UTC时间
          • 类型: int64
      • PrimaryKey
        • 含义: 主键列数组
        • 内部成员
          • ColumnName
            • 含义: 主键列名称
            • 类型: string
          • Value
            • 含义: 主键列内容
            • 类型: formated_value,支持integer、string和blob
      • Colunms
        • 含义: 属性列数组
        • 内部成员
          • Type
            • 含义: 属性列类型,包含Put、DeleteOneVersion和DeleteAllVersions
            • 类型: string
          • ColumnName
            • 含义: 属性列名称
            • 类型: string
          • Value
            • 含义: 属性列内容
            • 类型: formated_value,支持integer、boolean、double、string和blob
          • Timestamp
            • 含义: 属性列最后修改UTC时间
            • 类型: int64

数据示例

  1. {
  2. "Version": "Sync-v1",
  3. "Records": [
  4. {
  5. "Type": "PutRow",
  6. "Info": {
  7. "Timestamp": 1506416585740836
  8. },
  9. "PrimaryKey": [
  10. {
  11. "ColumnName": "pk_0",
  12. "Value": 1506416585881590900
  13. },
  14. {
  15. "ColumnName": "pk_1",
  16. "Value": "2017-09-26 17:03:05.8815909 +0800 CST"
  17. },
  18. {
  19. "ColumnName": "pk_2",
  20. "Value": 1506416585741000
  21. }
  22. ],
  23. "Columns": [
  24. {
  25. "Type": "Put",
  26. "ColumnName": "attr_0",
  27. "Value": "hello_table_store",
  28. "Timestamp": 1506416585741
  29. },
  30. {
  31. "Type": "Put",
  32. "ColumnName": "attr_1",
  33. "Value": 1506416585881590900,
  34. "Timestamp": 1506416585741
  35. }
  36. ]
  37. }
  38. ]
  39. }

在线调试

函数计算支持函数的在线调试功能,用户能够自己构建触发的Event,并测试代码逻辑是否符合期望。

由于Table Store触发函数服务的Event是CBOR格式,该数据格式是一种类似JSON的二进制格式,可以按照如下方法进行在线调试:

  1. 在代码中同时import cbor和json。

  2. 单击触发事件,选择自定义,将上述数据示例的json文件粘贴至编辑框,根据实际情况修改,并保存。

  3. 代码中先使用 records = json.loads(event) 来处自定义的测试触发事件,单击执行,查看是否符合期望。

  4. 当使用 records=json.loads(event) 测试OK之后,可以修改为 records = cbor.loads(event) 并保存,当Table Store中有数据写入时,相关的函数逻辑就会触发。

fc调试

示例代码:

  1. import logging
  2. import cbor
  3. import json
  4. def get_attrbute_value(record, column):
  5. attrs = record[u'Columns']
  6. for x in attrs:
  7. if x[u'ColumnName'] == column:
  8. return x['Value']
  9. def get_pk_value(record, column):
  10. attrs = record[u'PrimaryKey']
  11. for x in attrs:
  12. if x['ColumnName'] == column:
  13. return x['Value']
  14. def handler(event, context):
  15. logger = logging.getLogger()
  16. logger.info("Begin to handle event")
  17. #records = cbor.loads(event)
  18. records = json.loads(event)
  19. for record in records['Records']:
  20. logger.info("Handle record: %s", record)
  21. pk_0 = get_pk_value(record, "pk_0")
  22. attr_0 = get_attrbute_value(record, "attr_0")
  23. return 'OK'
本文导读目录