通过函数计算访问表格存储,对表格存储增量数据进行实时计算。
背景信息
函数计算(Function Compute,简称FC)是事件驱动的全托管计算服务。使用函数计算,您无需采购与管理服务器等基础设施,只需编写并上传代码或镜像。函数计算为您准备好计算资源,弹性地、可靠地运行任务,并提供日志查询、性能监控和报警等功能。函数计算示例请参见表格存储函数计算示例。
Tablestore Stream是用于获取Tablestore表中增量数据的一个数据通道,通过创建Tablestore触发器,能够实现Stream和函数计算的自动对接,让计算函数中自定义的程序逻辑自动处理Tablestore表中发生的数据修改。关于Tablestore触发器的更多信息,请参见Tablestore触发器。
使用场景
使用函数计算可以实现如下任务,如下图所示。
- 数据同步:将表格存储中的实时数据同步到数据缓存、搜索引擎或者其他数据库实例中。 
- 数据归档:增量归档表格存储中的数据到OSS等做冷备份。 
- 事件驱动:利用触发器触发函数调用IoT套件、云应用API或者做消息通知等。 

准备工作
注意事项
- 目前支持使用Tablestore触发器的地域有:华北2(北京)、华东1(杭州)、华东2(上海)、华南1(深圳)、日本(东京)、新加坡、德国(法兰克福)和中国香港。 
- 在创建Tablestore触发器时,仅支持选择与当前函数计算服务处于同一地域内的表格存储实例及其数据表。因此,请确保函数计算服务与所选的数据表在同一地域内。 
- 如果您需要在Tablestore触发器对应的函数中通过内网访问表格存储,请使用表格存储VPC地址。更多信息,请参见什么是VPC?和获取服务地址。 
- 编写函数时,请注意不要出现以下逻辑:表格存储Table A触发函数B,函数B又更新Table A的数据。这种逻辑会造成函数无限调用问题。 
- 触发的函数执行时间不能超过一分钟。 
- 如果函数执行出现异常,函数将无限重试直到Tablestore中的日志数据过期。 说明- 函数执行异常有以下情况: - 函数代码运行异常:函数实例已拉起,因此函数实例运行的时间段内会产生费用。 
- 函数启动异常:函数实例由于启动指令错误等原因未成功拉起,此时不会产生费用。 
 
- 如果函数执行异常,为避免函数无限重试,您可以关闭数据表的Stream功能。在关闭数据表的Stream功能前请确认没有其他触发器在使用该数据表,以防导致其他触发器异常。 
 
步骤一:为数据表开启Stream功能
使用触发器功能需要先在表格存储控制台开启数据表的Stream功能,才能在函数计算中处理写入表格存储中的增量数据。
- 登录表格存储控制台。 
- 在页面上方,选择地域。 
- 在概览页面,单击实例别名或在实例操作列单击实例管理。 
- 在实例详情页签的数据表列表页签,单击数据表名称后选择实时消费通道页签或单击  后选择实时消费通道。 后选择实时消费通道。
- 在实时消费通道页签,单击Stream信息对应的开启。 
- 在开启Stream功能对话框,设置日志过期时长,单击开启。 - 日志过期时长取值为非零整数,单位为小时,最长时长为168小时。 重要- 日志过期时长设置后不能修改,请谨慎设置。 
步骤二:创建函数和Tablestore触发器
- 创建函数。 - 登录函数计算控制台。 
- 可选:在页面右上角,单击体验函数计算3.0。 说明- 函数计算3.0进行了多项功能改进,本文采用函数计算3.0进行函数计算的使用介绍。 
- 若您已进入新版控制台页面(页面右上角的按钮为返回函数计算2.0),则无需执行此操作。 
 
