Flink读写AnalyticDB for MySQL数据最佳实践
本文主要介绍实时计算 Flink 版在读取数据和写入数据的最佳实践,确保可以高效地读写云原生数据仓库 AnalyticDB MySQL 版的数据。
通过配置maxJoinRows参数指定维表查询返回的最大行数
在维表中,maxJoinRows
参数用于指定维表查询返回的最大行数,默认值为1024。因此在将AnalyticDB for MySQL表作为维表使用时,最多会返回1024行数据。若维表查询返回的行数超过该值,则超出的行会被丢弃。您可以在创建维表时,通过maxJoinRows
参数指定维表查询返回的最大行数。示例如下:
CREATE TEMPORARY TABLE `adb_dim_table`(
id BIGINT,
name VARCHAR,
code VARCHAR
) WITH (
'connector' = 'adb3.0',
'url' = 'jdbc:mysql://amv-uf648****.ads.aliyuncs.com/testdb',
'userName' = 'username',
'password' = 'password',
'tableName' = 'test_table',
'maxJoinRows' = '10240'
);
通过配置replaceMode参数指定数据写入方式
在结果表中,replaceMode
参数用于指定数据的写入方式,其默认值为true,即使用REPLACE INTO方式写入数据。若您想使用INSERT INTO方式写入数据,可以在创建结果表时将该参数设置为false。示例如下:
结果表必须定义主键,否则replaceMode
参数不会生效。
CREATE TEMPORARY TABLE adb_sink_table (
`id` INT,
`num` BIGINT,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'adb3.0',
'url' = 'jdbc:mysql://amv-uf6484****.ads.aliyuncs.com/testdb',
'userName' = 'username',
'password' = 'password',
'tableName' = 'test_table',
'replaceMode' = 'false'
);
通过配置ignoreDelete参数忽略DELETE操作
在多个Flink任务同时向一张AnalyticDB for MySQL表写入数据时,各个任务之间通过INSERT ON DUPLICATE KEY UPDATE的方式更新相同主键行的部分字段。若其中某个任务使用了ROW_NUMBER
等窗口函数,可能会导致数据回撤,整行数据也将被删除,从而导致其他任务写入的数据丢失。您可以在创建结果表时,通过ignoreDelete
参数忽略数据回撤时所触发的DELETE操作。示例如下:
在结果表中配置ignoreDelete=true
后,Flink将忽略所有接收到的DELETE操作。一般情况下,若业务数据仅包含新增、更新以及回撤产生的DELETE操作时,此配置不会对业务数据产生影响;但若业务数据需要执行DELETE操作,此配置会对业务数据产生影响。因此,请在配置该参数前,谨慎评估此行为是否符合业务预期。
CREATE TEMPORARY TABLE adb_sink_table (
`id` INT,
`num` BIGINT,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'adb3.0',
'url' = 'jdbc:mysql://amv-uf6484****.ads.aliyuncs.com/testdb',
'userName' = 'username',
'password' = 'password****',
'tableName' = 'test_table',
'replaceMode' = 'false',
'ignoreDelete' = 'true'
);
相关文档
更多Flink读写AnalyticDB for MySQL数据的配置参数详情,请参见云原生数据仓库AnalyticDB MySQL版(ADB)3.0。