全部产品

表格存储触发器

更新时间:2019-07-01 15:59:24

表格存储触发器

阿里云表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。Table Store 与阿里云函数计算(Function Compute,以下简称 FC )集成,将 Table Store 作为事件源接入 FC ,当 Table Store 中数据发生变更的时候,就会以数据变更信息作为参数触发函数的执行。

fc-ots

如上图所示的一个典型用例场景,原始信号源数据存储到原始 Table A,然后根据 A 的数据变更信息通过函数计算做自定义清洗,将清洗后的数据存入 Table B, 用户直接读取清洗 Table B 的数据完成展示,就可以完成一个弹性可伸缩的 serverless web 运用。

用户在首次使用表格存储触发器时请务必先阅读地域限制和注意事项

地域限制

目前支持表格存储触发器的地域

华北 2(北京), 华东 1(杭州), 华东 2(上海), 华南 1(深圳), 亚太东北 1(东京), 亚太东南 1(新加坡), 亚太东南 2(悉尼), 欧洲中部 1(法兰克福), 香港

其他 region 暂不支持

注意事项

注意避免出现循环调用的情况

用户编写函数的时候,注意不要出现以下逻辑: 表格存储 Table A 触发函数 B,函数 B 又更新 Table A 的数据,从而造成函数无限循环调用

函数计算位于vpc网络

编写的函数不可以使用 TableStore 内网 endpoint , 可以使用vpc endpoint: {instance}.{region}.vpc.tablestore.aliyuncs.com 通过内网访问tablestore.

详情可参考 国内 VPC 功能开启可用性影响说明

其他

  • 触发的用户函数执行时间不能超过一分钟
  • 无限重试直到表格存储日志数据过期

表格存储触发器事件定义

数据格式

