文档

使用函数计算清洗数据

更新时间:

表格存储高并发的写入性能以及低廉的存储成本非常适合物联网、日志、监控数据的存储。将数据写入到表格存储时,您可以通过函数计算对新增的数据做简单的清洗,将清洗后的数据写回到表格存储的另一种数据表中。同时,您也可以实时访问表格存储中的原始数据和结果数据。

样例场景

假设写入表格存储的为日志数据,且日志数据包括如下三个字段。为了便于日志查询,您需要将level>1的日志写入到表格存储的另一张数据表result中。

字段名称

类型

说明

id

整型

日志ID。

level

整型

日志的等级,数值越大表示日志等级越高。

message

字符串

日志的内容。

步骤一:为数据表开启Stream功能

使用触发器功能需要先在表格存储控制台开启数据表的Stream功能,才能在函数计算中处理写入表格存储中的增量数据。

  1. 登录表格存储控制台

  2. 在页面上方,选择地域。

  3. 概览页面,单击实例名称或在实例操作列单击实例管理

  4. 实例详情页签的数据表列表页签,单击数据表名称后选择实时消费通道页签或单击fig_001后选择实时消费通道

  5. 实时消费通道页签,单击Stream信息对应的开启

  6. 开启Stream功能对话框,设置日志过期时长,单击开启

    日志过期时长取值为非零整数,单位为小时,最长时长为168小时。

    重要

    日志过期时长设置后不能修改,请谨慎设置。

步骤二:配置Tablestore触发器

在函数计算控制台创建Tablestore触发器来处理Tablestore数据表的实时数据流。

说明

表格存储只支持使用内置运行时创建的方式创建函数。

  1. 进入函数管理页签。

    1. 登录函数计算控制台

    2. 在左侧导航栏,单击服务及函数

    3. 在页面上方,选择地域。

    4. 服务列表页面,单击目标服务名称。

  2. 函数管理页签,单击创建函数

  3. 创建函数页面,配置函数信息。

    1. 选择创建函数的方式为使用内置运行时创建

    2. 基本设置区域,填写函数名称,并选择请求处理程序类型处理事件请求

      处理事件请求表示通过定时器、调用API/SDK或其他阿里云服务的触发器来触发函数执行。

      说明

      如果未填写函数名称,函数计算会自动为您生成函数名称。

    3. 函数代码区域,根据下表说明配置函数代码信息。

      参数

      说明

      运行环境

      选择您熟悉的语言,例如Python、Java、PHP、Node.js等。此处选择Python 3.9

      函数计算支持的运行环境,请参见函数计算支持的函数运行环境列表

      代码上传方式

      上传函数代码到函数计算的方式。取值范围如下:

      • 使用示例代码(默认):根据业务需要选择函数计算为您提供的创建函数的示例代码。

      • 通过ZIP包上传代码:选择函数代码ZIP包。

      • 通过文件夹上传代码:选择包含函数代码的文件夹。

      • 通过OSS上传代码:选择上传函数代码的Bucket名称文件名称

      此处请选择使用示例代码后,在示例代码列表中选择表格存储TableStore触发函数

    4. 高级配置区域,根据下表说明配置函数的实例相关信息和函数执行超时时间等。

      参数

      说明

      规格方案

      选择或手动输入合理的vCPU规格内存规格组合。关于各资源使用的计费详情,请参见函数计算计费概述

      重要

      vCPU大小(核)与内存大小(GB)的比值必须在1:1到1:4之间。

      临时硬盘大小

      函数实例中的用于临时存储文件的磁盘,临时磁盘会被挂载到实例的根目录。

      选择临时存储文件的硬盘大小。取值范围如下:

      • 512 MB(默认):不计费,函数计算为您提供512 MB以内的硬盘免费使用额度。

      • 10 GB:按9.5 GB进行计费。

      重要

      临时硬盘大小与底层执行函数的实例生命周期一致,实例被系统回收后,硬盘上的数据也会清除。

      实例并发度

      单个函数实例可以同时处理的请求数。更多信息,请参见设置实例并发度

      执行超时时间

      执行函数的超时时间。默认值为60秒,最大值为86400秒。

      如果函数在该时间内未能成功执行,则函数计算会返回超时错误。

      请求处理程序

      函数计算运行时会加载并调用您的请求处理程序处理请求。

      时区

      函数计算的运行环境默认使用UTC时间,您可以通过配置TZ环境变量进行修改。修改时区后,函数计算将会自动为函数添加TZ环境变量,TZ环境变量的值为您设置的目标时区。

    5. (可选)在环境变量区域,设置函数运行环境中的环境变量。更多信息,请参见环境变量

    6. 触发器配置区域,根据下表说明填写触发器相关参数。

      参数

      说明

      触发器类型

      选择表格存储Tablestore

      名称

      自定义填写触发器名称。

      实例

      在下拉列表中选择已创建的Tablestore实例。

      表格

      在下拉列表中选择已创建的数据表。

      角色名称

      选择AliyunTableStoreStreamNotificationRole

      说明

      如果您第一次创建该类型的触发器,则需要单击创建后,在弹出的对话框中选择立即授权,并根据系统提示完成角色创建和授权。

  4. 单击创建

    创建好的触发器会自动显示在触发器管理页签。

    说明

    您也可以在表格存储控制台中数据表的触发器管理页签,查看和创建Tablestore触发器。

