本文为您介绍云数据库RDS MySQL维表DDL定义、WITH参数、CACHE参数、类型映射和代码示例。
什么是云数据库RDS MySQL
RDS MySQL基于阿里巴巴的MySQL源码分支,经过双十一高并发、大数据量的考验,拥有优良的性能。RDS MySQL支持实例管理、账号管理、数据库管理、备份恢复、白名单、透明数据加密以及数据迁移等基本功能。RDS MySQL详情请参见概述。
前提条件
- 已创建RDS MySQL数据库和表,详情请参见创建数据库和账号。
- 已设置IP白名单,详情请参见通过客户端、命令行连接RDS MySQL实例。
使用限制
仅Flink计算引擎VVR 2.0.0及以上版本支持云数据库RDS MySQL Connector。
语法示例
CREATE TABLE rds_dim (
id1 INT,
id2 VARCHAR
) WITH (
'connector' = 'rds',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'cache' = 'ALL'
);
WITH参数
参数 | 说明 | 是否必填 | 备注 |
---|---|---|---|
connector | 维表类型 | 是 | 固定值为rds 。
|
password | 密码 | 是 | 无 |
tableName | 表名 | 是 | 无 |
url | URL地址 | 是 | 云数据库RDS MySQL专有网络VPC地址,即内网地址,详情请参见查看或修改内外网地址和端口。URL的格式为:jdbc:mysql://<内网地址>/<databaseName> ,其中databaseName为对应的数据库名称。 |
userName | 用户名 | 是 | 无 |
maxRetryTimes | 写入数据失败后,重试写入的次数 | 否 | 默认值为3。 |
CACHE | 缓存策略、缓存大小和缓存超时时间 | 否 | 详情请参见CACHE参数。 |
CACHE参数
参数 | 说明 | 是否必填 | 备注 |
---|---|---|---|
cache | 缓存策略 | 否 | 云数据库RDS(DRDS)版维表支持以下三种缓存策略:
说明
|
cacheSize | 缓存大小 | 否 |
|
cacheTTLMs | 缓存超时时间,单位为毫秒 | 否 | cacheTTLMs配置和cache有关:
|
maxJoinRows | 主表中每一条数据查询维表时,匹配后最多返回的结果数。 | 否 | 默认值为1024。如果您可以预估一条数据对应的维表数据最多为n条,则可以设置maxJoinRows='n',以确保实时计算匹配处理效率。
说明 进行Join时,主表输入一条数据,对应维表匹配后返回的结果总数受该参数限制。
|
类型映射
RDS字段类型 | Flink字段类型 |
---|---|
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | SMALLINT |
INT | INT |
SMALLINT UNSIGNED | INT |
BIGINT | BIGINT |
INT UNSIGNED | BIGINT |
BIGINT UNSIGNED | DECIMAL(20,0) |
FLOAT | FLOAT |
DECIMAL | DECIMAL |
DOUBLE | DOUBLE |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
VARBINARY | VARBINARY |
代码示例
CREATE TEMPORARY TABLE datagen_source(
a INT,
b BIGINT,
c STRING,
`proctime` AS PROCTIME()
) with (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE rds_dim (
a INT,
b VARCHAR,
c VARCHAR
) with (
'connector' = 'rds',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>',
'url' = 'jdbc:mysql://xxx',
'userName' = '<yourUsername>'
);
CREATE TEMPORARY TABLE blackhole_sink(
a INT,
b STRING
) with (
'connector' = 'blackhole'
);
insert into blackhole_sink select T.a,H.b
FROM datagen_source AS T JOIN rds_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
常见问题
为什么MySQL物理表(包含RDS MySQL和ADB)的INT UNSIGNED字段类型,在Flink SQL中要被声明为其他类型?