本文为您介绍如何使用云数据库RDS MySQL版连接器。
RDS MySQL基于阿里巴巴的MySQL源码分支,经过双十一高并发、大数据量的考验,拥有优良的性能。RDS MySQL支持实例管理、账号管理、数据库管理、备份恢复、白名单、透明数据加密以及数据迁移等基本功能。RDS MySQL详情请参见RDS MySQL云数据库。
后续将计划不再支持云数据库RDS MySQL版连接器,建议您直接使用MySQL连接器。使用MySQL连接器请参见MySQL。
RDS MySQL连接器支持的信息如下。
| 类别 | 详情 | 
| 支持类型 | 结果表和维表 | 
| 运行模式 | 流模式与批模式 | 
| 数据格式 | 暂不适用 | 
| 特有监控指标 | 
 说明  指标含义详情,请参见监控指标说明。 | 
| API种类 | SQL | 
| 是否支持更新或删除结果表数据 | 是 | 
前提条件
- 已创建RDS MySQL数据库和表,详情请参见创建数据库和账号。 
- 已设置IP白名单,详情请参见通过客户端、命令行连接RDS MySQL实例。 
使用限制
- 仅Flink计算引擎VVR 2.0.0及以上版本支持RDS MySQL连接器。 
- 仅支持阿里云RDS MySQL云数据库。 
- 语义上可以保证At-Least-Once,在结果表有主键的情况下,幂等可以保证数据的正确性。 
- 推荐您使用最新版本的Flink(例如6.x以上),以获取最新的性能与稳定性优化。 
注意事项
RDS MySQL连接器后续会逐步下线,建议您在功能满足的前提下使用MySQL连接器。详情请参见MySQL。
语法结构
- 结果表 - CREATE TABLE rds_sink( id INT, num BIGINT, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector'='rds', 'tableName'='your-table-name', 'userName'='your-user-name', 'password'='your-password', 'url'='your-url' );说明- Flink RDS连接器写入数据库结果表原理:针对Flink Sink输出数据,拼接成一行SQL语句,然后执行。对于没有主键的结果表,会执行 - INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);语句。对于包含主键的结果表,会执行- INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;语句。请注意:如果物理表存在除主键外的唯一索引约束,当插入两条主键不同但唯一索引相同的记录时,下游数据会因为唯一索引冲突导致数据覆盖引发数据丢失。
- 如果在RDS MySQL云数据库定义了自增主键,在Flink DDL中不应该声明该自增字段。数据写入过程中,数据库会自动填补该自增字段。Flink RDS连接器仅支持写入和删除带自增字段的数据,不支持更新。 
 
- 维表 - CREATE TABLE rds_dim( id1 INT, id2 VARCHAR ) WITH ( 'connector'='rds', 'tableName'='your-table-name', 'userName'='your-user-name', 'password'='your-password', 'url'='your-url' 'cache'='NONE' );
WITH参数
- 通用 - 参数 - 说明 - 数据类型 - 是否必填 - 默认值 - 备注 - connector - 表类型。 - String - 是 - 无。 - 固定值为rds。 - tableName - 表名。 - String - 是 - 无。 - 无。 - userName - 用户名。 - String - 是 - 无。 - 无。 - password - 密码。 - String - 是 - 无。 - 无。 - url - 表地址。 - String - 是 - 无。 - RDS MySQL云数据库专有网络VPC地址,即内网地址,详情请参见查看或修改内外网地址和端口。 - URL的格式为: - jdbc:mysql://<内网地址>:<端口号>/<数据库名称>。说明- 对于结果表,建议在URL后面加上参数?rewriteBatchedStatements=true,以提高系统性能。 - maxRetryTimes - 查询维表或者写数据到结果表失败后,最多重试次数。 - Integer - 否 - 在Flink计算引擎VVR 4.0.7及以上版本,该参数默认值为10。 
- 在Flink计算引擎VVR 4.0.6及以下版本,该参数默认值为3。 
 - 无。 
- 结果表独有 - 参数 - 说明 - 数据类型 - 是否必填 - 默认值 - 备注 - batchSize - 一次批量写入的条数。 - Integer - 否 - 在Flink计算引擎VVR 4.0.7及以上版本,该参数默认值为4096。 
- 在Flink计算引擎VVR 4.0.0~4.0.6版本,该参数默认值为5000。 
- 在Flink计算引擎VVR 3.x版本及以下版本,该参数默认值为100。 
 - 无。 - bufferSize - 内存中缓存的最大数据条数。batchSize或 bufferSize任一到达阈值都会触发数据写操作。 - Integer - 否 - 10000 - 仅Flink计算引擎VVR 4.0.7及以上版本支持该参数。 
- 需指定主键后,该参数才生效。 
 - flushIntervalMs - flush内存缓冲区的时间间隔。表示如果缓存中的数据在等待指定时间后,依然没有达到输出条件(batchSize或bufferSize),系统会自动写出缓存中的所有数据到结果表。 - Integer - 否 - 在Flink计算引擎VVR 4.0.7及以上版本,该参数默认值为2000。 
- 在Flink计算引擎VVR 4.0.0~4.0.6版本,该参数默认值为0。 
- 在Flink计算引擎VVR 3.x版本及以下版本,该参数默认值为1000。 
 - 在默认值为0的版本中,如果不配置该参数,可能导致少量数据永远无法写出到结果表。建议您采用更高版本的Flink。 - ignoreDelete - 是否忽略数据Delete操作。 - Boolean - 否 - false - Flink SQL可能会生成数据Delete操作,在多个输出节点根据主键同时更新同一张结果表的不同字段的场景下,可能导致数据结果不正确。 - 例如一个任务在删除了一条数据后,另一个任务又只更新了这条数据的部分字段,其余未被更新的字段由于被删除,其值会变成null或默认值。通过将ignoreDelete设置为true,可以避免数据删除操作。 - connectionMaxActive - 数据库连接池大小 - Integer - 否 - 40 - 仅Flink计算引擎VVR 4.0.7及以上版本支持该参数。 
- 如果出现获取连接超时的问题,可能是连接池不够用,可适当增大连接池的大小。 
- 如果数据库能支持的最大并发连接比较小,可适当减小连接池大小或者减小作业节点并行度。 
 
- 维表独有 - 参数 - 说明 - 数据类型 - 是否必填 - 默认值 - 备注 - cache - 维表缓存策略。 - String - 否 - 在Flink计算引擎VVR 4.0.6之前版本,缓存策略默认值为NONE。 
- 在Flink计算引擎VVR 4.0.6及以上版本,缓存策略的默认值为ALL。 
 - Flink RDS MySQL连接器支持None、LRU和ALL三种缓存策略,取值含义详情请参见背景信息。 - cacheSize - 缓存大小。 - Integer - 否 - 100000 - 当选择LRU缓存策略后,需要设置缓存大小。 
- 当选择NONE或ALL缓存策略时,不必设置缓存大小。 
 - cacheTTLMs - 缓存超时时间。 - Long - 否 - 如果cache配置为NONE,则cacheTTLMs可以不配置,表示缓存不超时。 
- 如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。 
- 如果cache配置为ALL,则cacheTTLMs为缓存加载时间。默认不重新加载。 
 - 单位为毫秒。 - maxJoinRows - 主表中每一条数据查询维表时,匹配后最多返回的结果数。 - Integer - 否 - 1024 - 进行Join时,主表输入一条数据,对应维表匹配后,会返回的结果总数受该参数限制。 - 如果您可以预估一条数据对应的维表数据最多为n条,则可以设置 - maxJoinRows='n',以确保实时计算匹配处理效率。
类型映射
| Flink字段类型 | RDS MySQL字段类型 | 
| BOOLEAN | BOOLEAN | 
| TINYINT | TINYINT | 
| TINYINT(1) 说明  仅维表支持该映射。 | BOOLEAN | 
| SMALLINT | SMALLINT | 
| SMALLINT | TINYINT UNSIGNED | 
| INT | INT | 
| INT | SMALLINT UNSIGNED | 
| BIGINT | BIGINT | 
| BIGINT | INT UNSIGNED | 
| DECIMAL(20,0) | BIGINT UNSIGNED | 
| FLOAT | FLOAT | 
| DECIMAL | DECIMAL | 
| DOUBLE | DOUBLE | 
| DATE | DATE | 
| TIME | TIME | 
| TIMESTAMP | TIMESTAMP | 
| VARCHAR | VARCHAR | 
| VARBINARY | VARBINARY | 
使用示例
- 结果表 - CREATE TEMPORARY TABLE datagen_source( `name` VARCHAR, `age` INT ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE rds_sink( `name` VARCHAR, `age` INT ) WITH ( 'connector'='rds', 'password'='your-password', 'tableName'='your-tablename', 'url'='your-url', 'userName'='your-username' ); INSERT INTO rds_sink SELECT * FROM datagen_source;
- 维表 - CREATE TEMPORARY TABLE datagen_source( a INT, b BIGINT, c STRING, `proctime` AS PROCTIME() ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE rds_dim( a INT, b VARCHAR, c VARCHAR ) WITH ( 'connector'='rds', 'password'='<yourPassword>', 'tableName'='<yourTablename>', 'url'='jdbc:mysql://xxx', 'userName'='<yourUsername>' ); CREATE TEMPORARY TABLE blackhole_sink( a INT, b STRING ) WITH ( 'connector'='blackhole' ); INSERT INTO blackhole_sink SELECT T.a, H.b FROM datagen_source AS T JOIN rds_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a=H.a;