步骤三:验证测试

创建触发器后,通过在表格存储中写入和查询数据验证数据清洗是否成功。

  1. 编写代码。

    1. 函数管理页面,单击函数名称。

    2. 在函数详情页面,单击函数代码页签,在代码编辑器中编写代码。

      此处以Python函数代码为例介绍。其中INSTANCE_NAME(表格存储的实例名称)、REGION(使用的地域)、ENDPOINT(服务地址)需要根据情况进行修改。

      #!/usr/bin/env python
      # -*- coding: utf-8 -*-
      import cbor
      import json
      import tablestore as ots
      INSTANCE_NAME = 'distribute-test'
      REGION = 'cn-shanghai'
      ENDPOINT = 'http://%s.%s.vpc.tablestore.aliyuncs.com'%(INSTANCE_NAME, REGION)
      RESULT_TABLENAME = 'result'
      def _utf8(input):
          return str(bytearray(input, "utf-8"))
      def get_attrbute_value(record, column):
          attrs = record[u'Columns']
          for x in attrs:
              if x[u'ColumnName'] == column:
                  return x['Value']
      def get_pk_value(record, column):
          attrs = record[u'PrimaryKey']
          for x in attrs:
              if x['ColumnName'] == column:
                  return x['Value']
      #由于已经授权了AliyunOTSFullAccess权限,此处获取的credentials具有访问表格存储的权限。
      def get_ots_client(context):
          creds = context.credentials
          client = ots.OTSClient(ENDPOINT, creds.accessKeyId, creds.accessKeySecret, INSTANCE_NAME, sts_token = creds.securityToken)
          return client
      def save_to_ots(client, record):
          id = int(get_pk_value(record, 'id'))
          level = int(get_attrbute_value(record, 'level'))
          msg = get_attrbute_value(record, 'message')
          pk = [(_utf8('id'), id),]
          attr = [(_utf8('level'), level), (_utf8('message'), _utf8(msg)),]
          row = ots.Row(pk, attr)
          client.put_row(RESULT_TABLENAME, row)
      def handler(event, context):
          records = cbor.loads(event)
          #records = json.loads(event)
          client = get_ots_client(context)
          for record in records['Records']:
              level = int(get_attrbute_value(record, 'level'))
              if level > 1:
                  save_to_ots(client, record)
              else:
                  print ("level <= 1, ignore.")
  2. 向source_data数据表中写入数据,依次填入id、level和message信息,并在result表中查询清洗后的数据。

    • 当向soure_data表中写入level>1的数据时,数据会同步到result表中。

    • 当向soure_data表中写入level<=1的数据时,数据不会同步到result表中。

  • 本页导读 (1)