- 在左侧导航栏,单击函数。 
- 在顶部菜单栏,选择地域,然后在函数页面,单击创建函数。 
- 在创建函数页面,按需选择创建函数的方式,配置以下配置项,然后单击创建。 - 此处以创建事件函数为例,介绍对表格存储中数据修改进行实时计算的操作。 - 基本设置:设置函数名称。 
- 函数代码:配置函数的运行环境和代码相关信息。 - 配置项 - 说明 - 示例 - 运行环境 - 选择您熟悉的语言,例如Python、Java、PHP、Node.js或自定义容器等。 - 自定义容器镜像。 - 此处选择Python 3.9。 - 代码上传方式 - 选择代码上传到函数计算的方式。 - 使用示例代码:默认方式,您可以根据业务需要选择函数计算为您提供的创建函数的示例代码。 
- 通过 ZIP 包上传代码:选择函数代码ZIP包并上传。 
- 通过文件夹上传代码:选择包含函数代码的文件夹并上传。 
- 通过 OSS 上传代码:选择上传函数代码的Bucket 名称和文件名称。 
 - 此处请选择使用示例代码后,在示例代码列表中选择Hello, world! 示例。 
- 高级配置:配置函数的实例相关信息和函数执行超时时间等。 - 配置项 - 说明 - 示例 - 规格方案 - 根据您的业务情况,选择或手动输入合理的vCPU规格和内存规格组合。关于各资源使用的计费详情,请参见计费概述。 说明- vCPU大小(单位为核)与内存大小(单位为GB)的比例必须设置在1:1到1:4之间。 - 0.35核,512 MB - 临时硬盘大小 - 根据您的业务情况,选择临时存储文件的硬盘大小。 - 取值说明如下。 - 512 MB:默认值。不计费,函数计算为您提供512 MB以内的硬盘免费使用额度。 
- 10 GB:按9.5 GB进行计费。 
 说明- 临时硬盘中所有目录可写,共享临时硬盘的空间。 - 临时硬盘大小与底层执行函数的实例生命周期一致,实例被系统回收后,硬盘上的数据也会消失。如您需要对文件进行持久化保存,可以选择挂载NAS或OSS。具体操作,请参见配置NAS文件系统和配置OSS对象存储。 - 512 MB - 执行超时时间 - 设置超时时间。执行超时时间默认为180秒,最长为86400秒。 - 180 - 请求处理程序 - 设置请求处理程序,函数计算的运行时会加载并调用您的请求处理程序处理请求。创建函数的方式选择Web函数时,无需设置此配置项。 说明- 代码上传方式选择使用示例代码时,不需要修改请求处理程序。当选择其他代码上传方式时,则需要根据实际情况修改请求处理程序,否则函数执行时会报错。 - index.handler - 时区 - 选择函数的时区。此处设置函数的时区后,将自动为函数添加一条环境变量TZ,其值为您设置的目标时区。 - UTC - 函数角色 - 函数计算平台会使用这个RAM角色来生成访问您的阿里云资源的临时密钥,并将其传递给您的代码。 重要- 需授予函数角色访问表格存储服务的权限。更多信息,请参见附录:授予函数计算访问表格存储的权限。 - AliyunFCDefaultRole - 允许访问 VPC - 是否允许函数访问VPC内资源。更多信息,请参见配置网络。 - 是 - 专有网络 - 允许访问 VPC选择是时必填。创建新的VPC或在下拉列表中选择要访问的VPC ID。 - fc.auto.create.vpc.1632317**** - 交换机 - 允许访问 VPC选择是时必填。创建新的交换机或在下拉列表中选择交换机ID。 - fc.auto.create.vswitch.vpc-bp1p8248**** - 安全组 - 允许访问 VPC选择是时必填。创建新的安全组或在下拉列表中选择安全组。 - fc.auto.create.SecurityGroup.vsw-bp15ftbbbbd**** - 允许函数默认网卡访问公网 - 是否允许函数通过默认网卡访问公网。关闭后,当前服务中的函数将无法通过函数计算的默认网卡访问公网。 重要- 使用固定公网IP地址功能时,您必须关闭允许函数默认网卡访问公网,否则配置的固定公网IP地址不生效。更多信息,请参见配置固定公网IP地址。 - 是 - 日志功能 - 是否启用阿里云日志服务。取值说明如下: - 启用:函数的执行日志被持久化保存到日志服务,方便您代码调试、故障分析和数据分析等。 说明- 启用日志功能后,函数中打印到 stdout 的内容就会被阿里云日志服务采集到。然后您可以查看函数的执行日志,从而方便您的代码调试、故障分析、数据分析等操作。 - 点击配置日志查看更多详情。 
