利用Flink CDC实现数据同步至Delta Table

MaxCompute为您提供对接Flink CDC的新版插件Connector连接器。您可以通过对接Flink CDC,将数据源(例如MySQL)数据实时同步至MaxCompute的目标表(普通表或Delta表)。本文为您介绍MaxCompute新版插件的能力支持情况与主要操作流程。

Flink CDC背景介绍

Flink CDC是一个端到端的开源实时数据集成工具,定义了一套功能完整的编程接口和ETL数据处理框架,用户可通过提交Flink作业使用其功能,详情请参见Flink CDC。Flink CDC深度集成并由Apache Flink驱动,提供以下核心功能:

  • 端到端的数据集成框架。

  • 为数据集成的用户提供了易于构建作业的API。

  • 支持在Source(数据源)和Sink(输出端)中处理多个表。

  • 整库同步。

  • 具备表结构变更自动同步的能力(Schema Evolution)。

前提条件

已创建MaxCompute项目,详情请参见创建MaxCompute项目

注意事项

  • 数据同步Connector连接器支持自动建表,将MaxCompute表与源表的位置关系、数据类型进行自动映射。当源表有主键时,会自动创建Delta表,否则会创建MaxCompute普通表。映射详情请参见表位置映射数据类型映射

  • 当数据写入至普通表时,系统会忽略DELETE操作,UPDATE操作会被视为INSERT操作。

  • 目前仅支持at-least-once,Delta表由于主键特性能够实现幂等写。

  • 对于表结构变更同步。

  • 新增列只能添加到最后一列。

  • 修改列类型,只能修改为兼容的数据类型。数据类型兼容表详情请参见更改列数据类型

快速开始

本文将基于Flink CDC,快速构建MySQL到MaxCompute的Streaming ETL作业(MySQL to MaxCompute),实现Flink CDC Pipeline的编写。其中包含整库同步、表结构变更同步和分库分表同步功能。

环境准备

准备Flink Standalone集群

  1. 下载flink-1.18.0-bin-scala_2.12.tgz并解压,解压后得到flink-1.18.0目录。进入flink-1.18.0目录,执行以下命令,将FLINK_HOME设置为flink-1.18.0的安装目录。

    export FLINK_HOME=$(pwd)
  2. $flink-1.18.0/conf目录下执行vim flink-conf.yaml命令,在配置文件中追加下列参数并保存。

    # 开启checkpoint,每隔3秒做一次checkpoint
    # 仅作测试使用,实际作业checkpoint间隔时间不建议低于30s
    execution.checkpointing.interval: 3000
    
    # 由于flink-cdc-pipeline-connector-maxcompute依赖flink通信机制进行写入同步,
    # 这里适当增大消息通信超时时间
    pekko.ask.timeout: 60s
  3. 执行如下命令,启动Flink集群。

    ./bin/start-cluster.sh

    如启动成功,可以在http://localhost:8081/(8081为默认端口)访问到Flink Web UI。

    多次执行start-cluster.sh可以拉起多个TaskManager,用于并发执行。

准备MySQL环境

此处以Docker Compose的方式为例指导您准备MySQL环境。

  1. 启动Docker镜像后,创建一个名为docker-compose.yaml的文件,文件内容如下:

    version: '2.1'
    services:
      mysql:
        image: debezium/example-mysql:1.1
        ports:
          - "3306:3306"
        environment:
          - MYSQL_ROOT_PASSWORD=123456
          - MYSQL_USER=mysqluser
          - MYSQL_PASSWORD=mysqlpw

    参数说明:

    参数

    描述

    version

    Docker版本。

    image

    镜像版本,配置为debezium/example-mysql:1.1。

    ports

    MySQL端口号。

    environment

    MySQL账号密码。

    该Docker Compose中包含的容器有:MySQL-包含商品信息的数据库app_db。

  2. 在docker-compose.yaml所在目录执行如下命令,启动所需组件:

    docker-compose up -d

    该命令将以Detached模式自动启动Docker Compose配置中定义的所有容器。您可以执行docker ps命令查看上述容器是否已正常启动。

