本文为您介绍如何使用云原生数据仓库AnalyticDB MySQL版3.0连接器。
背景信息
云原生数据仓库AnalyticDB MySQL版3.0是融合数据库、大数据技术于一体的云原生企业级数据仓库服务。AnalyticDB MySQL版支持高吞吐的数据实时增删改、低延时的实时分析和复杂ETL,兼容上下游生态工具,可用于构建企业级报表系统、数据仓库和数据服务引擎。
ADB MySQL 3.0连接器支持的信息如下。
类别 | 详情 |
支持类型 | 源表、维表和结果表 说明 仅Flink计算引擎VVR 8.0.4及以上版本支持源表,源表的参数和配置详情请参见Flink订阅Binlog,维表和结果表参数详情请参见WITH参数。 |
运行模式 | 流模式和批模式 |
数据格式 | 暂不适用 |
特有监控指标 | 暂无 |
API种类 | SQL |
是否支持更新或删除结果表数据 | 是 |
前提条件
已创建AnalyticDB MySQL集群并创建表,详情请参见创建集群和CREATE TABLE。
已设置白名单,详情请参见设置白名单。
语法结构
CREATE TEMPORARY TABLE adb_table (
`id` INT,
`num` BIGINT,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'adb3.0',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>'
);
Flink DDL中定义的主键必须和AnalyticDB MySQL数据库物理表中的主键保持一致,主键一致包括是否存在主键和主键名称一致。如果不一致,会影响数据正确性。
WITH参数
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
结果表类型。
String
是
无
固定值为adb3.0。
url
JDBC连接地址。
String
是
无
云原生数据仓库AnalyticDB MySQL版数据库的JDBC链接地址。固定格式为jdbc:mysql://<endpoint>:<port>/<databaseName>,其中:
endpoint和port:您可以登录AnalyticDB 控制台,单击对应的集群名称,进入集群信息页面,在网络信息中获取。
databaseName:云原生数据仓库AnalyticDB MySQL版数据库名称。
userName
用户名。
String
是
无
无。
password
密码。
String
是
无
无。
tableName
表名。
String
是
无
无。
maxRetryTimes
写入或读取数据失败后,重试的最大次数。
Integer
否
10
无。
结果表独有
参数
说明
数据类型
是否必填
默认值
备注
batchSize
一次批量写入的条数。
Integer
否
1000
需指定主键后,该参数才生效。
bufferSize
内存中缓存的数据条数。batchSize或bufferSize任一到达阈值都会触发写入。
Integer
否
1000
需指定主键后,该参数才生效。
flushIntervalMs
清空缓存的时间间隔。表示如果缓存中的数据在等待指定时间后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。
Integer
否
3000
单位为毫秒。
ignoreDelete
是否忽略Delete操作。
Boolean
否
false
参数取值如下:
true:忽略Delete操作。
false:接受Delete操作。
replaceMode
DDL中定义了主键的情况下,以哪种模式插入数据。
VVR 11.2及以上版本的类型为String。
低于VVR 11.2的版本的类型为Boolean。
否
VVR 11.2及以上版本的默认值为replace。
低于VVR 11.2的版本的默认值为true。
VVR 11.2及以上版本可配置的值如下:
replace:采用
replace into
语法写数据。当主键重复时,后出现的数据整行替换已有数据。upsert:采用
insert into on duplicate key update
语法写数据,主键不存在时插入数据,主键存在时更新数据。例如ADB表有a(主键)、b、c、d四个字段,若结果表仅提供a(主键)和b两个字段,在主键重复的情况下,系统只会更新b字段,c和d保持不变。insert:采用
insert ignore into
语法写数据。当主键重复时,保留首次出现的数据,忽略后续数据。
低于VVR 11.2的版本仅支持Boolean类型配置:
true:含义与参数值replace相同
false:含义与参数值upsert相同
注意:VVR 11.2及以上版本兼容旧版本的true和false取值。
说明仅AnalyticDB MySQL 3.1.3.5及以上版本支持该参数。
此参数仅在结果表DDL中定义了主键时才生效。如果结果表DDL中没有定义主键,则固定采用
insert ignore into
语法插入数据。
excludeUpdateColumns
表示更新主键值相同的数据时,忽略指定字段的更新
String
否
空字符串
如果需要忽略更新的字段为多个时,则需要使用英文逗号(,)分割。例如
excludeUpdateColumns='column1,column2'
。例如结果表有a(主键)、b、c、d四个字段,配置excludeUpdateColumns='c,d'。主键不重复时,会插入a、b、c、d四个字段的值。主键重复时,系统只会更新b字段,而c和d字段的值保持不变。
说明仅在replaceMode='upsert'或replaceMode='false'时,该参数才生效。
要忽略的多个字段需要写在一行中,不能换行。
connectionMaxActive
线程池大小。
Integer
否
40
无。
维表独有
参数
说明
数据类型
是否必填
默认值
备注
cache
缓存策略。
String
否
ALL
云原生数据仓库AnalyticDB MySQL版3.0维表支持以下三种缓存策略:
None:无缓存。
LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。
ALL(默认值):缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。
适用于远程表数据量小且MISS KEY在源表数据和维表JOIN时,ON条件无法关联特别多的场景。
说明如果使用CACHE ALL时,请注意节点内存大小,防止出现OOM。
因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。
cacheSize
缓存大小,即缓存多少行数据。
Integer
否
100000
cacheSize配置和cache为LRU有关。当cache配置为LRU时,必须配置cacheSize参数。
cacheTTLMs
缓存超时时间,单位为毫秒。
Integer
否
Long.MAX_VALUE
cacheTTLMs配置和cache配置为LRU或ALL有关:
如果cache配置为LRU,则cacheTTLMs为缓存失效的超时时间。默认值为
Long.MAX_VALUE
,即代表缓存不过期。如果cache配置为ALL,则cacheTTLMs为物理表数据被重新加载的间隔时间。默认值为
Long.MAX_VALUE
,即代表不重新加载物理表数据。
说明如果cache配置为None,则cacheTTLMs不用配置。因为cache配置为None,表示没有缓存,因此不用配置缓存超时时间。
maxJoinRows
主表中每一条数据查询维表时,匹配后最多返回的结果数。
Integer
否
1024
如果您可以预估一条数据对应的维表数据最多为n条,则可以设置maxJoinRows='n',以确保实时计算匹配处理效率。
说明进行Join时,主表输入一条数据,对应维表匹配后返回的结果总数受该参数限制。
类型映射
云原生数据仓库AnalyticDB MySQL版3.0字段类型 | Flink字段类型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p, s) 或NUMERIC(p, s) | DECIMAL(p, s) |
VARCHAR | STRING |
BINARY | BYTES |
DATE | DATE |
TIME | TIME |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
POINT | STRING |
使用示例
结果表
CREATE TEMPORARY TABLE datagen_source ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE adb_sink ( `name` VARCHAR, `age` INT ) WITH ( 'connector' = 'adb3.0', 'url' = '<yourUrl>', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = '<yourTablename>' ); INSERT INTO adb_sink SELECT * FROM datagen_source;
维表
CREATE TEMPORARY TABLE datagen_source( `a` INT, `b` VARCHAR, `c` STRING, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE adb_dim ( `a` INT, `b` VARCHAR, `c` VARCHAR ) WITH ( 'connector' = 'adb3.0', 'url' = '<yourUrl>', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = '<yourTablename>' ); p CREATE TEMPORARY TABLE blackhole_sink( `a` INT, `b` VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT T.a,H.b FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;