- 函数计算在后台为您创建的日志服务资源会产生费用,详情请参见按使用功能计费模式计费项。 
 
- 禁用:函数的执行日志将无法通过日志服务存储和查询。 
 - 启用 
- (可选)环境变量:设置函数运行环境中的环境变量。更多信息,请参见配置环境变量。 
 
 
- 创建Tablestore触发器。 - 在函数详情页签,选择配置页签,在左侧导航栏,单击触发器,然后单击创建触发器。 
- 在创建触发器面板,填写相关信息,然后单击确定。 - 配置项 - 操作 - 示例 - 触发器类型 - 选择表格存储 Tablestore。 - 表格存储Tablestore - 名称 - 自定义填写触发器名称。 - Tablestore-trigger - 版本或别名 - 默认值为LATEST,如果您需要创建其他版本或别名的触发器,需先在函数详情页的版本或别名下拉列表选择该版本或别名。关于版本和别名的简介,请参见版本管理和别名管理。 - LATEST - 实例 - 在列表中选择已创建的Tablestore实例。 - d00dd8xm**** - 表格 - 在列表中选择已创建的表格。 - mytable - 角色名称 - 选择AliyunTableStoreStreamNotificationRole。 说明- 如果您第一次创建该类型的触发器,则需要在单击确定后,在弹出的对话框中选择立即授权。 - AliyunTableStoreStreamNotificationRole - 创建完成后,在触发器名称列表中显示已创建的触发器。如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理。 
 
步骤三:配置函数的测试参数
- 在函数详情页签的代码页签,单击测试函数右侧的  图标,从下拉列表中,选择配置测试参数。 图标,从下拉列表中,选择配置测试参数。
- 在配置测试参数对话框,选择创建新测试事件页签,选择事件模板为Tablestore,并填写事件名称和事件内容。单击确定。 说明- 如果已创建Tablestore测试事件,您可以在编辑已有测试事件页签,选择已有事件名称。 - 表格存储触发器使用CBOR格式对增量数据进行编码,构成函数计算的event。具体格式如下所示。 - { "Version": "Sync-v1", "Records": [ { "Type": "PutRow", "Info": { "Timestamp": 1506416585740836 }, "PrimaryKey": [ { "ColumnName": "pk_0", "Value": 1506416585881590900 }, { "ColumnName": "pk_1", "Value": "2017-09-26 17:03:05.8815909 +0800 CST" }, { "ColumnName": "pk_2", "Value": 1506416585741000 } ], "Columns": [ { "Type": "Put", "ColumnName": "attr_0", "Value": "hello_table_store", "Timestamp": 1506416585741 }, { "Type": "Put", "ColumnName": "attr_1", "Value": 1506416585881590900, "Timestamp": 1506416585741 } ] } ] }- event参数中不同属性字段的解释请参见下表。 - 参数 - 描述 - Version - Payload版本号。示例如Sync-v1。类型为String。 - Records - 数据表中的增量数据行数组。包含如下内部成员: - Type:数据行类型,包含PutRow、UpdateRow和DeleteRow。类型为String。 
- Info:包含Timestamp内部成员。Timestamp表示该行的最后修改UTC时间。类型为Int64。 
 - PrimaryKey - 主键列数组。包含如下内部成员: - ColumnName:主键列名称。类型为String。 
