通过函数计算访问表格存储,对表格存储增量数据进行实时计算。
背景信息
函数计算(Function Compute)是一个事件驱动的服务,通过函数计算,无需管理服务器等运行情况,只需编写代码并上传。函数计算准备计算资源,并以弹性伸缩的方式运行用户代码,而您只需根据实际代码运行所消耗的资源进行付费。更多信息,请参见什么是函数计算。函数计算示例请参见表格存储函数计算示例。 Tablestore Stream是用于获取Tablestore表中增量数据的一个数据通道,通过创建Tablestore触发器,能够实现Stream和函数计算的自动对接,让计算函数中自定义的程序逻辑自动处理Tablestore表中发生的数据修改。关于Tablestore触发器的更多信息,请参见Tablestore触发器。
使用场景
如下图所示,使用函数计算可以实现如下任务。
- 数据同步:同步表格存储中的实时数据到数据缓存、搜索引擎或者其他数据库实例中。
- 数据归档:增量归档表格存储中的数据到OSS等做冷备份。
- 事件驱动:利用触发器触发函数调用IoT套件、云应用API或者做消息通知等。
注意事项
请确保表格存储数据表与函数计算服务处于同一地域。
步骤一:为数据表开启Stream功能
使用触发器功能需要先在表格存储控制台开启数据表的Stream功能,才能在函数计算中处理写入表格存储中的增量数据。
- 登录表格存储控制台。
- 在概览页面,单击实例名称或在操作列单击实例管理。
- 在实例详情页签的数据表列表区域,单击数据表名称后选择实时消费通道页签或单击
后选择实时消费通道。
- 在实时消费通道页签,单击Stream信息对应的开启。
- 在开启Stream功能对话框,设置日志过期时长,单击开启。
日志过期时长取值为非零整数,单位为小时,最长时长为168小时。
步骤二:配置Tablestore触发器
在函数计算控制台创建Tablestore触发器来处理Tablestore数据表的实时数据流。
- 创建函数计算的服务。
- 登录函数计算控制台。
- 在左侧导航栏,单击服务及函数。
- 在顶部菜单栏,选择地域。
- 在服务列表页面,单击创建服务。
- 在创建服务面板,填写服务名称和描述,按需设置日志与链路追踪功能。
- 单击确定。
创建完成后,在服务列表页面,您可以查看到已创建的服务及其配置信息。
- 创建函数计算的函数。
说明 系统支持使用标准Runtime从零创建、使用自定义运行时平滑迁移Web Server、使用容器镜像创建、使用模板创建等方式创建函数。此处以
使用标准Runtime从零创建方式为例介绍,其他创建方式,请分别参见
使用容器镜像创建函数和
使用模板创建函数。
- 在服务列表页面,单击目标服务名称。
- 在函数管理页面,单击创建函数。
- 在创建函数页面,选择创建函数的方式为使用标准Runtime从零创建。
- 在基本设置区域,根据下表说明填写函数基本信息。
参数 |
是否必填 |
说明 |
本文示例 |
函数名称 |
否 |
填写自定义的函数名称。
|
Function |
运行环境 |
是 |
选择您熟悉的语言,例如Python、Java、PHP、Node.js等。函数计算支持的运行环境,请参见管理函数。
|
Python 3.6 |
代码上传方式 |
否 |
默认选择使用示例代码创建函数,函数创建完成后自带函数计算平台为其配置的示例代码。您也可以选择以下三种方式上传您自己的代码。
- 通过ZIP包上传代码:选择函数代码ZIP包。
- 通过文件夹上传代码:选择包含函数代码的文件夹。
- 通过OSS上传代码:选择上传函数代码的Bucket名称和文件名称。
|
使用示例代码 |
启动命令 |
否 |
程序的启动命令。如果不配置启动命令,您需要在代码的根目录手动创建一个名称为bootstrap的启动脚本,您的程序通过此脚本来启动。
说明 仅当您选择使用自定义运行时平滑迁移Web Server时,需配置此参数。
|
npm run start |
监听端口 |
是 |
您的代码中的HTTP Server所监听的端口。
说明 仅当您选择使用自定义运行时平滑迁移Web Server时,需配置此参数。
|
9000 |
请求处理程序类型 |
是 |
选择请求处理程序类型。取值范围如下:
- 处理事件请求:通过定时器、调用API/SDK或其他阿里云服务的触发器来触发函数执行。
- 处理HTTP请求:用于处理HTTP请求或Websocekt请求的函数。如果您的使用场景是Web场景,建议您使用自定义运行时平滑迁移Web Server。具体信息,请参见函数计算控制台。
使用表格存储触发器时,此参数必须设置为处理事件请求。
|
处理事件请求 |
实例类型 |
是 |
选择适合您的实例类型。取值范围如下:
更多信息,请参见实例类型及使用模式。关于各种实例类型的计费详情,请参见计费概述。
|
弹性实例 |
内存规格 |
是 |
设置函数执行内存。
- 选择输入:在下拉列表中选择所需内存。
- 手动输入:仅适用于弹性实例,单击手动输入内存大小,可自定义函数执行内存。取值范围[128, 3072],单位为MB。
|
512 MB |
实例并发度 |
是 |
设置函数实例的并发度,具体信息,请参见设置单实例并发数。
|
1 |
请求处理程序 |
是 |
设置请求处理程序,函数计算的运行时会加载并调用您的请求处理程序处理请求。 |
index.handler |
函数创建完成后,在函数管理页面,即可查看已创建的函数。
- 在配置触发器区域,根据下表说明填写触发器相关参数。
参数 |
操作 |
本文示例 |
触发器类型 |
选择表格存储Tablestore。
说明 当请求处理程序类型选择处理HTTP请求时,此参数默认为HTTP触发器。
|
表格存储Tablestore |
名称 |
自定义填写触发器名称。 |
Tablestore-trigger |
实例 |
在下拉列表中选择已创建的Tablestore实例。 |
distribute-test |
表格 |
在下拉列表中选择已创建的数据表。 |
source_data |
角色名称 |
选择AliyunTableStoreStreamNotificationRole。
说明 如果您第一次创建该类型的触发器,则需要单击确定后,在弹出的对话框中选择立即授权,并根据系统提示完成角色创建和授权。
|
AliyunTableStoreStreamNotificationRole |
- 单击创建。
创建好的触发器会自动显示在
触发器管理页签。
说明 您也可以在表格存储控制台中数据表的触发器管理页签,查看和创建Tablestore触发器。
步骤三:验证测试
函数计算支持函数的在线调试功能,您可以构建触发的Event,并测试代码逻辑是否符合期望。
由于Tablestore触发函数服务的Event是CBOR格式,该数据格式是一种类似JSON的二进制格式,可以按照如下方法进行在线调试。
- 编写代码。
- 在函数管理页面,单击函数名称。
- 在函数详情页面,单击函数代码页签,在代码编辑器中编写代码。
在代码中使用records = json.loads(event)
处理自定义的测试触发事件。
import logging
import cbor
import json
def get_attribute_value(record, column):
attrs = record[u'Columns']
for x in attrs:
if x[u'ColumnName'] == column:
return x['Value']
def get_pk_value(record, column):
attrs = record[u'PrimaryKey']
for x in attrs:
if x['ColumnName'] == column:
return x['Value']
def handler(event, context):
logger = logging.getLogger()
logger.info("Begin to handle event")
#records = cbor.loads(event)
records = json.loads(event)
for record in records['Records']:
logger.info("Handle record: %s", record)
pk_0 = get_pk_value(record, "pk_0")
attr_0 = get_attribute_value(record, "attr_0")
return 'OK'
- 验证测试。
- 在函数代码页签,单击测试函数右侧的
图标后,从下拉列表中,选择配置测试参数。
- 在配置测试参数对话框,选择创建新测试事件或编辑已有测试事件页签,选择事件模板为Tablestore,并填写事件名称和事件内容。单击确定。
- 单击测试函数,查看是否符合期望。
当使用records=json.loads(event)
测试OK后,可以修改为records = cbor.loads(event)
并单击部署代码,当Tablestore中有数据写入时,相关的函数逻辑就会触发。
附录:数据处理
配置测试参数时,需要按照特定的数据格式设置事件内容。
- 数据格式
Tablestore触发器使用CBOR格式对增量数据进行编码构成函数计算的Event。增量数据的具体数据格式如下:
{
"Version": "string",
"Records": [
{
"Type": "string",
"Info": {
"Timestamp": int64
},
"PrimaryKey": [
{
"ColumnName": "string",
"Value": formated_value
}
],
"Columns": [
{
"Type": "string",
"ColumnName": "string",
"Value": formated_value,
"Timestamp": int64
}
]
}
]
}
- 成员定义
成员定义请参见下表。
参数 |
说明 |
Version |
payload版本号,目前为Sync-v1。类型为string。 |
Records |
数据表中的增量数据行数组。包含如下内部成员:
- Type:数据行类型,包含PutRow、UpdateRow和DeleteRow。类型为string。
- Info:包含Timestamp内部成员。Timestamp表示该行的最后修改UTC时间。类型为int64。
|
PrimaryKey |
主键列数组。包含如下内部成员:
- ColumnName:主键列名称。类型为string。
- Value:主键列内容。类型为formated_value,支持integer、string和blob。
|
Columns |
属性列数组。包括如下内部成员:
- Type:属性列类型,包含Put、DeleteOneVersion和DeleteAllVersions。类型为string。
- ColumnName:属性列名称。类型为string。
- Value:属性列内容。类型为formated_value,支持integer、boolean、double、string和blob。
- Timestamp:属性列最后修改UTC时间。类型为int64。
|
- 数据示例
{
"Version": "Sync-v1",
"Records": [
{
"Type": "PutRow",
"Info": {
"Timestamp": 1506416585740836
},
"PrimaryKey": [
{
"ColumnName": "pk_0",
"Value": 1506416585881590900
},
{
"ColumnName": "pk_1",
"Value": "2017-09-26 17:03:05.8815909 +0800 CST"
},
{
"ColumnName": "pk_2",
"Value": 1506416585741000
}
],
"Columns": [
{
"Type": "Put",
"ColumnName": "attr_0",
"Value": "hello_table_store",
"Timestamp": 1506416585741
},
{
"Type": "Put",
"ColumnName": "attr_1",
"Value": 1506416585881590900,
"Timestamp": 1506416585741
}
]
}
]
}
附录:使用已有函数计算创建Tablestore触发器
在表格存储控制台,使用已有函数计算的服务以及服务下的函数创建Tablestore触发器。
- 登录表格存储控制台。
- 在概览页面,单击实例名称或在操作列单击实例管理。
- 在实例详情页签的数据表列表区域,单击数据表名称。
- 在触发器管理页签,单击使用已有函数计算。
- 在创建触发器对话框,选择函数计算服务以及服务下的函数,并填写触发器名称,单击确定。