使用函数计算清洗数据

表格存储高并发的写入性能以及低廉的存储成本非常适合物联网、日志、监控数据的存储。将数据写入到表格存储时,您可以通过函数计算对新增的数据做简单的清洗,将清洗后的数据写回到表格存储的另一种数据表中。同时,您也可以实时访问表格存储中的原始数据和结果数据。

样例场景

假设写入表格存储的为日志数据,且日志数据包括如下三个字段。为了便于日志查询,您需要将level>1的日志写入到表格存储的另一张数据表result中。

字段名称

类型

说明

id

整型

日志ID。

level

整型

日志的等级,数值越大表示日志等级越高。

message

字符串

日志的内容。

步骤一:为数据表开启Stream功能

使用触发器功能需要先在表格存储控制台开启数据表的Stream功能,才能在函数计算中处理写入表格存储中的增量数据。

  1. 登录表格存储控制台

  2. 在页面上方,选择地域。

  3. 概览页面,单击实例名称或在实例操作列单击实例管理

  4. 实例详情页签的数据表列表页签,单击数据表名称后选择实时消费通道页签或单击fig_001后选择实时消费通道

  5. 实时消费通道页签,单击Stream信息对应的开启

  6. 开启Stream功能对话框,设置日志过期时长,单击开启

    日志过期时长取值为非零整数,单位为小时,最长时长为168小时。

    重要

    日志过期时长设置后不能修改,请谨慎设置。