在MySQL数据库中准备数据

  1. 执行如下命令,进入MySQL容器。

    docker-compose exec mysql mysql -uroot -p123456
  2. 在MySQL中创建数据库,并准备表数据。

    1. 创建数据库。

      CREATE DATABASE app_db;
      USE app_db;
    2. 准备表数据。

      • 创建orders表,并插入数据。

        CREATE TABLE `orders` (
        `id` INT NOT NULL,
        `price` DECIMAL(10,2) NOT NULL,
        PRIMARY KEY (`id`)
        );
        
        -- 插入数据
        INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
        INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);
      • 创建shipments表,并插入数据。

        CREATE TABLE `shipments` (
        `id` INT NOT NULL,
        `city` VARCHAR(255) NOT NULL,
        PRIMARY KEY (`id`)
        );
        
        -- 插入数据
        INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
        INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');
      • 创建products表,并插入数据。

        -- 
        CREATE TABLE `products` (
        `id` INT NOT NULL,
        `product` VARCHAR(255) NOT NULL,
        PRIMARY KEY (`id`)
        );
        
        -- 插入数据
        INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
        INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
        INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');

通过Flink CDC CLI提交任务

  1. 下载所需JAR包:

    • flink-cdc包

      进入flink-cdc下载二进制压缩包flink-cdc-3.1.1-bin.tar.gz,并解压得到flink-cdc-3.1.1目录,其中会包含bin、lib、log及conf四个目录,将这四个目录下的文件移动至flink-1.18.0对应的目录下。

    • Connector包

      下载以下Connector包,并移动至flink-1.18.0/lib目录下。

      说明

      下载链接只对已发布的版本有效, SNAPSHOT版本需要本地基于master或release-分支编译。

    • Driver包

      下载MySQL Connector Java包,通过--jar参数将其传入Flink CDC CLI,或将其放在$flink-1.18.0/lib目录下并重启Flink集群,因为CDC Connectors不再包含这些Drivers。

  2. 编写任务配置YAML文件。下述为您提供一个整库同步的示例文件mysql-to-maxcompute.yaml

    ################################################################################
    # Description: Sync MySQL all tables to MaxCompute
    ################################################################################
    source:
      type: mysql
      hostname: localhost
      port: 3306
      username: root
      password: 123456
      tables: app_db.\.*
      server-id: 5400-5404
      server-time-zone: UTC
    
    # accessId, accessKey, endpoint, project需要用户自行填写
    sink:
       type: maxcompute
       name: MaxComputeSink
       accessId: ${your_accessId}
       accessKey: ${your_accessKey}
       endpoint: ${your_maxcompute_endpoint}
       project: ${your_project}
       bucketsNum: 8
    
    pipeline:
      name: Sync MySQL Database to MaxCompute
      parallelism: 1
    

    参数说明:

  3. 执行下述命令,提交任务至Flink Standalone集群。

    ./bin/flink-cdc.sh mysql-to-maxcompute.yaml

    提交成功后,返回如下信息:

    Pipeline has been submitted to cluster.
    Job ID: f9f9689866946e25bf151ecc179ef46f
    Job Description: Sync MySQL Database to MaxCompute

    在Flink Web UI中,即可看到一个名为Sync MySQL Database to MaxCompute的任务正在运行。

  4. 在MaxCompute中执行如下SQL,查看orders、shipments及products三张表是否已被成功创建,并且可以进行数据写入。

    -- 查看orders表
    read orders;
    
    -- 返回结果:
    +------------+------------+
    | id         | price      |
    +------------+------------+
    | 1          | 4          |
    | 2          | 100        |
    +------------+------------+
    
    -- 查看shipments表
    read shipments;
    
    -- 返回结果
    +------------+------------+
    | id         | city       |
    +------------+------------+
    | 1          | beijing    |
    | 2          | xian       |
    +------------+------------+
    
    -- 查看products表
    read products;
    
    -- 返回结果
    +------------+------------+
    | id         | product    |
    +------------+------------+
    | 3          | Peanut     |
    | 1          | Beer       |
    | 2          | Cap        |
    +------------+------------+

