全部产品

RDS触发器

更新时间:2019-06-20 16:12:02

RDS 触发器

阿里云关系型数据库( Relational Database Service,简称 RDS)是一种稳定可靠、可弹性伸缩的在线数据库服务。为了支持用户的异构数据流转和轻量级计算需求,比如字段合并、拆分、变换等操作,RDS 与阿里云函数计算(Function Compute,以下简称 FC )进行集成,将 RDS 作为事件源接入 FC,借助 FC 的 serverless 计算能力,使用户具有自定义处理数据的能力。比如一个典型的例子:Redis缓存淘汰

fc-rds

以 bls 业务为例,bls-manager 是管控核心组件,对元数据库执行读写操作,但 bls-console 是外围组件,负责运维页面的 api,基本是一些查询操作,bls-console 会把查到的数据缓存到redis中。

  • 问题

    bls-manager 更新了元数据,但 bls-console 不知道,需要等 key 过期后才会重新从元数据库获取。

  • 解决办法

    为了解决元数据更新后,redis 中缓存未及时清除/更新的问题,用函数计算实时订阅 rds 的 binlog,根据相应 dml 语句,对 redis 中的 key 执行 delete/update 操作

目前支持 RDS trigger 支持范围

  • 规格

支持 mysql 5.6、5.7,高可用版以及基础版, 后续还会支持更多类型的数据库

  • 网络

支持经典网络和 vpc

  • 地域

当前支持地域:北京、青岛、上海、杭州、深圳、香港、美东、 美西、新加坡、澳大利亚、德国; 日本、印度和张家口暂时不支持

注意事项

避免出现循环调用的情况,用户编写函数的时候,注意不要出现以下逻辑: RDS 数据库 myDb 中的表 A 触发函数 B,函数 B 又更新数据库 myDb 中的表 A 的数据,从而造成函数无限循环调用

RDS 事件定义

当 RDS 实例下面的 DataBase 中 具体的 Table 发生了 insert 、 update 、 delete 发生数据变更操作的时候,会将变更详情信息编码为 json 字符串或者 protobuf 字节流,传递给函数进行处理。

