Flink订阅Binlog

更新时间:

实时计算 Flink 版通过订阅云原生数据仓库 AnalyticDB MySQL 版,可以实时捕获和处理数据库变更数据,实现高效的数据同步和流式计算。本文为您介绍如何使用Flink订阅AnalyticDB for MySQL Binlog。

前提条件

  • AnalyticDB for MySQL产品系列为企业版基础版湖仓版数仓版弹性模式

  • AnalyticDB for MySQL集群的内核版本需为3.2.1.0及以上版本。

    说明
    • 查看企业版基础版湖仓版集群的内核版本,请执行SELECT adb_version();。如需升级内核版本,请联系技术支持。

    • 查看和升级数仓版集群的内核版本,请参见查看和升级版本

  • Flink实时计算引擎需为VVR 8.0.4及以上版本。

  • AnalyticDB for MySQLFlink全托管工作空间需要位于同一VPC下,详情请参见创建集群开通实时计算Flink

  • 已将Flink工作空间所属的网段加入AnalyticDB for MySQL的白名单,详情请参见Flink所属网段查看方法设置白名单

使用限制

  • Flink仅支持处理AnalyticDB for MySQL Binlog中的所有基础数据类型和复杂数据类型JSON,详情请参见数据类型

  • Flink不会处理AnalyticDB for MySQL Binlog中的DDL操作记录和分区表自动分区删除的操作记录。

