经过了上一节的介绍,下面我们来完成一个使用函数计算对表格存储中的数据做简单清洗的场景。

表格存储高并发的写入性能以及低廉的存储成本非常适合物联网、日志、监控数据的存储,我们可以将数据写入到表格存储中,同时在函数计算中对新增的数据做简单的清洗,并将清洗之后的数据写回到表格存储的结果表中,并对原始数据及结果数据提供实时访问。

数据定义

我们假设写入的为日志数据,包括三个基础字段:

字段名称 类型 含义
id 整型 日志id
level 整型 日志的等级,越大表明等级越高
message 字符串 日志的内容

我们需要将 level>1 的日志写入到另外一张数据表中,用作专门的查询。

创建实例及数据表

表格存储控制台创建表格存储实例(本次以 华东2 distribute-test 为例),并创建源表(source_data)及结果表(result),主键为 id (整型),由于表格存储是 schemafree 结构,无需预先定义其他属性列字段。

以 source_data 为例,创建如下图:



开启数据源表的Stream功能

触发器功能需要先开启数据表的Stream功能,才能在函数计算中处理写入表格存储中的增量数据。



Stream记录过期时长是指通过 StreamAPI 能够读取到的增量数据的最长时间。

由于触发器只能绑定现有的函数,故先到函数计算的控制台上在同region创建服务及函数。
  • 创建函数计算服务

    函数计算的控制台上创建服务及处理函数,我们继续使用华东2节点。

    1. 在华东2节点创建服务。

    2. 创建函数依次选择空白函数 > 不创建触发器

      • 函数名称为:etl_test,选择 python2.7 环境,在线编辑代码
      • 函数入口为:etl_test.handler
      • 代码稍后编辑,点击下一步。
    3. 进行服务授权

      由于函数计算需要将运行中的日志写入到日志服务中,同时,需要对表格存储的表进行读写,故需要对函数计算进行授权,为方便起见,我们先添加 AliyunOTSFullAccess 与 AliyunLogFullAccess 权限,实际生产中,建议根据权限最小原则来添加权限。



    4. 单击授权完成,并创建函数。
    5. 修改函数代码。

      创建好函数之后,点击对应的函数代码执行,编辑代码并保存,其中,INSTANCE_NAME(表格存储的实例名称)、REGION(使用的区域)需要根据情况进行修改:



      使用示例代码如下:
      #!/usr/bin/env python
      # -*- coding: utf-8 -*-
      import cbor
      import json
      import tablestore as ots
      INSTANCE_NAME = 'distribute-test'
      REGION = 'cn-shanghai'
      ENDPOINT = 'http://%s.%s.ots-internal.aliyuncs.com'%(INSTANCE_NAME, REGION)
      RESULT_TABLENAME = 'result'
      def _utf8(input):
          return str(bytearray(input, "utf-8"))
      def get_attrbute_value(record, column):
          attrs = record[u'Columns']
          for x in attrs:
              if x[u'ColumnName'] == column:
                  return x['Value']
      def get_pk_value(record, column):
          attrs = record[u'PrimaryKey']
          for x in attrs:
              if x['ColumnName'] == column:
                  return x['Value']
      #由于已经授权了AliyunOTSFullAccess权限,此处获取的credentials具有访问表格存储的权限
      def get_ots_client(context):
          creds = context.credentials
          client = ots.OTSClient(ENDPOINT, creds.accessKeyId, creds.accessKeySecret, INSTANCE_NAME, sts_token = creds.securityToken)
          return client
      def save_to_ots(client, record):
          id = int(get_pk_value(record, 'id'))
          level = int(get_attrbute_value(record, 'level'))
          msg = get_attrbute_value(record, 'message')
          pk = [(_utf8('id'), id),]
          attr = [(_utf8('level'), level), (_utf8('message'), _utf8(msg)),]
          row = ots.Row(pk, attr)
          client.put_row(RESULT_TABLENAME, row)
      def handler(event, context):
          records = cbor.loads(event)
          #records = json.loads(event)
          client = get_ots_client(context)
          for record in records['Records']:
              level = int(get_attrbute_value(record, 'level'))
              if level > 1:
                  save_to_ots(client, record)
              else:
                  print "Level <= 1, ignore."

绑定触发器

  1. 回到表格存储的实例管理页面,单击表 source_data 后的使用触发器按钮,进入触发器绑定界面,单击使用已有函数计算, 选择刚创建的服务及函数,勾选表格存储发送事件通知的权限, 进行确定。

  2. 绑定成功之后,能够看到如下的信息:

运行验证

  1. 向 source_data 表中写入数据。

    单击source_data的数据管理页面,然后单击插入数据,如图依次填入id、level及message信息。



  2. 在 result 表中查询清洗后的数据

    单击result表的数据管理页面,会查询到刚写入到 source_data 中的数据。

    向 soure_data 写入level <=1的数据将不会同步到 result 表中。