表格存储高并发的写入性能以及低廉的存储成本非常适合物联网、日志、监控数据的存储。将数据写入到表格存储时,您可以通过函数计算对新增的数据做简单的清洗,将清洗后的数据写回到表格存储的另一种数据表中。同时,您也可以实时访问表格存储中的原始数据和结果数据。
样例场景
假设写入表格存储的为日志数据,且日志数据包括如下三个字段。为了便于日志查询,您需要将level>1的日志写入到表格存储的另一张数据表result中。
字段名称 | 类型 | 说明 |
id | 整型 | 日志ID。 |
level | 整型 | 日志的等级,数值越大表示日志等级越高。 |
message | 字符串 | 日志的内容。 |
步骤一:为数据表开启Stream功能
使用触发器功能需要先在表格存储控制台开启数据表的Stream功能,才能在函数计算中处理写入表格存储中的增量数据。
登录表格存储控制台。
在页面上方,选择地域。
在概览页面,单击实例名称或在实例操作列单击实例管理。
在实例详情页签的数据表列表页签,单击数据表名称后选择实时消费通道页签或单击后选择实时消费通道。
在实时消费通道页签,单击Stream信息对应的开启。
在开启Stream功能对话框,设置日志过期时长,单击开启。
日志过期时长取值为非零整数,单位为小时,最长时长为168小时。
重要日志过期时长设置后不能修改,请谨慎设置。
步骤二:配置Tablestore触发器
在函数计算控制台创建Tablestore触发器来处理Tablestore数据表的实时数据流。
表格存储只支持使用内置运行时创建的方式创建函数。
进入函数管理页签。
登录函数计算控制台。
在左侧导航栏,单击服务及函数。
在页面上方,选择地域。
在服务列表页面,单击目标服务名称。
在函数管理页签,单击创建函数。
在创建函数页面,配置函数信息。
选择创建函数的方式为使用内置运行时创建。
在基本设置区域,填写函数名称,并选择请求处理程序类型为处理事件请求。
处理事件请求表示通过定时器、调用API/SDK或其他阿里云服务的触发器来触发函数执行。
说明如果未填写函数名称,函数计算会自动为您生成函数名称。
在函数代码区域,根据下表说明配置函数代码信息。
参数
说明
运行环境
选择您熟悉的语言,例如Python、Java、PHP、Node.js等。此处选择Python 3.9。
函数计算支持的运行环境,请参见函数计算支持的函数运行环境列表。
代码上传方式
上传函数代码到函数计算的方式。取值范围如下:
使用示例代码(默认):根据业务需要选择函数计算为您提供的创建函数的示例代码。
通过ZIP包上传代码:选择函数代码ZIP包。
通过文件夹上传代码:选择包含函数代码的文件夹。
通过OSS上传代码:选择上传函数代码的Bucket名称和文件名称。
此处请选择使用示例代码后,在示例代码列表中选择表格存储TableStore触发函数。
在高级配置区域,根据下表说明配置函数的实例相关信息和函数执行超时时间等。
参数
说明
规格方案
选择或手动输入合理的vCPU规格和内存规格组合。关于各资源使用的计费详情,请参见函数计算计费概述。
重要vCPU大小(核)与内存大小(GB)的比值必须在1:1到1:4之间。
临时硬盘大小
函数实例中的用于临时存储文件的磁盘,临时磁盘会被挂载到实例的根目录。
选择临时存储文件的硬盘大小。取值范围如下:
512 MB(默认):不计费,函数计算为您提供512 MB以内的硬盘免费使用额度。
10 GB:按9.5 GB进行计费。
重要临时硬盘大小与底层执行函数的实例生命周期一致,实例被系统回收后,硬盘上的数据也会清除。
实例并发度
单个函数实例可以同时处理的请求数。更多信息,请参见设置实例并发度。
执行超时时间
执行函数的超时时间。默认值为60秒,最大值为86400秒。
如果函数在该时间内未能成功执行,则函数计算会返回超时错误。
请求处理程序
函数计算运行时会加载并调用您的请求处理程序处理请求。
时区
函数计算的运行环境默认使用UTC时间,您可以通过配置TZ环境变量进行修改。修改时区后,函数计算将会自动为函数添加TZ环境变量,TZ环境变量的值为您设置的目标时区。
(可选)在环境变量区域,设置函数运行环境中的环境变量。更多信息,请参见环境变量。
在触发器配置区域,根据下表说明填写触发器相关参数。
参数
说明
触发器类型
选择表格存储Tablestore。
名称
自定义填写触发器名称。
实例
在下拉列表中选择已创建的Tablestore实例。
表格
在下拉列表中选择已创建的数据表。
角色名称
选择AliyunTableStoreStreamNotificationRole。
说明如果您第一次创建该类型的触发器,则需要单击创建后,在弹出的对话框中选择立即授权,并根据系统提示完成角色创建和授权。
单击创建。
创建好的触发器会自动显示在触发器管理页签。
说明您也可以在表格存储控制台中数据表的触发器管理页签,查看和创建Tablestore触发器。
步骤三:验证测试
创建触发器后,通过在表格存储中写入和查询数据验证数据清洗是否成功。
编写代码。
在函数管理页面,单击函数名称。
在函数详情页面,单击函数代码页签,在代码编辑器中编写代码。
此处以Python函数代码为例介绍。其中INSTANCE_NAME(表格存储的实例名称)、REGION(使用的地域)、ENDPOINT(服务地址)需要根据情况进行修改。
#!/usr/bin/env python # -*- coding: utf-8 -*- import cbor import json import tablestore as ots INSTANCE_NAME = 'distribute-test' REGION = 'cn-shanghai' ENDPOINT = 'http://%s.%s.vpc.tablestore.aliyuncs.com'%(INSTANCE_NAME, REGION) RESULT_TABLENAME = 'result' def _utf8(input): return str(bytearray(input, "utf-8")) def get_attrbute_value(record, column): attrs = record[u'Columns'] for x in attrs: if x[u'ColumnName'] == column: return x['Value'] def get_pk_value(record, column): attrs = record[u'PrimaryKey'] for x in attrs: if x['ColumnName'] == column: return x['Value'] #由于已经授权了AliyunOTSFullAccess权限,此处获取的credentials具有访问表格存储的权限。 def get_ots_client(context): creds = context.credentials client = ots.OTSClient(ENDPOINT, creds.accessKeyId, creds.accessKeySecret, INSTANCE_NAME, sts_token = creds.securityToken) return client def save_to_ots(client, record): id = int(get_pk_value(record, 'id')) level = int(get_attrbute_value(record, 'level')) msg = get_attrbute_value(record, 'message') pk = [(_utf8('id'), id),] attr = [(_utf8('level'), level), (_utf8('message'), _utf8(msg)),] row = ots.Row(pk, attr) client.put_row(RESULT_TABLENAME, row) def handler(event, context): records = cbor.loads(event) #records = json.loads(event) client = get_ots_client(context) for record in records['Records']: level = int(get_attrbute_value(record, 'level')) if level > 1: save_to_ots(client, record) else: print ("level <= 1, ignore.")
向source_data数据表中写入数据,依次填入id、level和message信息,并在result表中查询清洗后的数据。
当向soure_data表中写入level>1的数据时,数据会同步到result表中。
当向soure_data表中写入level<=1的数据时,数据不会同步到result表中。
- 本页导读 (1)