步骤二:创建函数和Tablestore触发器

  1. 创建函数。

    1. 登录函数计算控制台

    2. 可选:在页面右上角,单击体验函数计算3.0

      说明
      • 函数计算3.0进行了多项功能改进,本文采用函数计算3.0进行函数计算的使用介绍。

      • 若您已进入新版控制台页面(页面右上角的按钮为返回函数计算2.0),则无需执行此操作。

    3. 在左侧导航栏,单击函数

    4. 在顶部菜单栏,选择地域,然后在函数页面,单击创建函数

    5. 创建函数页面,按需选择创建函数的方式,配置以下配置项,然后单击创建

      此处以创建事件函数为例,介绍对表格存储中数据修改进行实时计算的操作。

      说明

      使用函数计算时,您可以通过创建事件函数Web函数任务函数实现对表格存储中数据的处理。更多信息,请参见函数类型选型

      • 如果要Tablestore中的数据变更自动触发数据处理,请创建事件函数。具体操作,请参见创建事件函数

      • 如果要通过特定HTTP请求触发数据处理,请创建Web函数。具体操作,请参见创建Web函数

      • 如果要定时或异步触发数据处理,请创建任务函数。具体操作,请参见创建任务函数

      • 基本设置:设置函数名称

      • 函数代码:配置函数的运行环境和代码相关信息。

        配置项

        说明

        示例

        运行环境

        选择您熟悉的语言,例如Python、Java、PHP、Node.js或自定义容器等。

        自定义容器镜像。

        此处选择Python 3.9

        代码上传方式

        选择代码上传到函数计算的方式。

        • 使用示例代码:默认方式,您可以根据业务需要选择函数计算为您提供的创建函数的示例代码。

        • 通过 ZIP 包上传代码:选择函数代码ZIP包并上传。

        • 通过文件夹上传代码:选择包含函数代码的文件夹并上传。

        • 通过 OSS 上传代码:选择上传函数代码的Bucket 名称文件名称

        此处请选择使用示例代码后,在示例代码列表中选择Hello, world! 示例

      • 高级配置:配置函数的实例相关信息和函数执行超时时间等。

        配置项

        说明

        示例

        规格方案

        根据您的业务情况,选择或手动输入合理的vCPU规格内存规格组合。关于各资源使用的计费详情,请参见计费概述

        说明

        vCPU大小(单位为核)与内存大小(单位为GB)的比例必须设置在1:11:4之间。

        0.35核,512 MB

        临时硬盘大小

        根据您的业务情况,选择临时存储文件的硬盘大小。

        取值说明如下。

        • 512 MB:默认值。不计费,函数计算为您提供512 MB以内的硬盘免费使用额度。

        • 10 GB:按9.5 GB进行计费。

        说明

        临时硬盘中所有目录可写,共享临时硬盘的空间。

        临时硬盘大小与底层执行函数的实例生命周期一致,实例被系统回收后,硬盘上的数据也会消失。如您需要对文件进行持久化保存,可以选择挂载NASOSS。具体操作,请参见配置NAS文件系统配置OSS对象存储

        512 MB

        执行超时时间

        设置超时时间。执行超时时间默认为180秒,最长为86400秒。

        180

        请求处理程序

        设置请求处理程序,函数计算的运行时会加载并调用您的请求处理程序处理请求。创建函数的方式选择Web函数时,无需设置此配置项。

        说明

        代码上传方式选择使用示例代码时,不需要修改请求处理程序。当选择其他代码上传方式时,则需要根据实际情况修改请求处理程序,否则函数执行时会报错。

        index.handler

        时区

        选择函数的时区。此处设置函数的时区后,将自动为函数添加一条环境变量TZ,其值为您设置的目标时区。

        UTC

        函数角色

        函数计算平台会使用这个RAM角色来生成访问您的阿里云资源的临时密钥,并传递给您的代码。

        重要

        需授予函数角色访问表格存储服务的权限。更多信息,请参见附录:授予函数计算访问表格存储的权限

        AliyunFCDefaultRole

        允许访问 VPC

        是否允许函数访问VPC内资源。更多信息,请参见配置网络

        专有网络

        允许访问 VPC选择时必填。创建新的VPC或在下拉列表中选择要访问的VPC ID。

        fc.auto.create.vpc.1632317****

        交换机

        允许访问 VPC选择时必填。创建新的交换机或在下拉列表中选择交换机ID。

        fc.auto.create.vswitch.vpc-bp1p8248****

        安全组

        允许访问 VPC选择时必填。创建新的安全组或在下拉列表中选择安全组。

        fc.auto.create.SecurityGroup.vsw-bp15ftbbbbd****

        允许函数默认网卡访问公网

        是否允许函数可以通过默认网卡访问公网。关闭后,当前服务中的函数将无法通过函数计算的默认网卡访问公网。

        重要

        使用固定公网IP地址功能时,您必须关闭允许函数默认网卡访问公网,否则配置的固定公网IP地址不生效。更多信息,请参见配置固定公网IP地址

        日志功能

        是否启用阿里云日志服务。取值说明如下:

        • 启用:函数的执行日志被持久化保存到日志服务,方便您代码调试、故障分析和数据分析等。

          说明

          启用日志功能后,函数中打印到 stdout 的内容就会被阿里云日志服务采集到。然后您可以查看函数的执行日志,从而方便您的代码调试、故障分析、数据分析等操作。

        • 禁用:函数的执行日志将无法通过日志服务存储和查询。

        启用

      • (可选)环境变量:设置函数运行环境中的环境变量。更多信息,请参见配置环境变量

  2. 创建Tablestore触发器。

    1. 函数详情页签,选择配置页签,在左侧导航栏,单击触发器,然后单击创建触发器

    2. 在创建触发器面板,填写相关信息,然后单击确定

      配置项

      操作

      本文示例

      触发器类型

      选择表格存储 Tablestore

      表格存储Tablestore

      名称

      自定义填写触发器名称。

      Tablestore-trigger

      版本或别名

      默认值为LATEST,如果您需要创建其他版本或别名的触发器,需先在函数详情页的版本或别名下拉列表选择该版本或别名。关于版本和别名的简介,请参见版本管理别名管理

      LATEST

      实例

      在列表中选择已创建的Tablestore实例。

      d00dd8xm****

      表格

      在列表中选择已创建的表格。

      mytable

      角色名称

      选择AliyunTableStoreStreamNotificationRole

      说明

      如果您第一次创建该类型的触发器,则需要在单击确定后,在弹出的对话框中选择立即授权

      AliyunTableStoreStreamNotificationRole

      创建完成后,在触发器名称列表中显示已创建的触发器。如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理

