函数计算原RDS触发器将于2023年01月31日下线,建议您使用函数计算提供的最新RDS触发链路,即云数据库RDS > 数据传输服务DTS > 事件总线EventBridge > 函数计算。本文介绍RDS触发器的最新链路配置步骤以及Event参数的差异性。

前提条件

操作步骤

配置DTS

  1. 登录数据传输服务控制台,然后在顶部菜单栏,选择地域。
  2. 在左侧导航栏,单击数据订阅,然后单击创建任务
  3. 创建任务页面的配置源库及目标库信息配置向导,设置源库信息消费网络类型,然后单击测试连接以进行下一步
    主要配置项解释如下,其余配置项保持默认值即可。
    说明 此测试连接必须通过公网连接,请确保RDS实例已开通外网连接功能。您可以在RDS实例的 数据库连接页面配置。
    配置项 说明 示例
    源库信息区域
    数据库类型 选择待订阅的数据库类型。 MySQL
    RDS实例ID 选择待订阅的RDS实例。 rm-bp1pw60i18f2x****
    数据库账号 填写待订阅的RDS数据库账号。 db_chi
    数据库密码 填写待订阅的RDS数据库账号对应的密码。 *************
    消费网络类型区域
    专有网络 选择数据订阅实例所属的专有网络。
    说明 消费网络的 专有网络虚拟交换机必须与RDS实例设置的VPC和vSwitch一致。您可以在RDS实例的 数据库连接页面查看。
    vpc-bp12c5dzorfoizcez****
    虚拟交换机 选择数据订阅实例所属的交换机。
    说明 消费网络的 专有网络虚拟交换机必须与RDS实例设置的VPC和vSwitch一致。您可以在RDS实例的 数据库连接页面查看。
    vsw-bp1lt2oxenx87jvc8****
  4. 创建任务页面的配置任务对象及高级配置配置向导,选择订阅的数据类型和数据表,然后根据界面提示完成高级配置和预检查操作。
  5. 创建任务页面的购买配置向导,根据界面提示购买实例,然后在订阅任务列表,启动任务。
  6. 单击任务ID打开任务,在任务管理页面,单击数据消费,然后单击新增消费组,在弹出的新增消费组面板设置消费组名称账号密码等。

配置EventBridge

  1. 登录事件总线EventBridge控制台,在左侧导航栏,单击事件流
  2. 在顶部菜单栏,选择地域,然后单击创建事件流
  3. 创建事件流面板,设置以下配置项,然后单击创建
    1. 基本信息页签,设置事件流名称描述,然后单击下一步
    2. 事件源页签,事件提供方选择数据库 DTS,选择步骤6设置的消费组,并填写密码消费位点。然后单击下一步
    3. 可选:规则页签,设置事件规则,然后单击下一步
    4. 目标页签,服务类型选择函数计算,然后填写已创建的服务函数
    创建完成后,您可以在事件流列表查看 事件源数据库DTS至 事件目标函数计算的任务,其状态为 运行中

完成以上所有配置后,云数据库RDS > 数据传输服务DTS > 事件总线EventBridge > 函数计算的数据链路配置成功。

根据Event差异调整函数代码

RDS触发器最新链路的数据格式和原RDS触发器的数据格式不相同,具体差异点如下所示。需要在代码中将函数对Event的解析处理方式做调整。
  • 原RDS触发器,即云数据库RDS > 函数计算链路下,Event数据格式为Object。具体信息,请参见RDS触发器eventFormat为protobuf的代码使用示例
  • 新RDS触发器,即云数据库RDS > 数据传输服务DTS > 事件总线EventBridge > 函数计算链路下,Event数据格式为Array。Array中每一个元素是String类型数据,String数据即为Object to String的结果。更多信息,请参见数据传输服务DTS

以下提供Python和Java两条链路的Event解析Demo。您可以对比两者的差异,修改函数Event解析逻辑。

Python

  • 云数据库RDS > 函数计算事件解析代码
    # -*- coding: utf-8 -*-
    import json
    import logging
    
    def handler(event, context):
      logger = logging.getLogger()
      eventObj = json.loads(event)
      logger.info("rds trigger event = {}".format(eventObj))
      # 您可以在此处增加自己的处理逻辑,例如,更新Redis缓存。
      return "OK"
  • 云数据库RDS > 数据传输服务DTS > 事件总线EventBridge > 函数计算事件解析代码
    # -*- coding: utf-8 -*-
    import logging
    import json
    
    def handler(event, context):
      logger = logging.getLogger()
      # 将Event入参转换为String列表。
      evt_list = json.loads(event)
      for evt in evt_list:
          # 将列表中每一个元素转为Json类型。
          evt_obj = json.loads(evt)
          logger.info(evt_obj)
      return "OK"

