本文为您介绍如何创建实时计算云数据库RDS MySQL版结果表,以及创建结果表时使用的WITH参数和类型映射。

什么是云数据库RDS MySQL版

RDS MySQL基于阿里巴巴的MySQL源码分支,经过双十一高并发、大数据量的考验,拥有优良的性能。RDS MySQL支持实例管理、账号管理、数据库管理、备份恢复、白名单、透明数据加密以及数据迁移等基本功能。除此之外还提供如下高级功能:

  • 专属集群MyBase:是由多台主机(底层服务器,如ECS I2服务器、神龙服务器)组成的集群,相对于全托管数据库,可以满足您更多的需求。
  • 只读实例:在对数据库有少量写请求,但有大量读请求的应用场景下,单个实例可能无法承受读取压力,甚至对业务产生影响。为了实现读取能力的弹性扩展,分担数据库压力,您可以创建一个或多个只读实例,利用只读实例满足大量的数据库读取需求,增加应用的吞吐量。
  • 读写分离:读写分离功能是在只读实例的基础上,额外提供了一个读写分离地址,联动主实例及其所有只读实例,创建自动的读写分离链路。应用程序只需连接读写分离地址进行数据读取及写入操作,读写分离程序会自动将写入请求发往主实例,而将读取请求按照权重发往各个只读实例。用户只需通过添加只读实例的个数,即可不断扩展系统的处理能力,应用程序上无需做任何修改。
  • 什么是数据库代理:数据库独享代理服务是使用独立代理计算资源为当前实例提供代理服务,提供更多高级功能,例如读写分离、短连接优化、事务拆分等。
  • 自治服务DAS:针对SQL语句性能、CPU使用率、IOPS使用率、内存使用率、磁盘空间使用率、连接数、锁信息、热点表等,DAS提供了智能的诊断及优化功能,能最大限度发现数据库存在的或潜在的健康问题。DAS的诊断基于单个实例,会提供问题详情及相应的解决方案,为您维护实例带来极大的便利。

RDS MySQL仅支持InnoDB和X-Engine两种存储引擎,具体的功能请参见MySQL功能概览

使用限制

实时计算Flink版暂不支持通过数据存储功能中存储注册的方式使用RDS MySQL 8.0版本,建议您使用明文方式使用RDS MySQL 8.0版本。数据存储功能详情请参见概述

语法示例

实时计算Flink版支持使用RDS MySQL版作为结果输出,示例代码如下。
CREATE TABLE rds_output(
   id INT,
   len INT,
   content VARCHAR,
   PRIMARY KEY (id,len)
) WITH (
   type='rds',
   url='<yourDatabaseURL>',
   tableName='<yourDatabaseTable>',
   userName='<yourDatabaseUserName>',
   password='<yourDatabasePassword>'
);        
说明
  • 实时计算Flink版写入RDS MySQL数据库结果表原理:针对实时计算Flink版每行结果数据,拼接成一行SQL语句,输入至目标端数据库,然后执行。如果使用批量写,需要在URL后面加上参数?rewriteBatchedStatements=true,以提高系统性能。
  • RDS MySQL数据库支持自增主键。如果实时计算Flink版写入数据支持自增主键,则在DDL中不声明该自增字段即可。例如,ID是自增字段,实时计算Flink版DDL不声明该自增字段,则数据库在一行数据写入过程中会自动填补相关自增字段。
  • 如果DRDS有分区表,拆分键必须在实时计算Flink版DDL里PRIMARY KEY()中声明,否则拆分的表无法写入。
  • 建议使用数据存储注册方式,请参见注册云数据库RDS版
  • DDL声明的字段必须至少存在一个非主键的字段,否则产生报错。

WITH参数

参数 说明 是否必填 备注
type 结果表类型 固定值为rds
url JDBC(Java DataBase Connectivity)连接地址 URL的格式为:jdbc:mysql://<内网地址>/<databaseName>,其中databaseName为对应的数据库名称。内网地址参见如下链接:
说明 如果访问通过VPC访问授权过的RDS,对应的URL配置请参见VPC访问授权
tableName 表名
userName 用户名
password 密码
maxRetryTimes 最大重试次数 默认值为10。
batchSize 一次批量写入的条数 默认值为4096。
bufferSize 流入多少条数据后开始去重 默认值为10000。
flushIntervalMs 清空缓存的时间间隔 默认值为2000,单位为毫秒。表示如果缓存中的数据在等待2秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。
excludeUpdateColumns 忽略指定字段的更新 默认值为空(默认忽略PRIMARY KEY字段)。表示更新主键值相同的数据时,忽略指定字段的更新。
ignoreDelete 是否忽略delete操作 默认值为false,表示支持delete功能。
partitionBy 分区 默认为空。表示写入Sink节点前,会根据该值进行Hash分区,数据会流向相应的Hash节点。

类型映射

RDS字段类型 实时计算Flink版字段类型
BOOLEAN BOOLEAN
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
FLOAT FLOAT
DECIMAL DECIMAL
DOUBLE DOUBLE
DATE DATE
TIME TIME
TIMESTAMP TIMESTAMP
VARCHAR VARCHAR
VARBINARY VARBINARY

JDBC连接参数

参数名称 说明 默认值 最低版本要求
useUnicode 是否使用Unicode字符集,如果参数characterEncoding设置为GB2312或GBK,本参数值必须设置为true。 false 1.1g
characterEncoding 当useUnicode设置为true时,指定字符编码。例如可设置为GB2312或GBK。 false 1.1g
autoReconnect 当数据库连接异常中断时,是否自动重新连接。 false 1.1
autoReconnectForPools 是否使用针对数据库连接池的重连策略。 false 3.1.3
failOverReadOnly 自动重连成功后,连接是否设置为只读。 true 3.0.12
maxReconnects autoReconnect设置为true时,重试连接的次数。 3 1.1
initialTimeout autoReconnect设置为true时,两次重连之间的时间间隔,单位为秒。 2 1.1
connectTimeout 和数据库服务器建立socket连接时的连接超时时长,单位为毫秒。0表示永不超时,适用于JDK 1.4及以上版本。 0 3.0.1
socketTimeout socket操作(读写)超时,单位:毫秒。0表示永不超时。 0 3.0.1

代码示例

包含云数据库RDS版结果表的实时计算Flink版作业代码示例如下。
CREATE TABLE source (
   id INT,
   len INT,
   content VARCHAR
) with (
   type = 'random'
);

CREATE TABLE rds_output(
   id INT,
   len INT,
   content VARCHAR,
   PRIMARY KEY (id,len)
) WITH (
   type='rds',
   url='<yourDatabaseURL>',
   tableName='<yourDatabaseTable>',
   userName='<yourDatabaseUserName>',
   password='<yourDatabasePassword>'
);

INSERT INTO rds_output
SELECT id, len, content FROM source;

FAQ

  • Q:实时计算Flink版的结果数据写入RDS表,是按主键更新的,还是生成1条新的记录?

    A:如果在DDL中定义了主键,会采用INSERT INTO tablename(field1,field2, field3, ...) VALUES(value1, value2, value3, ...) ON DUPLICATE KEY UPDATE field1=value1,field2=value2, field3=value3, ...;的方式更新记录,即对于不存在的主键字段会直接插入,存在的主键字段则更新相应的值。如果DDL中没有声明PRIMARY KEY,则会用insert into方式插入记录,追加数据。

  • Q:使用RDS表中的唯一索引进行GROUP BY时需要注意什么?
    A:
    • 需要在作业中的GROUP BY中声明该唯一索引。
    • RDS中只有一个自增主键,实时计算Flink版作业中不能声明为PRIMARY KEY。