函数计算原RDS触发器将于2023年01月31日下线,建议您使用函数计算提供的最新RDS触发链路,即
。本文介绍RDS触发器的最新链路配置步骤以及Event参数的差异性。前提条件
- 已准备配置DTS数据订阅任务所需的数据库账号。更多信息,请参见创建账号。
- 已开通事件总线EventBridge并授权。
- 已在函数计算创建服务和创建函数。
操作步骤
配置DTS
- 登录数据传输服务控制台,然后在顶部菜单栏,选择地域。
- 在左侧导航栏,单击数据订阅,然后单击创建任务。
- 在创建任务页面的配置源库及目标库信息配置向导,设置源库信息和消费网络类型,然后单击测试连接以进行下一步。主要配置项解释如下,其余配置项保持默认值即可。说明 此测试连接必须通过公网连接,请确保RDS实例已开通外网连接功能。您可以在RDS实例的数据库连接页面配置。
配置项 说明 示例 源库信息区域 数据库类型 选择待订阅的数据库类型。 MySQL RDS实例ID 选择待订阅的RDS实例。 rm-bp1pw60i18f2x**** 数据库账号 填写待订阅的RDS数据库账号。 db_chi 数据库密码 填写待订阅的RDS数据库账号对应的密码。 ************* 消费网络类型区域 专有网络 选择数据订阅实例所属的专有网络。 vpc-bp12c5dzorfoizcez**** 虚拟交换机 选择数据订阅实例所属的交换机。 vsw-bp1lt2oxenx87jvc8**** - 在创建任务页面的配置任务对象及高级配置配置向导,选择订阅的数据类型和数据表,然后根据界面提示完成高级配置和预检查操作。
- 在创建任务页面的购买配置向导,根据界面提示购买实例,然后在订阅任务列表,启动任务。
- 单击任务ID打开任务,在任务管理页面,单击数据消费,然后单击新增消费组,在弹出的新增消费组面板设置消费组名称、账号和密码等。
配置EventBridge
- 登录事件总线EventBridge控制台,在左侧导航栏,单击事件流。
- 在顶部菜单栏,选择地域,然后单击创建事件流。
- 在创建事件流面板,设置以下配置项,然后单击创建。
- 在基本信息页签,设置事件流名称和描述,然后单击下一步。
- 在事件源页签,事件提供方选择数据库 DTS,选择步骤6设置的消费组,并填写密码和消费位点。然后单击下一步。
- 可选:在规则页签,设置事件规则,然后单击下一步。
- 在目标页签,服务类型选择函数计算,然后填写已创建的服务和函数。
创建完成后,您可以在事件流列表查看事件源数据库DTS至事件目标函数计算的任务,其状态为运行中。
完成以上所有配置后,
的数据链路配置成功。根据Event差异调整函数代码
RDS触发器最新链路的数据格式和原RDS触发器的数据格式不相同,具体差异点如下所示。需要在代码中将函数对Event的解析处理方式做调整。
- 原RDS触发器,即具体信息,请参见RDS触发器eventFormat为protobuf的代码使用示例。 链路下,Event数据格式为Object。
- 新RDS触发器,即数据传输服务DTS。 链路下,Event数据格式为Array。Array中每一个元素是String类型数据,String数据即为Object to String的结果。更多信息,请参见
以下提供Python和Java两条链路的Event解析Demo。您可以对比两者的差异,修改函数Event解析逻辑。
Python
# -*- coding: utf-8 -*- import json import logging def handler(event, context): logger = logging.getLogger() eventObj = json.loads(event) logger.info("rds trigger event = {}".format(eventObj)) # 您可以在此处增加自己的处理逻辑,例如,更新Redis缓存。 return "OK"
事件解析代码# -*- coding: utf-8 -*- import logging import json def handler(event, context): logger = logging.getLogger() # 将Event入参转换为String列表。 evt_list = json.loads(event) for evt in evt_list: # 将列表中每一个元素转为Json类型。 evt_obj = json.loads(evt) logger.info(evt_obj) return "OK"
事件解析代码
Java
package example; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import com.google.gson.*; import com.google.gson.JsonParser; import com.google.gson.JsonObject; import com.aliyun.fc.runtime.Context; import com.aliyun.fc.runtime.StreamRequestHandler; import java.io.*; import java.nio.charset.StandardCharsets; public class App implements StreamRequestHandler { @Override public void handleRequest( InputStream inputStream, OutputStream outputStream, Context context) throws IOException { // 从input入参读取数据,并存储到byte[] buffer中。 ByteArrayOutputStream result = new ByteArrayOutputStream(); byte[] buffer = new byte[1024]; for (int length; (length = inputStream.read(buffer)) != -1; ) { result.write(buffer, 0, length); } // 将byte[]数据转换为Object类型。 String stringData = result.toString(StandardCharsets.UTF_8.name()); JsonObject jsonObj = (JsonObject)(new JsonParser().parse(stringData)); System.out.println(jsonObj); outputStream.write(new String("OK").getBytes()); } }
pom.xml文件示例如下。<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>example</groupId> <artifactId>FCJavaDemo</artifactId> <packaging>jar</packaging> <version>1.0-SNAPSHOT</version> <name>FCJavaDemo</name> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>com.aliyun.fc.runtime</groupId> <artifactId>fc-java-core</artifactId> <version>1.4.1</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.5</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-my-jar-with-dependencies</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> <properties> <maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.source>1.8</maven.compiler.source> <maven.test.skip>true</maven.test.skip> </properties> </project>
事件解析代码package example; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import com.google.gson.*; import com.google.gson.JsonParser; import com.google.gson.JsonObject; import com.aliyun.fc.runtime.Context; import com.aliyun.fc.runtime.StreamRequestHandler; import java.io.*; import java.nio.charset.StandardCharsets; public class App implements StreamRequestHandler { @Override public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) throws IOException { // 从input入参读取数据,并存储到byte[] buffer中 ByteArrayOutputStream result = new ByteArrayOutputStream(); byte[] buffer = new byte[1024]; for (int length; (length = inputStream.read(buffer)) != -1; ) { result.write(buffer, 0, length); } // 将byte[]类型数据转换为String[]类型。 String stringData = result.toString(StandardCharsets.UTF_8.name()); Gson gson =new Gson(); String[] stringArrayData = gson.fromJson(stringData, String[].class); // 遍历String[]中的每一个元素,并将每一个元素转为Json格式数据。其中data字段为DTS原数据。 for(String elem : stringArrayData){ JsonObject jsonObj = (JsonObject)(new JsonParser().parse(elem)); System.out.println(jsonObj.get("data")); } outputStream.write((new String("OK")).getBytes()); } }
pom.xml文件示例如下。<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>example</groupId> <artifactId>FCJavaDemo</artifactId> <packaging>jar</packaging> <version>1.0-SNAPSHOT</version> <name>FCJavaDemo</name> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>com.aliyun.fc.runtime</groupId> <artifactId>fc-java-core</artifactId> <version>1.4.1</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.5</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-my-jar-with-dependencies</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> <properties> <maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.source>1.8</maven.compiler.source> <maven.test.skip>true</maven.test.skip> </properties> </project>
事件解析代码