Flink数据导入

本文介绍如何将开源Flink中的数据导入AnalyticDB for MySQL数仓版集群。

前提条件

  • 下载Flink驱动,并将其部署到Flink所有节点的${flink部署目录}/lib目录下。您可以根据Flink版本下载对应的驱动:

    如需其他版本的驱动,请前往JDBC SQL Connector 页面下载。

  • 下载MySQL驱动,并将其部署到Flink所有节点的${flink部署目录}/lib目录下。

    说明

    MySQL驱动版本需为5.1.40或以上,请前往MySQL驱动下载页面下载。

  • 部署所有的JAR包后请重启Flink集群。启动方式,请参见Start a Cluster

  • 已在目标AnalyticDB for MySQL集群中创建数据库和数据表,用于保存需要写入的数据。数据库和数据表的创建方法,请参见CREATE DATABASECREATE TABLE

    说明
    • 本文示例中创建的数据库名称为tpch,建库语句如下:

      CREATE DATABASE IF NOT EXISTS tpch;
    • 本文示例中创建的数据表名为person,建表语句如下:

      CREATE TABLE IF NOT EXISTS person(user_id string, user_name string, age int);
  • 如果您的AnalyticDB for MySQL集群是弹性模式,您需要在集群信息页面的网络信息区域,打开启用ENI网络的开关。启用ENI网络

注意事项

流程介绍

说明

本文示例以CSV格式的文件作为输入源介绍数据写入流程。

步骤

说明

步骤一:数据准备

创建一个新的CSV文件并在文件中写入源数据,然后将新文件部署至Flink所有节点的/root下。

步骤二:数据写入

通过SQL语句在Flink中创建源表和结果表,并通过源表和结果表将数据写入AnalyticDB for MySQL中。

步骤三:数据验证

登录AnalyticDB for MySQL目标数据库,来查看并验证源数据是否成功导入。

步骤一:数据准备

  1. 在其中一个Flink节点的root目录下,执行vim /root/data.csv命令来创建一个名为data.csv的CSV文件。

    文件中包含的数据如下(您可以多复制几行相同的数据来增加写入的数据量):

    0,json00,20
    1,json01,21
    2,json02,22
    3,json03,23
    4,json04,24
    5,json05,25
    6,json06,26
    7,json07,27
    8,json08,28
    9,json09,29
  2. 文件创建完成后,将其部署至Flink其他节点的/root目录下。

步骤二:数据写入

  1. 启动并运行Flink SQL程序。详细操作步骤,请参见Starting the SQL Client CLI

  2. 创建一张名为csv_person的源表,语句如下:

    CREATE TABLE if not exists csv_person (
      `user_id` STRING,
      `user_name` STRING,
      `age` INT
    ) WITH (
      'connector' = 'filesystem',
      'path' = 'file:///root/data.csv',
      'format' = 'csv',
      'csv.ignore-parse-errors' = 'true',
      'csv.allow-comments' = 'true'
    );
    说明
    • 源表中的列名和数据类型需与AnalyticDB for MySQL中目标表的列名和数据类型保持一致。

    • 建表语句中填写的pathdata.csv的本地路径(Flink各个节点的路径均需一致)。如果您的data.csv文件不在本地,请根据实际情况填写正确的路径。

      关于建表语句中的其他参数说明,请参见FileSystem SQL Connector

  3. 创建一张名为mysql_person的结果表,语句如下:

    CREATE TABLE mysql_person (
      user_id String,
      user_name String,
      age INT
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true',
      'table-name' = '<table_name>',
      'username' = '<username>',
      'password' = '<password>',
      'sink.buffer-flush.max-rows' = '10',
      'sink.buffer-flush.interval' = '1s'
      );
    说明
    • 结果表中的列名和数据类型需与AnalyticDB for MySQL中目标表的列名和数据类型保持一致。

    • 下表仅列举了连接AnalyticDB for MySQL集群时的必填配置项,关于选填配置项的信息,请参见Connector Options

    必填配置项

    说明

    connector

    指定Flink使用的连接器类型,选择jdbc

    url

    AnalyticDB for MySQL集群的JDBC URL。

    格式:jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true',其中:

    • endpoint:目标AnalyticDB for MySQL集群的连接地址。

      说明

      如果需要使用公网地址连接集群,您需要先申请公网地址,申请方法,请参见申请/释放公网地址

    • db_nameAnalyticDB for MySQL中的目标数据库名。

    • useServerPrepStmts=false&rewriteBatchedStatements=true:批量写入数据至AnalyticDB for MySQL的必填配置,用于提高写入性能,以及降低对AnalyticDB for MySQL集群的压力。

    示例:jdbc:mysql://am-**********.ads.aliyuncs.com:3306/tpch?useServerPrepStmts=false&rewriteBatchedStatements=true

    table-name

    AnalyticDB for MySQL中的目标表名,用于存储写入的数据。本文示例中目标表名为person

    username

    AnalyticDB for MySQL中具有写入权限的数据库账号名。

    说明
    • 您可以通过SHOW GRANTS查看当前账号所拥有的权限。

    • 您可以通过GRANT语句为目标账号授予权限。

    password

    AnalyticDB for MySQL中具有写入权限的数据库账号密码。

    sink.buffer-flush.max-rows

    从Flink写入数据至AnalyticDB for MySQL时,一次批量写入的最大行数。Flink会接收实时数据,当接收到的数据行数达到最大写入行数后,再将数据批量写入AnalyticDB for MySQL集群。可选取值如下:

    • 0:最大行数为0时,批量写入数据功能仅考虑sink.buffer-flush.interval配置,即只要满足最大间隔时间就会开始批量写入。

    • 具体的行数,例如10002000等。

    说明

    不建议将该参数设置为0。取值为0不仅会导致写入性能变差,也会导致AnalyticDB for MySQL集群执行并发查询时的压力变大。

    sink.buffer-flush.max-rowssink.buffer-flush.interval配置均不为0时,批量写入功能生效规则如下:

    • 若Flink接收到的数据量已达到sink.buffer-flush.max-rows所设的值,但最大时间间隔还未到达sink.buffer-flush.interval所设的值,那么Flink无需等待间隔期满,即可直接触发批量写入数据至AnalyticDB for MySQL

    • 若Flink接收到的数据量未达到sink.buffer-flush.max-rows所设的值,但间隔时间已达到sink.buffer-flush.interval所设的值,那么无论Flink接收了多少数据量,都直接触发批量写入数据至AnalyticDB for MySQL

    sink.buffer-flush.interval

    Flink批量写入数据至AnalyticDB for MySQL的最大间隔时间,即执行下一次批量写入数据前的最大等待时间,可选取值如下:

    • 0:时间间隔为0时,批量写入数据功能仅考虑sink.buffer-flush.max-rows配置,即只要Flink接收到的数据行数达到最大写入行数后就会开始批量写入。

    • 具体的时间间隔,例如1d1h1min1s1ms等。

    说明

    不建议将该参数设置为0,避免在业务低谷期产生源数据较少的场景下,影响数据导入的及时性。

  4. 使用INSERT INTO语句导入数据,当主键重复时会自动忽略当前写入数据,数据不做更新,作用等同于INSERT IGNORE INTO,更多信息,请参见INSERT INTO。语句如下:

    INSERT INTO mysql_person SELECT user_id, user_name, age FROM csv_person;

步骤三:数据验证

导入完成后,您可以登录AnalyticDB for MySQL集群的目标库tpch,执行如下语句查看并验证源数据是否成功导入至目标表person中:

SELECT * FROM person;