本文为您介绍如何使用云原生数据仓库AnalyticDB MySQL版3.0连接器。
背景信息
云原生数据仓库AnalyticDB MySQL版3.0是融合数据库、大数据技术于一体的云原生企业级数据仓库服务。AnalyticDB MySQL版支持高吞吐的数据实时增删改、低延时的实时分析和复杂ETL,兼容上下游生态工具,可用于构建企业级报表系统、数据仓库和数据服务引擎。
ADB MySQL 3.0连接器支持的信息如下。
类别 | 详情 |
支持类型 | 维表和结果表 |
运行模式 | 流模式和批模式 |
数据格式 | 暂不适用 |
特有监控指标 | 暂无 |
API种类 | SQL |
是否支持更新或删除结果表数据 | 是 |
前提条件
已创建AnalyticDB MySQL集群并创建表,详情请参见创建集群和CREATE TABLE。
已设置白名单,详情请参见设置白名单。
使用限制
仅支持作为维表和结果表,不支持作为源表。
仅Flink计算引擎VVR 3.x及以上版本支持云原生数据仓库AnalyticDB MySQL版3.0连接器。
语法结构
CREATE TEMPORARY TABLE adb_table (
`id` INT,
`num` BIGINT,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'adb3.0',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>'
);
Flink DDL中定义的主键必须和AnalyticDB MySQL数据库物理表中的主键保持一致,主键一致包括是否存在主键和主键名称一致。如果不一致,会影响数据正确性。
WITH参数
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
结果表类型。
String
是
无
固定值为adb3.0。
url
JDBC连接地址。
String
是
无
云原生数据仓库AnalyticDB MySQL版数据库的JDBC链接地址。固定格式为jdbc:mysql://<endpoint>:<port>/<databaseName>,其中:
endpoint和port:您可以登录AnalyticDB 控制台,单击对应的集群名称,进入集群信息页面,在网络信息中获取。
databaseName:云原生数据仓库AnalyticDB MySQL版数据库名称。
userName
用户名。
String
是
无
无。
password
密码。
String
是
无
无。
tableName
表名。
String
是
无
无。
maxRetryTimes
写入或读取数据失败后,重试的最大次数。
Integer
否
参数默认值取值情况如下。
在Flink计算引擎VVR 3.x版本及以下版本,该参数默认值为3。
在Flink计算引擎VVR 4.0.10及以上版本,该参数默认值为10。
无。
结果表独有
参数
说明
数据类型
是否必填
默认值
备注
batchSize
一次批量写入的条数。
Integer
否
参数默认值取值情况如下:
在Flink计算引擎VVR 3.x版本及以下版本,该参数默认值为100。
在Flink计算引擎VVR 4.0.10及以上版本,该参数默认值为1000。
需指定主键后,该参数才生效。
bufferSize
内存中缓存的数据条数。batchSize或bufferSize任一到达阈值都会触发写入。
Integer
否
1000
需指定主键后,该参数才生效。
说明仅Flink计算引擎VVR 4.0.10及以上版本支持该参数。
flushIntervalMs
清空缓存的时间间隔。表示如果缓存中的数据在等待指定时间后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。
Integer
否
参数默认值取值情况如下:
在Flink计算引擎VVR 3.x版本及以下版本,该参数默认值为1000。
在Flink计算引擎VVR 4.0.10及以上版本,该参数默认值为3000。
单位为毫秒。
ignoreDelete
是否忽略Delete操作。
Boolean
否
false
参数取值如下:
true:忽略Delete操作。
false:接受Delete操作。
说明仅Flink计算引擎VVR 4.0.10及以上版本支持该参数。
replaceMode
DDL中定义了主键的情况下,是否采用replace into语法插入数据。
Boolean
否
true
该参数取值如下:
true:采用
replace into
语法插入数据。false:采用
insert into on duplicate key update
语法插入数据。
说明仅Flink计算引擎VVR 4.0.10及以上版本支持该参数。
仅AnalyticDB MySQL 3.1.3.5及以上版本支持该参数。
此参数仅在DDL中定义了主键时才生效,插入数据时采用的语法详情如下:
DDL中定义了主键且replaceMode=true,采用
replace into
语法插入数据。DDL中定义了主键且replaceMode=false,采用
insert into on duplicate key update
语法插入数据。DDL中没有定义主键,采用
insert into
语法插入数据。
excludeUpdateColumns
表示更新主键值相同的数据时,忽略指定字段的更新。
String
否
空字符串
如果忽略指定的字段为多个时,则需要使用英文逗号(,)分割。例如
excludeUpdateColumns=column1,column2
。说明仅在replaceMode=false时,该参数才生效。在replaceMode=true时,对应字段会被更新为null。
要忽略的多个字段需要写在一行中,不能换行。
connectionMaxActive
线程池大小。
Integer
否
40
仅Flink计算引擎VVR 4.0.10及以上版本支持该参数。
维表独有
参数
说明
数据类型
是否必填
默认值
备注
cache
缓存策略。
String
否
ALL
云原生数据仓库AnalyticDB MySQL版3.0维表支持以下三种缓存策略:
None:无缓存。
LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。
ALL(默认值):缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。
适用于远程表数据量小且MISS KEY在源表数据和维表JOIN时,ON条件无法关联特别多的场景。
说明如果使用CACHE ALL时,请注意节点内存大小,防止出现OOM。
因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。
cacheSize
缓存大小,即缓存多少行数据。
Integer
否
100000
cacheSize配置和cache为LRU有关。当cache配置为LRU时,必须配置cacheSize参数。
cacheTTLMs
缓存超时时间,单位为毫秒。
Integer
否
Long.MAX_VALUE
cacheTTLMs配置和cache配置为LRU或ALL有关:
如果cache配置为LRU,则cacheTTLMs为缓存失效的超时时间。默认值为
Long.MAX_VALUE
,即代表缓存不过期。如果cache配置为ALL,则cacheTTLMs为物理表数据被重新加载的间隔时间。默认值为
Long.MAX_VALUE
,即代表不重新加载物理表数据。
说明如果cache配置为None,则cacheTTLMs不用配置。因为cache配置为None,表示没有缓存,因此不用配置缓存超时时间。
maxJoinRows
主表中每一条数据查询维表时,匹配后最多返回的结果数。
Integer
否
1024
如果您可以预估一条数据对应的维表数据最多为n条,则可以设置maxJoinRows='n',以确保实时计算匹配处理效率。
说明进行Join时,主表输入一条数据,对应维表匹配后返回的结果总数受该参数限制。
类型映射
云原生数据仓库AnalyticDB MySQL版3.0字段类型 | Flink字段类型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p, s) 或NUMERIC(p, s) | DECIMAL(p, s) |
VARCHAR | STRING |
BINARY | BYTES |
DATE | DATE |
TIME | TIME |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
POINT | STRING |
使用示例
结果表
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE adb_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'adb3.0', 'url' = '<yourUrl>', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = '<yourTablename>' ); INSERT INTO adb_sink SELECT * FROM datagen_source;
维表
CREATE TEMPORARY TABLE datagen_source( `a` INT, `b` VARCHAR, `c` STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE adb_dim ( `a` INT, `b` VARCHAR, `c` VARCHAR ) WITH ( 'connector' = 'adb3.0', 'url' = '<yourUrl>', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = '<yourTablename>' ); CREATE TEMPORARY TABLE blackhole_sink( `a` INT, `b` VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a,H.b FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;