表格存储触发器使用 CBOR 格式对增量数据进行编码构成函数计算的 Event, 增量数据的具体数据格式如下:

  1. {
  2. "Version": "string",
  3. "Records": [
  4. {
  5. "Type": "string",
  6. "Info": {
  7. "Timestamp": int64
  8. },
  9. "PrimaryKey": [
  10. {
  11. "ColumnName": "string",
  12. "Value": formated_value
  13. }
  14. ],
  15. "Columns": [
  16. {
  17. "Type": "string",
  18. "ColumnName": "string",
  19. "Value": formated_value,
  20. "Timestamp": int64
  21. }
  22. ]
  23. }
  24. ]
  25. }
  • 成员定义
    • Version
      • 含义: payload 版本号,目前为 Sync-v1
      • 类型: string
    • Records
      • 含义: 数据表中的增量数据行数组
      • 内部成员:
        • Type
          • 含义: 数据行类型,包含 PutRow 、UpdateRow 和 DeleteRow
          • 类型: string
        • Info
          • 含义: 数据行基本信息
          • 内部成员:
            • Timestamp
            • 含义: 该行的最后修改 UTC 时间
            • 类型: int64
        • PrimaryKey
          • 含义: 主键列数组
          • 内部成员
            • ColumnName
              • 含义: 主键列名称
              • 类型: string
            • Value
              • 含义: 主键列内容
              • 类型: formated_value,支持 integer、string 和 blob
        • Colunms
          • 含义: 属性列数组
          • 内部成员
            • Type
              • 含义: 属性列类型,包含 Put、DeleteOneVersion 和 DeleteAllVersions
              • 类型: string
            • ColumnName
              • 含义: 属性列名称
              • 类型: string
            • Value
              • 含义: 属性列内容
              • 类型: formated_value,支持 integer、boolean、double、string 和 blob
            • Timestamp
              • 含义: 属性列最后修改UTC时间
              • 类型: int64
  • 数据示例

    1. {
    2. "Version": "Sync-v1",
    3. "Records": [
    4. {
    5. "Type": "PutRow",
    6. "Info": {
    7. "Timestamp": 1506416585740836
    8. },
    9. "PrimaryKey": [
    10. {
    11. "ColumnName": "pk_0",
    12. "Value": 1506416585881590900
    13. },
    14. {
    15. "ColumnName": "pk_1",
    16. "Value": "2017-09-26 17:03:05.8815909 +0800 CST"
    17. },
    18. {
    19. "ColumnName": "pk_2",
    20. "Value": 1506416585741000
    21. }
    22. ],
    23. "Columns": [
    24. {
    25. "Type": "Put",
    26. "ColumnName": "attr_0",
    27. "Value": "hello_table_store",
    28. "Timestamp": 1506416585741
    29. },
    30. {
    31. "Type": "Put",
    32. "ColumnName": "attr_1",
    33. "Value": 1506416585881590900,
    34. "Timestamp": 1506416585741
    35. }
    36. ]
    37. }
    38. ]
    39. }

    配置表格存储触发器:

    触发器示例:tablestore_trigger.yml

    1. triggerConfig:

    触发器参数说明:无

    使用示例

    可以通过函数计算控制台命令行工具fcliSDK三种方式设置函数的表格存储触发器, 下面对这三种方式分别进行介绍:

    下面的函数示例使用 python 编写, 如果想用其他 Runtime,可以参考表格存储触发函数计算示例之 Nodejs/Php/Java Runtime

    示例1: 控制台新建表格存储触发器(推荐)

    本示例展示了使用控制台设置表格存储触发器的方式。可以在创建函数的时候设置触发器,也可以在函数创建完成后再设置触发器。对触发器和创建触发器有困惑的同学可以参考文档使用事件源服务创建触发器

    首先登录 函数计算管理控制台,选择相应的区域和服务。如果还未新建服务,请参考创建服务

    在创建函数时设置触发器

    1. 单击 创建函数,进入创建函数页面,选择 空白函数,单击下一步

    2. 选择 表格存储触发器,并按下图进行配置ots-2

    3. 创建函数,填写相应信息,选择 在线编辑,并粘贴以下 python runtime的函数示例代码,单击 下一步

      1. import logging
      2. import cbor
      3. import json
      4. def get_attrbute_value(record, column):
      5. attrs = record[u'Columns']
      6. for x in attrs:
      7. if x[u'ColumnName'] == column:
      8. return x['Value']
      9. def get_pk_value(record, column):
      10. attrs = record[u'PrimaryKey']
      11. for x in attrs:
      12. if x['ColumnName'] == column:
      13. return x['Value']
      14. def handler(event, context):
      15. logger = logging.getLogger()
      16. logger.info("Begin to handle event")
      17. #records = cbor.loads(event)
      18. records = json.loads(event)
      19. for record in records['Records']:
      20. logger.info("Handle record: %s", record)
      21. pk_0 = get_pk_value(record, "pk_0")
      22. attr_0 = get_attrbute_value(record, "attr_0")
      23. return 'OK'
    4. 配置权限(可选),单击 下一步, 核对信息无误后,单击 创建

    5. 在线调试,由于表格存储触发函数服务的 Event 是 CBOR 格式,该数据格式是一种类似JSON的二进制格式,可以按照如下方法进行在线调试:

      • 在代码中同时 import cbor 和 json。
      • 单击触发事件,选择自定义,将上述数据示例的 json 文件粘贴至编辑框,根据实际情况修改,并保存。
      • 代码中先使用 records = json.loads(event) 来处自定义的测试触发事件,单击执行,查看是否符合期望。
      • 当使用 records=json.loads(event) 测试OK之后,可以修改为 records = cbor.loads(event) 并保存,当 Table Store 中有数据写入时,相关的函数逻辑就会触发。

    在函数创建完成后设置触发器

    1. 选中已有的函数,单击 触发器 > 创建触发器
    2. 在创建触发器页面,按下图配置 表格存储触发器,单击 确定

      注: 如果不是考虑精确粒度的授权,可以直接使用快捷授权

    ots-1

    示例2: Fun 新建 TableStore 触发器(推荐)

    Fun 提供了 TableStore 触发器 的支持,可以实现函数触发器的创建。下面,我们介绍使用 Fun 配置 TableStore 触发器的步骤。

    在项目根目录创建一个 template.yml 文件,并将内容填充为:

    1. ROSTemplateFormatVersion: '2015-09-01'
    2. Transform: 'Aliyun::Serverless-2018-04-03'
    3. Resources:
    4. test-tableStore-service:
    5. Type: 'Aliyun::Serverless::Service'
    6. fun-ots-func:
    7. Type: 'Aliyun::Serverless::Function'
    8. Properties:
    9. Handler: index.handler
    10. Runtime: nodejs8
    11. CodeUri: './'
    12. Events:
    13. my-tablestore-trigger:
    14. Type: TableStore
    15. Properties:
    16. InstanceName: fc-test-inst
    17. TableName: fc_test_tbl

    在项目根目录创建一个 index.js,然后编写相应的逻辑代码就可以了。代码编写完成,直接执行 fun deploy 即可实现部署。另外,Fun 也支持直接创建 TableStore 服务,更完整的示例请参考

    如果想在本地单步调试、运行函数,可以参考 开发函数计算的正确姿势 —— 使用 Fun Local 本地运行与调试

    更多的配置规则 请参考

    Fun 的更多教程 请参考

    示例3: fcli 新建 TableStore 触发器

    首先,创建一个包含 Trigger Config 的 yaml 文件,对应的 yaml 文件内容如下:

    1. triggerConfig:

    在对应的fuction目录下创建触发器:

    1. mkt triggerName serviceName/functionName -t tablestore -c TriggerConfig.yaml -r acs:ram::$accountId:role/aliyuntablestorestreamnotificationrole -s acs:mns:cn-hangzhou:$accountId:/topics/$topicName
    • -r —invocation-role string设置触发角色。
    • -s —source-arn string事件源的资源符号,例如acs:ots:region:$accountId:instance/$instanceName]/table/$tableName。
    • -c —trigger-config string设置trigger配置文件。
    • -t —type string 触发器类型。

    更多fcli使用,请参考fcli开发指南

    示例4: SDK编程新建 TableStore 触发器

    fc-python-sdk为例演示如何通过SDK新建 TableStore 触发器。函数计算还提供 fc-nodejs-sdkfc-java-sdk, fc-php-sdk.

    新建触发器代码

    1. client = fc2.Client(
    2. endpoint='<Your Endpoint>',
    3. accessKeyID='<Your AccessKeyID>',
    4. accessKeySecret='<Your AccessKeySecret>')
    5. service_name = 'serviceName'
    6. function_name = 'functionName'
    7. trigger_name = 'triggerName'
    8. trigger_type = 'tablestore'
    9. source_arn = 'acs:ots:region:<Your Account ID>:instance/<Your instanceName>/table/<Your tableName>'
    10. invocation_role = 'acs:ram::<Your Account ID>:role/<Your Invocation Role>'
    11. trigger_config = {}
    12. client.create_trigger(service_name, function_name, trigger_name, trigger_type, trigger_config, source_arn, invocation_role)

    函数代码

    1. import logging
    2. import cbor
    3. import json
    4. def get_attrbute_value(record, column):
    5. attrs = record[u'Columns']
    6. for x in attrs:
    7. if x[u'ColumnName'] == column:
    8. return x['Value']
    9. def get_pk_value(record, column):
    10. attrs = record[u'PrimaryKey']
    11. for x in attrs:
    12. if x['ColumnName'] == column:
    13. return x['Value']
    14. def handler(event, context):
    15. logger = logging.getLogger()
    16. logger.info("Begin to handle event")
    17. #records = cbor.loads(event)
    18. records = json.loads(event)
    19. for record in records['Records']:
    20. logger.info("Handle record: %s", record)
    21. pk_0 = get_pk_value(record, "pk_0")
    22. attr_0 = get_attrbute_value(record, "attr_0")
    23. return 'OK'

    其他参考

    表格存储使用函数计算

    表格存储使用函数计算做数据清洗

    表格存储触发函数计算示例之 Nodejs/Php/Java Runtime

    如有疑问可留言,或加入函数计算官方客户群(钉钉群号:11721331)