同步变更操作

此处以orders表为例,为您展示在修改MySQL数据库中的源表数据时,MaxCompute中对应的目标表数据也会实时更新。

  1. 执行如下命令,进入MySQL容器。

    docker-compose exec mysql mysql -uroot -p123456
  2. 在MySQL的orders表中插入一条数据。

    INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);

    在MaxCompute中执行read orders;命令查询orders表数据。返回结果如下:

    +------------+------------+
    | id         | price      |
    +------------+------------+
    | 3          | 100        |
    | 1          | 4          |
    | 2          | 100        |
    +------------+------------+
  3. 在MySQL的orders表中增加一个字段。

    ALTER TABLE app_db.orders ADD amount varchar(100) NULL;

    在MaxCompute中执行read orders;命令查询orders表数据。返回结果如下:

    +------------+------------+------------+
    | id         | price      | amount     |
    +------------+------------+------------+
    | 3          | 100        | NULL       |
    | 1          | 4          | NULL       |
    | 2          | 100        | NULL       |
    +------------+------------+------------+
  4. 在MySQL的orders表中更新一条数据。

    UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;

    在MaxCompute中执行read orders;命令查询orders表数据。返回结果如下:

    +------------+------------+------------+
    | id         | price      | amount     |
    +------------+------------+------------+
    | 3          | 100        | NULL       |
    | 1          | 100        | 100.00     |
    | 2          | 100        | NULL       |
    +------------+------------+------------+
  5. 在MySQL的orders表中删除一条数据。

    DELETE FROM app_db.orders WHERE id=2;

    在MaxCompute中执行read orders;命令查询orders表数据。返回结果如下:

    +------------+------------+------------+
    | id         | price      | amount     |
    +------------+------------+------------+
    | 3          | 100        | NULL       |
    | 1          | 100        | 100.00     |
    +------------+------------+------------+

对于上述操作,在MySQL中每执行一步,就在MaxCompute中进行一次数据预览,可以看到MaxCompute中显示的orders表数据是实时更新的。

轮询变更操作

Flink CDC提供了将源表的表结构或数据路由到其他表名的配置,借助这种能力,我们能够实现表名、库名替换,整库同步等功能。 下面提供一个配置文件说明:

################################################################################
# Description: Sync MySQL all tables to MaxCompute
################################################################################
source:
   type: mysql
   hostname: localhost
   port: 3306
   username: root
   password: 123456
   tables: app_db.\.*
   server-id: 5400-5404
   server-time-zone: UTC

# accessId, accessKey, endpoint, project 需要用户自行填写
sink:
   type: maxcompute
   name: MaxComputeSink
   accessId: ${your_accessId}
   accessKey: ${your_accessKey}
   endpoint: ${your_maxcompute_endpoint}
   project: ${your_project}
   bucketsNum: 8

route:
   - source-table: app_db.orders
     sink-table: ods_db.ods_orders
   - source-table: app_db.shipments
     sink-table: ods_db.ods_shipments
   - source-table: app_db.products
     sink-table: ods_db.ods_products

pipeline:
   name: Sync MySQL Database to MaxCompute
   parallelism: 1

route部分的参数详情请参见Flink CDC Route

通过上面的route配置,会将app_db.orders表的结构和数据同步至ods_db.ods_orders中。从而实现数据库迁移的功能。 特别地,source-table支持正则表达式匹配多表,从而实现分库分表同步的功能,例如下面的配置:

route:
  - source-table: app_db.order\.*
    sink-table: ods_db.ods_orders

这样,就可以将诸如app_db.order01、app_db.order02、app_db.order03的表数据汇总到ods_db.ods_orders中。

说明

目前还不支持多表中存在相同主键数据的场景,将在后续版本支持。

环境清理

