MaxCompute为您提供对接Flink CDC的新版插件Connector连接器。您可以通过对接Flink CDC,将数据源(例如MySQL)数据实时同步至MaxCompute的目标表(普通表或Delta表)。本文为您介绍MaxCompute新版插件的能力支持情况与主要操作流程。
Flink CDC背景介绍
Flink CDC是一个端到端的开源实时数据集成工具,定义了一套功能完整的编程接口和ETL数据处理框架,用户可通过提交Flink作业使用其功能,详情请参见Flink CDC。Flink CDC深度集成并由Apache Flink驱动,提供以下核心功能:
端到端的数据集成框架。
为数据集成的用户提供了易于构建作业的API。
支持在Source(数据源)和Sink(输出端)中处理多个表。
整库同步。
具备表结构变更自动同步的能力(Schema Evolution)。
前提条件
已创建MaxCompute项目,详情请参见创建MaxCompute项目。
注意事项
快速开始
本文将基于Flink CDC,快速构建MySQL到MaxCompute的Streaming ETL作业(MySQL to MaxCompute),实现Flink CDC Pipeline的编写。其中包含整库同步、表结构变更同步和分库分表同步功能。
环境准备
准备Flink Standalone集群
下载Flink 1.18.0并解压,解压后得到
flink-1.18.0
目录。进入flink-1.18.0
目录,执行以下命令,将FLINK_HOME设置为flink-1.18.0的安装目录。export FLINK_HOME=$(pwd)
在
$flink-1.18.0/conf
目录下执行vim flink-conf.yaml
命令,在配置文件中追加下列参数并保存。# 开启checkpoint,每隔3秒做一次checkpoint # 仅作测试使用,实际作业checkpoint间隔时间不建议低于30s execution.checkpointing.interval: 3000 # 由于flink-cdc-pipeline-connector-maxcompute依赖flink通信机制进行写入同步, # 这里适当增大消息通信超时时间 pekko.ask.timeout: 60s
执行如下命令,启动Flink集群。
./bin/start-cluster.sh
如启动成功,可以在http://localhost:8081/(8081为默认端口)访问到Flink Web UI。
多次执行start-cluster.sh可以拉起多个TaskManager,用于并发执行。
准备MySQL环境
此处以Docker Compose的方式为例指导您准备MySQL环境。
启动Docker镜像后,创建一个名为
docker-compose.yaml
的文件,文件内容如下:version: '2.1' services: mysql: image: debezium/example-mysql:1.1 ports: - "3306:3306" environment: - MYSQL_ROOT_PASSWORD=123456 - MYSQL_USER=mysqluser - MYSQL_PASSWORD=mysqlpw
参数说明:
参数
描述
version
Docker版本。
image
镜像版本,配置为debezium/example-mysql:1.1。
ports
MySQL端口号。
environment
MySQL账号密码。
该Docker Compose中包含的容器有:MySQL-包含商品信息的数据库app_db。
在docker-compose.yaml所在目录执行如下命令,启动所需组件:
docker-compose up -d
该命令将以Detached模式自动启动Docker Compose配置中定义的所有容器。您可以执行
docker ps
命令查看上述容器是否已正常启动。
在MySQL数据库中准备数据
执行如下命令,进入MySQL容器。
docker-compose exec mysql mysql -uroot -p123456
在MySQL中创建数据库,并准备表数据。
创建数据库。
CREATE DATABASE app_db; USE app_db;
准备表数据。
创建orders表,并插入数据。
CREATE TABLE `orders` ( `id` INT NOT NULL, `price` DECIMAL(10,2) NOT NULL, PRIMARY KEY (`id`) ); -- 插入数据 INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00); INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);
创建shipments表,并插入数据。
CREATE TABLE `shipments` ( `id` INT NOT NULL, `city` VARCHAR(255) NOT NULL, PRIMARY KEY (`id`) ); -- 插入数据 INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing'); INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');
创建products表,并插入数据。
-- CREATE TABLE `products` ( `id` INT NOT NULL, `product` VARCHAR(255) NOT NULL, PRIMARY KEY (`id`) ); -- 插入数据 INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer'); INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap'); INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');
通过Flink CDC CLI提交任务
下载所需JAR包:
flink-cdc包
进入flink-cdc下载二进制压缩包flink-cdc-3.1.1-bin.tar.gz,并解压得到
flink-cdc-3.1.1
目录,其中会包含bin、lib、log及conf四个目录,将这四个目录下的文件移动至flink-1.18.0对应的目录下。Connector包
下载以下Connector包,并移动至
flink-1.18.0/lib
目录下。说明下载链接只对已发布的版本有效, SNAPSHOT版本需要本地基于master或release-分支编译。
Driver包
下载MySQL Connector Java包,通过--jar参数将其传入Flink CDC CLI,或将其放在
$flink-1.18.0/lib
目录下并重启Flink集群,因为CDC Connectors不再包含这些Drivers。
编写任务配置YAML文件。下述为您提供一个整库同步的示例文件
mysql-to-maxcompute.yaml
:################################################################################ # Description: Sync MySQL all tables to MaxCompute ################################################################################ source: type: mysql hostname: localhost port: 3306 username: root password: 123456 tables: app_db.\.* server-id: 5400-5404 server-time-zone: UTC # accessId, accessKey, endpoint, project需要用户自行填写 sink: type: maxcompute name: MaxComputeSink accessId: ${your_accessId} accessKey: ${your_accessKey} endpoint: ${your_maxcompute_endpoint} project: ${your_project} bucketsNum: 8 pipeline: name: Sync MySQL Database to MaxCompute parallelism: 1
参数说明:
Source部分的参数配置详情请参见Apache Flink CDC MySQL Connector。
Sink部分的参数配置方式请参见连接器Connector配置项。
执行下述命令,提交任务至Flink Standalone集群。
./bin/flink-cdc.sh mysql-to-maxcompute.yaml
提交成功后,返回如下信息:
Pipeline has been submitted to cluster. Job ID: f9f9689866946e25bf151ecc179ef46f Job Description: Sync MySQL Database to MaxCompute
在Flink Web UI中,即可看到一个名为
Sync MySQL Database to MaxCompute
的任务正在运行。在MaxCompute中执行如下SQL,查看orders、shipments及products三张表是否已被成功创建,并且可以进行数据写入。
-- 查看orders表 read orders; -- 返回结果: +------------+------------+ | id | price | +------------+------------+ | 1 | 4 | | 2 | 100 | +------------+------------+ -- 查看shipments表 read shipments; -- 返回结果 +------------+------------+ | id | city | +------------+------------+ | 1 | beijing | | 2 | xian | +------------+------------+ -- 查看products表 read products; -- 返回结果 +------------+------------+ | id | product | +------------+------------+ | 3 | Peanut | | 1 | Beer | | 2 | Cap | +------------+------------+
同步变更操作
此处以orders表为例,为您展示在修改MySQL数据库中的源表数据时,MaxCompute中对应的目标表数据也会实时更新。
执行如下命令,进入MySQL容器。
docker-compose exec mysql mysql -uroot -p123456
在MySQL的orders表中插入一条数据。
INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);
在MaxCompute中执行
read orders;
命令查询orders表数据。返回结果如下:+------------+------------+ | id | price | +------------+------------+ | 3 | 100 | | 1 | 4 | | 2 | 100 | +------------+------------+
在MySQL的orders表中增加一个字段。
ALTER TABLE app_db.orders ADD amount varchar(100) NULL;
在MaxCompute中执行
read orders;
命令查询orders表数据。返回结果如下:+------------+------------+------------+ | id | price | amount | +------------+------------+------------+ | 3 | 100 | NULL | | 1 | 4 | NULL | | 2 | 100 | NULL | +------------+------------+------------+
在MySQL的orders表中更新一条数据。
UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;
在MaxCompute中执行
read orders;
命令查询orders表数据。返回结果如下:+------------+------------+------------+ | id | price | amount | +------------+------------+------------+ | 3 | 100 | NULL | | 1 | 100 | 100.00 | | 2 | 100 | NULL | +------------+------------+------------+
在MySQL的orders表中删除一条数据。
DELETE FROM app_db.orders WHERE id=2;
在MaxCompute中执行
read orders;
命令查询orders表数据。返回结果如下:+------------+------------+------------+ | id | price | amount | +------------+------------+------------+ | 3 | 100 | NULL | | 1 | 100 | 100.00 | +------------+------------+------------+
对于上述操作,在MySQL中每执行一步,就在MaxCompute中进行一次数据预览,可以看到MaxCompute中显示的orders表数据是实时更新的。
轮询变更操作
Flink CDC提供了将源表的表结构或数据路由到其他表名的配置,借助这种能力,我们能够实现表名、库名替换,整库同步等功能。 下面提供一个配置文件说明:
################################################################################
# Description: Sync MySQL all tables to MaxCompute
################################################################################
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\.*
server-id: 5400-5404
server-time-zone: UTC
# accessId, accessKey, endpoint, project 需要用户自行填写
sink:
type: maxcompute
name: MaxComputeSink
accessId: ${your_accessId}
accessKey: ${your_accessKey}
endpoint: ${your_maxcompute_endpoint}
project: ${your_project}
bucketsNum: 8
route:
- source-table: app_db.orders
sink-table: ods_db.ods_orders
- source-table: app_db.shipments
sink-table: ods_db.ods_shipments
- source-table: app_db.products
sink-table: ods_db.ods_products
pipeline:
name: Sync MySQL Database to MaxCompute
parallelism: 1
route部分的参数详情请参见Flink CDC Route。
通过上面的route配置,会将app_db.orders表的结构和数据同步至ods_db.ods_orders中。从而实现数据库迁移的功能。 特别地,source-table支持正则表达式匹配多表,从而实现分库分表同步的功能,例如下面的配置:
route:
- source-table: app_db.order\.*
sink-table: ods_db.ods_orders
这样,就可以将诸如app_db.order01、app_db.order02、app_db.order03的表数据汇总到ods_db.ods_orders中。
目前还不支持多表中存在相同主键数据的场景,将在后续版本支持。
环境清理
执行完上述操作后,您需要进行环境清理。
在docker-compose.yml文件所在的目录下执行如下命令停止所有容器:
docker-compose down
在Flink所在目录flink-1.18.0下,执行如下命令停止Flink集群:
./bin/stop-cluster.sh
附录
连接器Connector配置项
配置项 | 是否必填 | 默认值 | 类型 | 描述 |
type | 是 | none | String | 指定要使用的连接器,这里需要设置成 |
name | 否 | none | String | Sink的名称。 |
accessId | 是 | none | String | 阿里云账号或RAM用户的AccessKey ID。您可以进入AccessKey管理页面获取AccessKey ID。 |
accessKey | 是 | none | String | AccessKey ID对应的AccessKey Secret。 |
endpoint | 是 | none | String | MaxCompute服务的连接地址。您需要根据创建MaxCompute项目时选择的地域以及网络连接方式配置Endpoint。各地域及网络对应的Endpoint值,请参见 Endpoint。 |
project | 是 | none | String | MaxCompute项目名称。您可以登录MaxCompute控制台,在工作区>项目管理页面获取MaxCompute项目名称。 |
tunnelEndpoint | 否 | none | String | MaxCompute Tunnel服务的连接地址,通常这项配置可以根据指定的项目所在的地域进行自动路由。仅在使用代理等特殊网络环境下使用该配置。 |
quotaName | 否 | none | String | MaxCompute数据传输使用的独享资源组名称,如不指定该配置,则使用共享资源组。详情可以参见购买与使用独享数据传输服务资源组。 |
stsToken | 否 | none | String | 当使用RAM角色颁发的短时有效的访问令牌(STS Token)进行鉴权时,需要指定该参数。 |
bucketsNum | 否 | 16 | Integer | 自动创建MaxCompute Delta表时使用的桶数。使用方式请参见Delta Table概述。 |
compressAlgorithm | 否 | zlib | String | 写入MaxCompute时使用的数据压缩算法,当前支持 |
totalBatchSize | 否 | 64MB | String | 内存中缓冲的数据量大小,单位为分区级(非分区表单位为表级),不同分区(表)的缓冲区相互独立,达到阈值后数据写入到MaxCompute。 |
bucketBatchSize | 否 | 4MB | String | 内存中缓冲的数据量大小,单位为桶级,仅写入Delta表时生效。不同数据桶的缓冲区相互独立,达到阈值后将该桶数据写入到MaxCompute。 |
numCommitThreads | 否 | 16 | Integer | Checkpoint阶段,能够同时处理的分区(表)数量。 |
numFlushConcurrent | 否 | 4 | Integer | 写入数据到MaxCompute时,能够同时写入的桶数量。仅写入Delta表时生效。 |
retryTimes | 否 | 3 | Integer | 当网络链接发生错误时,进行重试的次数。 |
sleepMillis | 否 | true | Long | 当网络链接发生错误时,每次重试等待的时间,单位:毫秒。 |
表位置映射
连接器Connector自动建表时,使用如下映射关系,将源表的位置信息映射到MaxCompute表中。
当MaxCompute项目不支持Schema模型时,每个同步任务仅能同步一个MySQL Database。(其他数据源同理,连接器Connector会忽略tableId.namespace信息)。
Flink CDC中对象 | MaxCompute位置 | MySQL位置 |
配置文件中project | Project | none |
TableId.namespace | Schema(仅当MaxCompute项目支持Schema模型时,如不支持,将忽略该配置) | Database |
TableId.tableName | Table | Table |
数据类型映射
Flink Type | MaxCompute Type |
CHAR/VARCHAR | STRING |
BOOLEAN | BOOLEAN |
BINARY/VARBINARY | BINARY |
DECIMAL | DECIMAL |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
TIME_WITHOUT_TIME_ZONE | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_NTZ |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | TIMESTAMP |
TIMESTAMP_WITH_TIME_ZONE | TIMESTAMP |
ARRAY | ARRAY |
MAP | MAP |
ROW | STRUCT |