Java

  • 云数据库RDS > 函数计算事件解析代码
    package example;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    
    import com.google.gson.*;
    import com.google.gson.JsonParser;
    import com.google.gson.JsonObject;
    import com.aliyun.fc.runtime.Context;
    import com.aliyun.fc.runtime.StreamRequestHandler;
    
    import java.io.*;
    import java.nio.charset.StandardCharsets;
    
    public class App implements StreamRequestHandler {
        @Override
        public void handleRequest(
                InputStream inputStream, OutputStream outputStream, Context context) throws IOException {
              // 从input入参读取数据,并存储到byte[] buffer中。
            ByteArrayOutputStream result = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            for (int length; (length = inputStream.read(buffer)) != -1; ) {
                result.write(buffer, 0, length);
            }
    
            // 将byte[]数据转换为Object类型。
            String stringData = result.toString(StandardCharsets.UTF_8.name());
            JsonObject jsonObj = (JsonObject)(new JsonParser().parse(stringData));
            System.out.println(jsonObj);
            outputStream.write(new String("OK").getBytes());
        }
    }
                        
    pom.xml文件示例如下。
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>example</groupId>
      <artifactId>FCJavaDemo</artifactId>
      <packaging>jar</packaging>
      <version>1.0-SNAPSHOT</version>
      <name>FCJavaDemo</name>
    
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>com.aliyun.fc.runtime</groupId>
          <artifactId>fc-java-core</artifactId>
          <version>1.4.1</version>
        </dependency>
        <dependency>
          <groupId>com.google.code.gson</groupId>
          <artifactId>gson</artifactId>
          <version>2.8.5</version>
        </dependency>
        <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>1.2.47</version>
        </dependency>
      </dependencies>
    
      <build>
        <plugins>
          <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
            </configuration>
            <executions>
              <execution>
                <id>make-my-jar-with-dependencies</id>
                <phase>package</phase>
                <goals>
                  <goal>single</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    
      <properties>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.test.skip>true</maven.test.skip>
      </properties>
    </project>
  • 云数据库RDS > 数据传输服务DTS > 事件总线EventBridge > 函数计算事件解析代码
    package example;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    
    import com.google.gson.*;
    import com.google.gson.JsonParser;
    import com.google.gson.JsonObject;
    import com.aliyun.fc.runtime.Context;
    import com.aliyun.fc.runtime.StreamRequestHandler;
    
    import java.io.*;
    import java.nio.charset.StandardCharsets;
    
    public class App implements StreamRequestHandler {
    
        @Override
        public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) throws IOException {
              // 从input入参读取数据,并存储到byte[] buffer中
            ByteArrayOutputStream result = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            for (int length; (length = inputStream.read(buffer)) != -1; ) {
                result.write(buffer, 0, length);
            }
    
        // 将byte[]类型数据转换为String[]类型。
            String stringData = result.toString(StandardCharsets.UTF_8.name());
            Gson gson =new Gson();
            String[] stringArrayData = gson.fromJson(stringData, String[].class);
    
            // 遍历String[]中的每一个元素,并将每一个元素转为Json格式数据。其中data字段为DTS原数据。
            for(String elem : stringArrayData){
                JsonObject jsonObj = (JsonObject)(new JsonParser().parse(elem));
                System.out.println(jsonObj.get("data"));
            }
            outputStream.write((new String("OK")).getBytes());
        }
    }
    pom.xml文件示例如下。
    <project xmlns="http://maven.apache.org/POM/4.0.0"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>example</groupId>
      <artifactId>FCJavaDemo</artifactId>
      <packaging>jar</packaging>
      <version>1.0-SNAPSHOT</version>
      <name>FCJavaDemo</name>
    
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>com.aliyun.fc.runtime</groupId>
          <artifactId>fc-java-core</artifactId>
          <version>1.4.1</version>
        </dependency>
        <dependency>
          <groupId>com.google.code.gson</groupId>
          <artifactId>gson</artifactId>
          <version>2.8.5</version>
        </dependency>
        <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>1.2.47</version>
        </dependency>
      </dependencies>
    
      <build>
        <plugins>
          <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
            </configuration>
            <executions>
              <execution>
                <id>make-my-jar-with-dependencies</id>
                <phase>package</phase>
                <goals>
                  <goal>single</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    
      <properties>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.test.skip>true</maven.test.skip>
      </properties>
    </project>