本文介绍如何将开源Flink中的数据导入AnalyticDB for MySQL数仓版集群。
前提条件
下载Flink驱动,并将其部署到Flink所有节点的${flink部署目录}/lib目录下。您可以根据Flink版本下载对应的驱动:
Flink 1.11版本:flink-connector-jdbc_2.11-1.11.0.jar
Flink 1.12版本:flink-connector-jdbc_2.11-1.12.0.jar
Flink 1.13版本:flink-connector-jdbc_2.11-1.13.0.jar
如需其他版本的驱动,请前往JDBC SQL Connector 页面下载。
下载MySQL驱动,并将其部署到Flink所有节点的${flink部署目录}/lib目录下。
说明MySQL驱动版本需为5.1.40或以上,请前往MySQL驱动下载页面下载。
部署所有的JAR包后请重启Flink集群。启动方式,请参见Start a Cluster。
已在目标AnalyticDB for MySQL集群中创建数据库和数据表,用于保存需要写入的数据。数据库和数据表的创建方法,请参见CREATE DATABASE和CREATE TABLE。
说明本文示例中创建的数据库名称为
tpch
,建库语句如下:CREATE DATABASE IF NOT EXISTS tpch;
本文示例中创建的数据表名为
person
,建表语句如下:CREATE TABLE IF NOT EXISTS person(user_id string, user_name string, age int);
如果您的AnalyticDB for MySQL集群是弹性模式,您需要在集群信息页面的网络信息区域,打开启用ENI网络的开关。
注意事项
本文仅介绍通过Flink SQL创建表并写入数据至AnalyticDB for MySQL的方法。通过Flink JDBC API写入数据的方法,请参见JDBC Connector。
本文介绍的方法仅适用于Flink1.11及以上版本。若您需要将其他版本的Flink数据写入AnalyticDB for MySQL集群,那么:
针对Flink1.10和Flink1.9版本,数据写入方法,请参见Flink 1.10 Documentation: Connect to External Systems。
针对Flink1.8及以下版本,数据写入方法,请参见Flink 1.8 Documentation: Connect to External Systems。
流程介绍
本文示例以CSV格式的文件作为输入源介绍数据写入流程。
步骤 | 说明 |
创建一个新的CSV文件并在文件中写入源数据,然后将新文件部署至Flink所有节点的/root下。 | |
通过SQL语句在Flink中创建源表和结果表,并通过源表和结果表将数据写入AnalyticDB for MySQL中。 | |
登录AnalyticDB for MySQL目标数据库,来查看并验证源数据是否成功导入。 |
步骤一:数据准备
在其中一个Flink节点的root目录下,执行
vim /root/data.csv
命令来创建一个名为data.csv的CSV文件。文件中包含的数据如下(您可以多复制几行相同的数据来增加写入的数据量):
0,json00,20 1,json01,21 2,json02,22 3,json03,23 4,json04,24 5,json05,25 6,json06,26 7,json07,27 8,json08,28 9,json09,29
文件创建完成后,将其部署至Flink其他节点的/root目录下。
步骤二:数据写入
启动并运行Flink SQL程序。详细操作步骤,请参见Starting the SQL Client CLI。
创建一张名为
csv_person
的源表,语句如下:CREATE TABLE if not exists csv_person ( `user_id` STRING, `user_name` STRING, `age` INT ) WITH ( 'connector' = 'filesystem', 'path' = 'file:///root/data.csv', 'format' = 'csv', 'csv.ignore-parse-errors' = 'true', 'csv.allow-comments' = 'true' );
说明源表中的列名和数据类型需与AnalyticDB for MySQL中目标表的列名和数据类型保持一致。
建表语句中填写的
path
是data.csv的本地路径(Flink各个节点的路径均需一致)。如果您的data.csv文件不在本地,请根据实际情况填写正确的路径。关于建表语句中的其他参数说明,请参见FileSystem SQL Connector。
创建一张名为
mysql_person
的结果表,语句如下:CREATE TABLE mysql_person ( user_id String, user_name String, age INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true', 'table-name' = '<table_name>', 'username' = '<username>', 'password' = '<password>', 'sink.buffer-flush.max-rows' = '10', 'sink.buffer-flush.interval' = '1s' );
说明结果表中的列名和数据类型需与AnalyticDB for MySQL中目标表的列名和数据类型保持一致。
下表仅列举了连接AnalyticDB for MySQL集群时的必填配置项,关于选填配置项的信息,请参见Connector Options。
必填配置项
说明
connector
指定Flink使用的连接器类型,选择
jdbc
。url
AnalyticDB for MySQL集群的JDBC URL。
格式:
jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true'
,其中:endpoint
:目标AnalyticDB for MySQL集群的连接地址。说明如果需要使用公网地址连接集群,您需要先申请公网地址,申请方法,请参见申请/释放公网地址。
db_name
:AnalyticDB for MySQL中的目标数据库名。useServerPrepStmts=false&rewriteBatchedStatements=true
:批量写入数据至AnalyticDB for MySQL的必填配置,用于提高写入性能,以及降低对AnalyticDB for MySQL集群的压力。
示例:
jdbc:mysql://am-**********.ads.aliyuncs.com:3306/tpch?useServerPrepStmts=false&rewriteBatchedStatements=true
。table-name
AnalyticDB for MySQL中的目标表名,用于存储写入的数据。本文示例中目标表名为
person
。username
AnalyticDB for MySQL中具有写入权限的数据库账号名。
说明您可以通过SHOW GRANTS查看当前账号所拥有的权限。
您可以通过GRANT语句为目标账号授予权限。
password
AnalyticDB for MySQL中具有写入权限的数据库账号密码。
sink.buffer-flush.max-rows
从Flink写入数据至AnalyticDB for MySQL时,一次批量写入的最大行数。Flink会接收实时数据,当接收到的数据行数达到最大写入行数后,再将数据批量写入AnalyticDB for MySQL集群。可选取值如下:
0:最大行数为0时,批量写入数据功能仅考虑
sink.buffer-flush.interval
配置,即只要满足最大间隔时间就会开始批量写入。具体的行数,例如1000、2000等。
说明不建议将该参数设置为0。取值为0不仅会导致写入性能变差,也会导致AnalyticDB for MySQL集群执行并发查询时的压力变大。
当
sink.buffer-flush.max-rows
和sink.buffer-flush.interval
配置均不为0时,批量写入功能生效规则如下:若Flink接收到的数据量已达到
sink.buffer-flush.max-rows
所设的值,但最大时间间隔还未到达sink.buffer-flush.interval
所设的值,那么Flink无需等待间隔期满,即可直接触发批量写入数据至AnalyticDB for MySQL。若Flink接收到的数据量未达到
sink.buffer-flush.max-rows
所设的值,但间隔时间已达到sink.buffer-flush.interval
所设的值,那么无论Flink接收了多少数据量,都直接触发批量写入数据至AnalyticDB for MySQL。
sink.buffer-flush.interval
Flink批量写入数据至AnalyticDB for MySQL的最大间隔时间,即执行下一次批量写入数据前的最大等待时间,可选取值如下:
0:时间间隔为0时,批量写入数据功能仅考虑
sink.buffer-flush.max-rows
配置,即只要Flink接收到的数据行数达到最大写入行数后就会开始批量写入。具体的时间间隔,例如1d、1h、1min、1s、1ms等。
说明不建议将该参数设置为0,避免在业务低谷期产生源数据较少的场景下,影响数据导入的及时性。
使用
INSERT INTO
语句导入数据,当主键重复时会自动忽略当前写入数据,数据不做更新,作用等同于INSERT IGNORE INTO
,更多信息,请参见INSERT INTO。语句如下:INSERT INTO mysql_person SELECT user_id, user_name, age FROM csv_person;
步骤三:数据验证
导入完成后,您可以登录AnalyticDB for MySQL集群的目标库tpch
,执行如下语句查看并验证源数据是否成功导入至目标表person
中:
SELECT * FROM person;