步骤一:开启Binlog功能

  1. 开启Binlog功能,本文以表名为source_table为例。

    说明

    AnalyticDB for MySQL仅支持按表开启Binlog功能。

    建表时,开启Binlog

    CREATE TABLE source_table (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`)
    )DISTRIBUTED BY HASH (id) BINLOG=true;

    建表后,开启Binlog

    ALTER TABLE source_table BINLOG=true;
  2. (可选)修改Binlog保留时长。

    您可以通过修改binlog_ttl参数来调整Binlog的保留时长,参数默认值为6h。以下示例表示将表source_tableBinlog保留时长设置为1天。

    ALTER TABLE source_table binlog_ttl='1d';

    binlog_ttl参数取值支持以下格式:

    • 纯数字,表示毫秒。例如60代表60毫秒。

    • 数字+s,表示秒。例如30s代表30秒。

    • 数字+h,表示小时。例如2h代表2小时。

    • 数字+d,表示天。例如1d代表1天。

    说明

    建议您设置的Binlog保留时间不小于binlog_ttl参数的默认值。若设置的保留时间过短,可能会导致文件被清理,影响数据同步。

步骤二:配置Flink连接器

  1. 登录实时计算控制台

  2. Flink全托管页签,单击目标工作空间操作列下的控制台

  3. 在左侧导航栏,单击数据连接

  4. 数据连接页面,单击创建自定义连接器

  5. 上传自定义连接器JAR包。下载链接:AnalyticDB for MySQL Connector

  6. 上传完成后,单击下一步

  7. 单击完成。创建完成的自定义连接器会出现在连接器列表中。

步骤三:订阅Binlog

  1. 登录实时计算控制台,新建SQL作业。详情请参见创建作业

  2. 创建源表,连接到AnalyticDB for MySQL并读取指定表(source_table)的Binlog数据。

    说明
    • Flink DDL中定义的主键必须和AnalyticDB for MySQL集群物理表中的主键保持一致,主键一致包括主键和主键名称一致。如果不一致,会影响数据正确性。

    • Flink的数据类型需要和AnalyticDB for MySQL兼容。映射关系,请参见类型映射

    CREATE TEMPORARY TABLE adb_source (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'adb-mysql-cdc',
      'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
      'username' = 'testUser',
      'password' = 'Test12****',
      'database-name' = 'binlog',
      'table-name' = 'source_table'
    );

    WITH参数说明:

    参数

    是否必填

    默认值

    数据类型

    说明

    connector

    STRING

    使用的连接器。

    这里填写自定义连接器,固定填写adb-mysql-cdc

    hostname

    STRING

    AnalyticDB for MySQLVPC地址。

    username

    STRING

    AnalyticDB for MySQL数据库账号。

    password

    STRING

    AnalyticDB for MySQL数据库密码。

    database-name

    STRING

    AnalyticDB for MySQL数据库名称。

    由于AnalyticDB for MySQL实现的是表级Binlog,此处仅支持设置一个数据库。

    table-name

    STRING

    AnalyticDB for MySQL数据库的表名。

    由于AnalyticDB for MySQL实现的是表级Binlog,此处仅支持设置一个表。

    port

    3306

    INTEGER

    端口号。

    scan.incremental.snapshot.enabled

    true

    BOOLEAN

    增量快照。

    默认开启。增量快照是一种读取表快照的新机制,与旧的快照机制相比,增量快照有许多优点,包括:

    • 在读取快照期间,Source支持并发读取。

    • 在读取快照期间,Source支持进行Chunk粒度的Checkpoint。

    • 在读取快照之前,Source不需要获取数据库锁权限。

    scan.incremental.snapshot.chunk.size

    8096

    INTEGER

    表快照的Chunk大小(包含的行数)。

    当开启增量快照读取时,表会被切分成多个Chunk读取。

    scan.snapshot.fetch.size

    1024

    INTEGER

    读取表快照时,每次读取数据的最大行数。

    scan.startup.mode

    initial

    STRING

    消费数据的启动模式。

    取值如下:

    • initial(默认):在第一次启动时,会先扫描历史全量数据,然后读取最新的Binlog数据。

    • earliest-offset:不扫描历史全量数据,直接从可读取的最早Binlog开始读取。

    • specific-offset:不扫描历史全量数据,从您指定的Binlog位点启动,位点可通过同时配置scan.startup.specific-offset.filescan.startup.specific-offset.pos参数来指定从特定Binlog文件名和偏移量启动。

    scan.startup.specific-offset.file

    STRING

    specific-offset启动模式下,启动位点的Binlog文件名。

    最新Binlog文件名可使用SHOW MASTER STATUS for table_name获取。

    scan.startup.specific-offset.pos

    LONG

    specific-offset启动模式下,启动位点的Binlog文件位置。

    最新Binlog位置可使用SHOW MASTER STATUS for table_name获取。

    scan.startup.specific-offset.skip-events

    LONG

    在指定的启动位点后需要跳过的事件数量。

    scan.startup.specific-offset.skip-rows

    LONG

    在指定的启动位点后需要跳过的数据行数。

    server-time-zone

    STRING

    数据库服务器中的会话时区。

    例如:"Asia/Shanghai"。它控制AnalyticDB for MySQL中的TIMESTAMP类型如何转成STRING类型。如果没有设置,则使用ZONELD.SYSTEMDEFAULT()来确定服务器时区。

    debezium.min.row.count.to.stream.result

    1000

    INTEGER

    当表的行数大于该值时,连接器会对结果进行流式处理。

    若将此参数设置为0,会跳过所有表大小检查,始终在快照期间对所有结果进行流式处理。

    connect.timeout

    30s

    DURATION

    连接数据库服务器超时,重试连接之前等待超时的最长时间。

    默认单位为秒(s)。

    connect.max-retries

    3

    INTEGER

    连接数据库服务时,连接失败后重试的最大次数。

  3. 在目标端创建表,用于存储处理后的数据。本文以AnalyticDB for MySQL作为目标端。Flink支持的连接器请参见支持的连接器

    CREATE TABLE target_table (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`)
    )
  4. 创建结果表,连接步骤3创建的表,用于将处理后的数据写入到AnalyticDB for MySQL指定的表。

    CREATE TEMPORARY TABLE adb_sink (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'adb3.0',
      'url' = 'jdbc:mysql://amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com:3306/flinktest',
      'userName' = 'testUser',
      'password' = 'Test12****',
      'tableName' = 'target_table'
    );

    结果表对应的WITH参数和映射类型,详情请见:云原生数据仓库AnalyticDB MySQL版(ADB)3.0

  5. 将捕获到的源数据变化同步到结果表,并由结果表将数据同步到目标端。

    INSERT INTO adb_sink
    SELECT * FROM adb_source;
  6. 单击保存

  7. 单击深度检查

    深度检查能够检查作业的SQL语义、网络连通性以及作业使用的表的元数据信息。同时,您可以单击结果区域的SQL优化,展开查看SQL风险问题提示以及对应的SQL优化建议。

  8. (可选)单击调试

    您可以使用作业调试功能模拟作业运行、检查输出结果,验证SELECTINSERT业务逻辑的正确性,提升开发效率,降低数据质量风险。详情请参见作业调试

  9. 单击部署详情请参见部署SQL作业

    完成作业开发和深度检查后,即可部署作业,将数据发布至生产环境。部署后,您可以在作业运维页面启动作业至运行阶段,详情请参见作业启动

  10. (可选)查看Binlog信息。

    说明

    使用以下语句查看Binlog日志信息时,若仅开启Binlog功能,日志信息显示为0。只有成功订阅Binlog后,才会显示日志信息。

    • 查看当前写入的Binlog的位点,SQL语句如下:

      SHOW MASTER STATUS FOR source_table;
    • 查看集群内对应表所有Binlog文件的信息,SQL语句如下:

      SHOW BINARY LOGS FOR source_table;

类型映射

AnalyticDB for MySQLFlink的数据类型映射关系如下:

AnalyticDB for MySQL字段类型

Flink字段类型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(p,s)或NUMERIC(p,s)

DECIMAL(p,s)

VARCHAR

STRING

BINARY

BYTES

DATE

DATE

TIME

TIME

DATETIME

TIMESTAMP

TIMESTAMP

TIMESTAMP

POINT

STRING

JSON

STRING