步骤三:验证测试

创建触发器后,通过在表格存储中写入和查询数据验证数据清洗是否成功。

  1. 函数详情页签的代码页签,使用代码编辑器中编写代码。

    此处以Python函数代码为例介绍。其中INSTANCE_NAME(表格存储的实例名称)、REGION(使用的地域)、ENDPOINT(服务地址)和RESULT_TABLENAME(结果表)需要根据情况进行修改。

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import cbor
    import json
    import tablestore as ots
    
    INSTANCE_NAME = 'distribute-test'
    REGION = 'cn-shanghai'
    ENDPOINT = 'http://%s.%s.vpc.tablestore.aliyuncs.com' % (INSTANCE_NAME, REGION)
    RESULT_TABLENAME = 'result'
    
    
    def get_attrbute_value(record, column):
        attrs = record[u'Columns']
        for x in attrs:
            if x[u'ColumnName'] == column:
                return x['Value']
    
    
    def get_pk_value(record, column):
        attrs = record[u'PrimaryKey']
        for x in attrs:
            if x['ColumnName'] == column:
                return x['Value']
    
    
    # 由于已经授权了AliyunOTSFullAccess权限,此处获取的credentials具有访问表格存储的权限。
    def get_ots_client(context):
        creds = context.credentials
        client = ots.OTSClient(ENDPOINT, creds.access_key_id, creds.access_key_secret, INSTANCE_NAME,
                               sts_token=creds.security_token)
        return client
    
    
    def save_to_ots(client, record):
        id = int(get_pk_value(record, 'id'))
        level = int(get_attrbute_value(record, 'level'))
        msg = get_attrbute_value(record, 'message')
        pk = [('id', id), ]
        attr = [('level', level), ('message', msg), ]
        row = ots.Row(pk, attr)
        client.put_row(RESULT_TABLENAME, row)
    
    
    def handler(event, context):
        records = cbor.loads(event)
        # records = json.loads(event)
        client = get_ots_client(context)
        for record in records['Records']:
            level = int(get_attrbute_value(record, 'level'))
            if level > 1:
                save_to_ots(client, record)
            else:
                print("level <= 1, ignore.")
    
  2. source_data数据表中写入数据,依次填入id、levelmessage信息,并在result表中查询清洗后的数据。

    • 当向source_data表中写入level>1的数据时,数据会同步到result表中。

    • 当向source_data表中写入level<=1的数据时,数据不会同步到result表中。

常见问题

  • 如果您无法在某一地域创建Tablestore触发器,请确认支持创建Tablestore触发器的地域,具体请参见注意事项

  • 如果您在创建Tablestore触发器时无法找到已经创建好的表格存储数据表,请确认表格存储数据表与函数计算服务是否处于同一地域。

  • 使用Tablestore触发器时,总是会报客户端取消的报错,一般是由于客户端调用函数时设置的超时时间小于函数执行时间。建议您将客户端超时时间调大,具体请参见客户端断开连接,报错Invocation canceled by client怎么办?

  • 如果Tablestore数据表中有新增的数据,但是Tablestore触发器没有被触发,您可以从以下方面进行排查。关于触发器不能正常触发的详细排查方案可参见触发器不能正常触发函数执行怎么办?

    • 确认数据表是否开启了Stream功能,具体请参见为数据表开启Stream功能

    • 确认在创建触发器时配置的角色是否正确,您可以使用默认的触发器角色AliyunTableStoreStreamNotificationRole,具体请参见创建Tablestore触发器

    • 查看是否有函数运行日志,可以根据日志确认是否是函数执行失败。函数执行失败后,会一直重试直到Tablestore中的日志数据过期。

  • 如果函数执行时报错“access_key_id is None or empty.”,请确认配置的函数角色是否拥有访问表格存储的权限,具体请参见附录:授予函数计算访问表格存储的权限