执行完上述操作后,您需要进行环境清理。

  1. 在docker-compose.yml文件所在的目录下执行如下命令停止所有容器:

    docker-compose down
  2. 在Flink所在目录flink-1.18.0下,执行如下命令停止Flink集群:

    ./bin/stop-cluster.sh

附录

连接器Connector配置项

配置项

是否必填

默认值

类型

描述

type

none

String

指定要使用的连接器,这里需要设置成 maxcompute

name

none

String

Sink的名称。

accessId

none

String

阿里云账号或RAM用户的AccessKey ID。您可以进入AccessKey管理页面获取AccessKey ID。

accessKey

none

String

AccessKey ID对应的AccessKey Secret。

endpoint

none

String

MaxCompute服务的连接地址。您需要根据创建MaxCompute项目时选择的地域以及网络连接方式配置Endpoint。各地域及网络对应的Endpoint值,请参见 Endpoint

project

none

String

MaxCompute项目名称。您可以登录MaxCompute控制台,在工作区>项目管理页面获取MaxCompute项目名称。

tunnelEndpoint

none

String

MaxCompute Tunnel服务的连接地址,通常这项配置可以根据指定的项目所在的地域进行自动路由。仅在使用代理等特殊网络环境下使用该配置。

quotaName

none

String

MaxCompute数据传输使用的独享资源组名称,如不指定该配置,则使用共享资源组。详情可以参见购买与使用独享数据传输服务资源组

stsToken

none

String

当使用RAM角色颁发的短时有效的访问令牌(STS Token)进行鉴权时,需要指定该参数。

bucketsNum

16

Integer

自动创建MaxCompute Delta表时使用的桶数。使用方式请参见近实时数仓概述

compressAlgorithm

zlib

String

写入MaxCompute时使用的数据压缩算法,当前支持raw(不进行压缩)、zlibsnappy

totalBatchSize

64MB

String

内存中缓冲的数据量大小,单位为分区级(非分区表单位为表级),不同分区(表)的缓冲区相互独立,达到阈值后数据写入到MaxCompute。

bucketBatchSize

4MB

String

内存中缓冲的数据量大小,单位为桶级,仅写入Delta表时生效。不同数据桶的缓冲区相互独立,达到阈值后将该桶数据写入到MaxCompute。

numCommitThreads

16

Integer

Checkpoint阶段,能够同时处理的分区(表)数量。

numFlushConcurrent

4

Integer

写入数据到MaxCompute时,能够同时写入的桶数量。仅写入Delta表时生效。

retryTimes

3

Integer

当网络链接发生错误时,进行重试的次数。

sleepMillis

true

Long

当网络链接发生错误时,每次重试等待的时间,单位:毫秒。

表位置映射

连接器Connector自动建表时,使用如下映射关系,将源表的位置信息映射到MaxCompute表中。

重要

当MaxCompute项目不支持Schema模型时,每个同步任务仅能同步一个MySQL Database。(其他数据源同理,连接器Connector会忽略tableId.namespace信息)。

Flink CDC中对象

MaxCompute位置

MySQL位置

配置文件中project

Project

none

TableId.namespace

Schema(仅当MaxCompute项目支持Schema模型时,如不支持,将忽略该配置)

Database

TableId.tableName

Table

Table

数据类型映射

Flink Type

MaxCompute Type

CHAR/VARCHAR

STRING

BOOLEAN

BOOLEAN

BINARY/VARBINARY

BINARY

DECIMAL

DECIMAL

TINYINT

TINYINT

SMALLINT

SMALLINT

INTEGER

INTEGER

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

TIME_WITHOUT_TIME_ZONE

STRING

DATE

DATE

TIMESTAMP_WITHOUT_TIME_ZONE

TIMESTAMP_NTZ

TIMESTAMP_WITH_LOCAL_TIME_ZONE

TIMESTAMP

TIMESTAMP_WITH_TIME_ZONE

TIMESTAMP

ARRAY

ARRAY

MAP

MAP

ROW

STRUCT