表格存储高并发的写入性能以及低廉的存储成本非常适合物联网、日志、监控数据的存储。您可以将数据写入到表格存储中,同时在函数计算中对新增的数据做简单的清洗,将清洗后的数据写回到表格存储表中,并能实时访问原始和结果数据。

样例场景

如果写入的为日志数据,日志数据包括如下三个字段。为了便于日志查询,您需要将level>1的日志写入到另一张数据表中。

字段名称 类型 含义
id 整型 日志id。
level 整型 日志的等级,数值越大表示日志等级越高。
message 字符串 日志的内容。

创建函数和触发器

使用触发器功能需要先开启数据表的Stream功能,才能在函数计算中处理写入表格存储中的增量数据。由于触发器只能绑定已有函数,所以需在和表格存储实例相同的地域中创建函数。

  1. 创建开通了Stream流的数据表。
    1. 在表格存储控制台创建实例
    2. 在该实例下创建数据表,并且开启数据表的Stream功能。
  2. 创建函数。
    1. 登录函数计算控制台
    2. 在左侧导航栏,单击服务 / 函数
    3. 服务 / 函数页面,单击新建函数
    4. 新建函数页面,选择事件函数,单击下一步
    5. 根据实际需要配置函数信息,单击完成
      • 函数名称为etl_test,选择python2.7环境。
      • 函数入口为etl_test.handler。
      fig_newfunction
  3. 配置服务。
    1. 在左侧导航栏,单击服务 / 函数
    2. 服务 / 函数页面,单击服务配置
    3. 服务配置页签,可配置服务的角色,详情请参见函数计算权限模型
      由于函数计算需要将运行中的日志写入到日志服务中,同时需要对表格存储的表进行读写,所以需要对函数计算进行授权,此处以添加AliyunOTSFullAccess与AliyunLogFullAccess权限为例介绍,如下图所示。实际使用时建议根据权限最小原则来添加权限。fig_rule
  4. 修改函数代码。
    1. 服务 / 函数页面,单击函数名称。
    2. 在函数详情页面,单击代码执行
    3. 代码执行页签,编辑代码并保存。
      其中INSTANCE_NAME(表格存储的实例名称)、REGION(使用的区域)、ENDPOINT(服务地址)需要根据情况进行修改。 fig_code
      使用示例代码如下:
      #!/usr/bin/env python
      # -*- coding: utf-8 -*-
      import cbor
      import json
      import tablestore as ots
      INSTANCE_NAME = 'distribute-test'
      REGION = 'cn-shanghai'
      ENDPOINT = 'http://%s.%s.vpc.tablestore.aliyuncs.com'%(INSTANCE_NAME, REGION)
      RESULT_TABLENAME = 'result'
      def _utf8(input):
          return str(bytearray(input, "utf-8"))
      def get_attrbute_value(record, column):
          attrs = record[u'Columns']
          for x in attrs:
              if x[u'ColumnName'] == column:
                  return x['Value']
      def get_pk_value(record, column):
          attrs = record[u'PrimaryKey']
          for x in attrs:
              if x['ColumnName'] == column:
                  return x['Value']
      #由于已经授权了AliyunOTSFullAccess权限,此处获取的credentials具有访问表格存储的权限。
      def get_ots_client(context):
          creds = context.credentials
          client = ots.OTSClient(ENDPOINT, creds.accessKeyId, creds.accessKeySecret, INSTANCE_NAME, sts_token = creds.securityToken)
          return client
      def save_to_ots(client, record):
          id = int(get_pk_value(record, 'id'))
          level = int(get_attrbute_value(record, 'level'))
          msg = get_attrbute_value(record, 'message')
          pk = [(_utf8('id'), id),]
          attr = [(_utf8('level'), level), (_utf8('message'), _utf8(msg)),]
          row = ots.Row(pk, attr)
          client.put_row(RESULT_TABLENAME, row)
      def handler(event, context):
          records = cbor.loads(event)
          #records = json.loads(event)
          client = get_ots_client(context)
          for record in records['Records']:
              level = int(get_attrbute_value(record, 'level'))
              if level > 1:
                  save_to_ots(client, record)
              else:
                  print "Level <= 1, ignore."
  5. 创建和测试Tablestore触发器。
    说明 首次通过表格存储控制台使用函数计算时,请使用老版控制台授予表格存储发送事件通知的权限。
    1. 在表格存储控制台数据表的触发器管理页签,单击使用已有函数计算
    2. 创建触发器对话框,选择函数计算服务和函数计算函数,输入触发器名称。
      说明 当使用老版控制台首次创建触发器时,需要选中授予表格存储发送事件通知的权限,授予表格存储发送事件通知的权限。

      授权后,可以在RAM控制台查看到自动创建的授权角色AliyunTableStoreStreamNotificationRole。

    3. 单击确定

运行验证

创建函数和触发器后,通过在表格存储中写入和查询数据验证数据清洗是否成功。

向source_data表中写入数据,依次填入id、level和message信息,并在result表中查询清洗后的数据,详请请参见控制台读写数据
  • 当向soure_data表中写入level>1的数据时,数据会同步到result表中。
  • 当向soure_data表中写入level<=1的数据时,数据不会同步到result表中。