- Value:主键列内容。类型为formated_value,支持Integer、String和Blob。 
 - Columns - 属性列数组。包括如下内部成员: - Type:属性列类型,包含Put、DeleteOneVersion和DeleteAllVersions。类型为String。 
- ColumnName:属性列名称。类型为String。 
- Value:属性列内容。类型为formated_value,支持Integer、Boolean、Double、String和Blob。 
- Timestamp:属性列最后修改UTC时间。类型为Int64。 
 
步骤四:编写函数代码并测试
完成Tablestore触发器创建后,您可以开始编写函数代码并测试,以验证代码的正确性。在实际操作过程中,Tablestore中有数据更新时会自动触发函数执行。
- 在函数详情页签的代码页签,使用代码编辑器编写代码,然后单击部署代码。 - 本文以Python函数代码为例。如果您想使用其他运行环境,更多代码示例,请参见表格存储触发函数计算示例。 - import logging import cbor import json def get_attribute_value(record, column): attrs = record[u'Columns'] for x in attrs: if x[u'ColumnName'] == column: return x['Value'] def get_pk_value(record, column): attrs = record[u'PrimaryKey'] for x in attrs: if x['ColumnName'] == column: return x['Value'] def handler(event, context): logger = logging.getLogger() logger.info("Begin to handle event") # records = cbor.loads(event) records = json.loads(event) for record in records['Records']: logger.info("Handle record: %s", record) pk_0 = get_pk_value(record, "pk_0") attr_0 = get_attribute_value(record, "attr_0") return 'OK'
- 单击测试函数。 - 执行完成后,您可以在函数代码页签的上方查看执行结果。 
- 修改代码并部署代码。 - 当使用 - records=json.loads(event)测试OK后,修改- records代码为- records = cbor.loads(event)。
- 单击部署代码。 
 - 当Tablestore中有数据写入时,相关的函数逻辑就会被触发。 
常见问题
- 如果您无法在某一地域创建Tablestore触发器,请确认支持创建Tablestore触发器的地域,具体请参见注意事项。 
- 如果您在创建Tablestore触发器时无法找到已经创建好的表格存储数据表,请确认表格存储数据表与函数计算服务是否处于同一地域。 
- 使用Tablestore触发器时,总是会报客户端取消的报错,一般是由于客户端调用函数时设置的超时时间小于函数执行时间。建议您将客户端超时时间调大,具体请参见客户端断开连接,报错Invocation canceled by client怎么办?。 
- 如果Tablestore数据表中有新增的数据,但是Tablestore触发器没有被触发,您可以从以下方面进行排查。关于触发器不能正常触发的详细排查方案可参见触发器不能正常触发函数执行怎么办?。 - 确认数据表是否开启了Stream功能,具体请参见为数据表开启Stream功能。 
- 确认在创建触发器时配置的角色是否正确,您可以使用默认的触发器角色 - AliyunTableStoreStreamNotificationRole,具体请参见创建Tablestore触发器。
- 查看是否有函数运行日志,可以根据日志确认是否是函数执行失败。函数执行失败后,会一直重试直到Tablestore中的日志数据过期。 
 
附录:授予函数计算访问表格存储的权限
在使用函数计算提供的功能时,函数计算需要访问表格存储。此时,需要为函数授予相应权限。如果是较粗粒度的授权,可以选择函数计算系统提供的默认服务角色AliyunFCDefaultRole,如果需要更细粒度的授权,则需要为函数授予其他角色及相应的权限策略。
- 采用系统默认角色方式授权。 - 为AliyunFCDefaultRole角色授予AliyunOTSFullAccess权限(管理表格存储服务的权限)。具体操作,请参见为RAM角色授权。 说明- AliyunFCDefaultRole是函数计算的默认服务角色,但不包含表格存储的权限。 - 如果您是初次使用该角色,您需要为该角色添加表格存储权限。 
- 如果您已为该角色添加表格存储权限,则无需执行此操作。 
 
- 采用自定义角色方式授权。 - 具体操作,请参见示例:授予函数计算访问OSS的权限。