RDS 事件格式示例:

  • json(比如删除表中某一行):

    1. {
    2. "offset": "13054",
    3. "timestamp": "1539182054",
    4. "entries": [{
    5. "operation": "BEGIN",
    6. "timestamp": "1539182055",
    7. "id": "90bd7f98-cac6-11e8-bb08-506b4b2364ec:32690"
    8. }, {
    9. "operation": "DELETE",
    10. "timestamp": "1539182055",
    11. "id": "90bd7f98-cac6-11e8-bb08-506b4b2364ec:32690",
    12. "sequence": "1",
    13. "dbName": "fc-wp",
    14. "tableName": "wp_posts",
    15. "beforeRow": [{
    16. "name": "id",
    17. "charset": "utf8",
    18. "typeNum": -5,
    19. "orgTypeName": "bigint(20)",
    20. "isPk": true,
    21. "value": "MTI="
    22. }, {
    23. "name": "content",
    24. "charset": "utf8",
    25. "idx": 1,
    26. "typeNum": -4,
    27. "orgTypeName": "longtext",
    28. "value": "aGVsbG8="
    29. }, {
    30. "name": "ver",
    31. "charset": "utf8",
    32. "idx": 2,
    33. "typeNum": 4,
    34. "orgTypeName": "int(11)",
    35. "value": "MjQ="
    36. }]
    37. }, {
    38. "operation": "COMMIT",
    39. "timestamp": "1539182055",
    40. "id": "90bd7f98-cac6-11e8-bb08-506b4b2364ec:32690",
    41. "sequence": "2"
    42. }]
    43. }
  • protobuf 完整定义

    1. syntax = "proto3";
    2. package protocol;
    3. enum DBType {
    4. MySQL = 0;
    5. Redis = 1;
    6. Mongo = 2;
    7. HBase = 3;
    8. }
    9. message Message {
    10. //消费该message后可以ack的offset.考虑进行编码.
    11. int64 offset = 1;
    12. //消费该message后可以ack的record对应的timestamp
    13. int64 timestamp = 2;
    14. //留作备用:以后传送大的数据行可以拿来使用.
    15. int32 spare_flag = 3;
    16. int32 spare_seq = 4;
    17. //message的version
    18. int32 version = 5;
    19. //数据源
    20. DBType db_type = 6;
    21. //数据行
    22. repeated Entry entries = 7;
    23. }
    24. //操作
    25. enum OpType {
    26. UNKOWN_TYPE = 0;
    27. BEGIN = 1;
    28. COMMIT = 2;
    29. //不在下列DDL操作用的query
    30. QUERY = 3;
    31. INSERT = 4;
    32. UPDATE = 5;
    33. DELETE = 6;
    34. CREATE = 7;
    35. ALTER = 8;
    36. DROP = 9;
    37. TRUNCATE = 10;
    38. RENAME = 11;
    39. //CREATE INDEX
    40. CINDEX = 12;
    41. //DROP INDEX
    42. DINDEX = 13;
    43. OPTIMIZE = 14;
    44. XA = 15;
    45. }
    46. message Entry {
    47. OpType operation = 1;
    48. //时间戳,单位s
    49. int64 timestamp = 2;
    50. //Transaction id
    51. string id = 3;
    52. //一个事务中的第几行
    53. int64 sequence = 4;
    54. //DML操作的db_name,DDL操作时候session的默认db
    55. string db_name = 5;
    56. //DML操作的table_name
    57. string table_name = 6;
    58. repeated Field row = 7;
    59. repeated Field before_row = 8;
    60. //非DML语句的sql
    61. string query = 9;
    62. }
    63. message Field {
    64. //column name
    65. string name = 1;
    66. //字符集
    67. string charset = 3;
    68. //第几列
    69. int32 idx = 2;
    70. //对应java中的type
    71. int32 type_num = 4;
    72. //对于mysql,mysql中的type num;
    73. int32 org_type = 5;
    74. //在db中原始type的name
    75. string org_type_name = 6;
    76. //预留的flag
    77. int32 flag = 7;
    78. //是否为NULL
    79. bool is_null = 8;
    80. //是否是pk
    81. bool is_pk = 9;
    82. //是否是unsigned value
    83. bool is_unsigned = 10;
    84. //是否是timestamp value(timestamp展示值和时区相关,这里记录标准时区的值)
    85. bool is_timestamp = 11;
    86. //value的值都用bytes表示,消费端需要先用charset结合bytes生成对应string.然后再转换为type_num对应的type value.
    87. //同时当charset为空,即表示原数据类型就是binary类型
    88. bytes value = 12;
    89. }

配置 RDS 触发器:

触发器示例:rds_trigger.yml

  1. triggerConfig:
  2. subscriptionObjects: ["db1.table1", "db1.table2"]
  3. retry: 1
  4. concurrency: 2
  5. eventFormat: json

触发器参数说明

参数名称 约束 默认值 类型 描述
subscriptionObjects string 订阅对象,当前支持到表级别,只有这些表的更新才会触发函数执行, 比如参数为[“db1.table1”, “db2.table2”]
retry [0,3] 3 int 失败重试次数,如果重试指定次数后还是失败,则跳过失败event,继续后续调用
concurrency [1,5] 1 int 调用并发量,如果用户关心事件顺序,一定要置为1,会按照事务在binlog中commit的顺序调用函数。如果不关心事件顺序,则可以调大并发,提高性能
eventFormat json,protobuf protobuf string event格式

注:concurrency 是负载决定并发,如当concurrency设置为5的时候,后端服务尽量保证(但不一定保证)并发度为5

使用示例

可以通过函数计算控制台推荐)、Fun推荐)、命令行工具fcliSDK三种方式设置函数的 RDS 触发器,下面对这三种方式分别进行介绍:

示例1: 控制台新建 RDS 触发器(推荐)

本示例展示了使用控制台设置 RDS 触发器的方式。可以在创建函数的时候设置触发器,也可以在函数创建完成后再设置触发器。对触发器和创建触发器有困惑的同学可以参考文档使用事件源服务创建触发器

首先登录 函数计算管理控制台,选择相应的区域和服务。如果还未新建服务,请参考创建服务

在创建函数时设置触发器

  1. 单击 创建函数,进入创建函数页面,选择 空白函数,单击下一步

  2. 选择 RDS触发器,并按下图进行配置,比如这里设置 eventFormat 为 jsonrds1

  3. 创建函数,填写相应信息,选择 在线编辑,并粘贴以下 python runtime的函数示例代码,单击 下一步
  1. import json
  2. import logging
  3. def handler(event, context):
  4. logger = logging.getLogger()
  5. eventObj = json.loads(event)
  6. logger.info("rds trigger event = {}".format(eventObj))
  7. return "OK"

4.(可选)配置权限,单击 下一步, 核对信息无误后,单击 创建

在函数创建完成后设置触发器

  1. 选中已有的函数,单击 触发器 > 创建触发器
  2. 在创建触发器页面,按下图配置 RDS触发器,单击 确定

    注: 如果不是考虑精确粒度的授权,可以直接使用快捷授权

rds-trigger

示例2: Fun 新建 RDS 触发器(推荐)

Fun 提供了 RDS 触发器 的支持,可以实现函数触发器的创建。下面,我们介绍使用 Fun 配置 RDS 触发器的步骤。

在项目根目录创建一个 template.yml 文件,并将内容填充为:

  1. ROSTemplateFormatVersion: '2015-09-01'
  2. Transform: 'Aliyun::Serverless-2018-04-03'
  3. Resources:
  4. FunDemo:
  5. Type: 'Aliyun::Serverless::Service'
  6. rdsdemo:
  7. Type: 'Aliyun::Serverless::Function'
  8. Properties:
  9. Handler: index.handler
  10. Runtime: python2.7
  11. CodeUri: './'
  12. Events:
  13. my-rds-trigger:
  14. Type: RDS
  15. Properties:
  16. InstanceId: rm-12345799xyz
  17. SubscriptionObjects:
  18. - db1.table1
  19. Retry: 2
  20. Concurrency: 1
  21. EventFormat: json

每个字段的含义,请参考

在项目根目录创建一个 index.js,然后编写相应的逻辑代码就可以了。代码编写完成,直接执行 fun deploy 即可实现部署,更完整的示例请参考

如果想在本地单步调试、运行函数,可以参考 开发函数计算的正确姿势 —— 使用 Fun Local 本地运行与调试

更多的配置规则 请参考

Fun 的更多教程 请参考

示例3: fcli 新建 RDS 触发器

首先,创建一个包含Trigger Config的yaml文件。假设对 rds 实例中 Database db1 中的两张表 table1 和 table2 ,对应的yaml文件内容如下:

  1. triggerConfig:
  2. subscriptionObjects: ["db1.table1", "db1.table2"]
  3. retry: 1
  4. concurrency: 2
  5. eventFormat: json

在对应的fuction目录下创建触发器:

  1. mkt serviceName/functionName -t rds -c TriggerConfig.yaml

更多fcli使用,请参考fcli开发指南

示例4: SDK编程新建RDS触发器

fc-python-sdk为例演示如何通过SDK新建 RDS 触发器。函数计算还提供fc-nodejs-sdkfc-java-sdk.

新建触发器代码

  1. client = fc2.Client(
  2. endpoint='<Your Endpoint>',
  3. accessKeyID='<Your AccessKeyID>',
  4. accessKeySecret='<Your AccessKeySecret>')
  5. service_name = 'serviceName'
  6. function_name = 'functionName'
  7. trigger_name = 'triggerName'
  8. trigger_type = 'rds'
  9. source_arn = 'acs:rds:cn-hangzhou:<Your Account ID>:dbinstance/<Your Rds instance ID>'
  10. invocation_role = 'acs:ram::<Your Account ID>:role/<Your Invocation Role>'
  11. trigger_config = {
  12. "subscriptionObjects": ["db1.table1", "db1.table2"],
  13. "retry": 2,
  14. "concurrency": 1,
  15. "eventFormat": "json"
  16. }
  17. client.create_trigger(service_name, function_name, trigger_name, trigger_type, trigger_config, source_arn, invocation_role)

函数代码

  1. # for example,eventFormat is json
  2. import json
  3. import logging
  4. def handler(event, context):
  5. logger = logging.getLogger()
  6. eventObj = json.loads(event)
  7. logger.info("rds trigger event = {}".format(eventObj))
  8. return 'OK'

如有疑问可留言,或加入函数计算官方客户群(钉钉群号:11721331)