如果您通过DataStream的方式以精确一次(Exactly-once)的语义读取MySQL数据库中的数据变更,同时读取全量数据,则需要使用MySQL CDC DataStream Connector连接Flink全托管。本文为您介绍如何在Flink全托管控制台上使用MySQL CDC DataStream Connector来读MySQL数据。

背景信息

Maven中央库中已经内置了VVR Connector,以供您在作业开发时直接使用。您可以通过以下任何一种方式来使用DataStream Connector:

什么是MySQL的CDC源表

MySQL的CDC源表,即MySQL的流式源表,支持对MySQL数据库的全量和增量读取,并保证Exactly Once,不多读一条也不少读一条数据。其工作机制是,在启动扫描全表前,先加一个全局读锁(FLUSH TABLES WITH READ LOCK),然后获取此时的Binlog位点以及表的schema,紧接着释放全局读锁。随后开始扫描全表,当全表数据读取完后,会从之前获取的Binlog位点获取增量的变更记录。在Flink作业运行期间会周期性执行checkpoint,记录下Binlog位点,当作业发生Failover,便会从之前记录的Binlog位点继续处理,从而实现Exactly Once语义。

使用限制

  • MySQL CDC DataStream Source不能并行读取MySQL数据,因为只有一个Task能够接收Binlog事件。
  • MySQL CDC DataStream Source不支持读取增量快照,仅支持读取单并发。
    注意 MySQL CDC SQL Source支持读取增量快照,详情请参见MySQL的CDC源表(公测中)

注意事项

  • 在使用MySQL CDC DataStream Source读取全量数据时,checkpoint不生效。因此在使用MYSQL CDC DataStreamSource时,不建议开启自动调优功能。
  • 建议对MySQL用户授予RELOAD权限。
    如果您未对MySQL用户授予RELOAD权限,则全局读锁会降级为表级读锁。而使用表级读锁需要等到全表扫描完成,才能释放锁。所以持锁时间会较长,而读锁会阻塞往表内写入数据,影响线上业务。
    说明 如果您已经配置了'debezium.snapshot.locking.mode' = 'none' 属性来显示跳过读锁阶段,则不用授予RELOAD权限。
  • 全局读锁(FLUSH TABLES WITH READ LOCK)的影响。
    全局读锁阶段会去获取Binlog位点以及表的Schema,因此其持锁耗时与表的数量成正比,数据库持锁耗时可能达到秒级。例如上千张表大概两三秒。而读锁是会阻塞写入操作,因此仍可能对线上业务造成影响。如果您希望跳过锁阶段,且能容忍非Exactly Once语义,则可以在debezium properties中增加'snapshot.locking.mode' = 'none' 属性来显式跳过锁阶段。示例如下。
    Properties debeziumProperties = new Properties();
    debeziumProperties.put("snapshot.locking.mode", "none");
    MySQLSource.<String>builder()
    ... // other parameters
    .debeziumProperties(debeziumProperties)
    .build();
  • 每个作业需显式配置不同的SERVER ID。

    每个同步数据库数据的客户端,都会有一个唯一ID,即SERVER ID。MySQL SERVER会根据该ID来维护网络连接以及Binlog位点。因此如果有大量不同的SERVER ID的客户端一起连接MySQL SERVER,可能导致MySQL SERVER的CPU陡增,影响线上业务稳定性。此外,多个作业共享相同的SERVER ID,会导致Binlog位点错乱,多读或少读数据。因此建议您通过动态Hints,在每个CDC作业都配置上不同的SERVER ID,例如SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;。动态Hints详情请参见动态Hints

  • 全表扫描阶段无法执行checkpoint。
    在扫描全表数据时,没有可用于恢复的位点,所以无法在全表扫描阶段去执行checkpoint。为了不执行checkpoint,MySQL CDC DataStream Source会让执行中的checkpoint一直等待,甚至checkpoint超时(如果表超级大,扫描耗时非常长)。超时的checkpoint会被认为是failed checkpoint,Flink默认配置下,会触发Flink的Failover。因此建议当表超大时,为了避免因为checkpoint超时而导致作业失败,可以配置如下作业参数:
    execution.checkpointing.interval: 10min
    execution.checkpointing.tolerable-failed-checkpoints: 100
    restart-strategy: fixed-delay 
    restart-strategy.fixed-delay.attempts: 2147483647
    参数 说明
    execution.checkpointing.interval checkpoint间隔时间,单位为分钟。
    execution.checkpointing.tolerable-failed-checkpoints checkpoint失败容忍重试的总次数。
    restart-strategy checkpoint失败后的重试策略,取值如下:
    • fixed-delay(推荐值):遇到失败,以固定间隔(默认1秒)重启应用,并最多重启restart-strategy.fixed-delay.attempts次。
    • failure-rate:失败后会以固定间隔(默认1s)重启应用,失败频率如果超过指定值,则不再重启。失败频率通过以下两个参数控制:
      • restart-strategy.failure-rate.failure-rate-interval:确定衡量的时间间隔。
      • restart-strategy.failure-rate.max-failures-per-interval:确定时间间隔中最多失败的次数。
    • none:失败后不重启。
    restart-strategy.fixed-delay.attempts checkpoint失败后的重试次数。

(推荐)直接将Connector作为项目依赖打进作业JAR包

  1. 在Maven项目的pom.xml文件中添加以下配置以引用SNAPSHOT仓库。
    <repositories>
      <repository>
        <id>oss.sonatype.org-snapshot</id>
        <name>OSS Sonatype Snapshot Repository</name>
        <url>http://oss.sonatype.org/content/repositories/snapshots</url>
        <releases>
          <enabled>false</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
      </repository>
      <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
          <enabled>false</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
      </repository>
    </repositories>
  2. 检查您的settings.xml配置文件中是否存在<mirrorOf>*</mirrorOf>配置。

    如果存在<mirrorOf>*</mirrorOf>配置,则需要将此配置改为<mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>

    修改的目的是为了避免SNAPSHOT仓库被覆盖,因为mirrorOf中只使用星号(*)会导致第一步中配置的两个repository被覆盖。

  3. 在作业的Maven POM文件中添加您需要的Connector作为项目依赖。
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-mysql</artifactId>
        <version>${connector.version}</version>
    </dependency>
    每个Connector版本对应的Connector类型可能不同,建议您使用最新版本。Connector版本、VVR/Flink版本和Connector类型的对应关系请参见Connector列表
    注意
    • 您需要在SNAPSHOT仓库(oss.sonatype.org)查找带SNAPSHOT的Connector版本,在Maven中央库(search.maven.org)上会查找不到。
    • 在使用多个Connector时,请注意META-INF目录需要Merge,即在pom.xml文件中添加如下代码。
      <transformers>
          <!-- The service transformer is needed to merge META-INF/services files -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
              <projectName>Apache Flink</projectName>
              <encoding>UTF-8</encoding>
          </transformer>
      </transformers>
  4. 配置MySQL服务器。
    您需要在Debezium MySQL连接器监视的所有数据库上,定义一个具有适当权限的MySQL用户。
    1. 创建MySQL用户。
      CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';

      其中user和password需要替换为您需要创建的目标用户名称和对应密码。

    2. 给用户授予权限。
      GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
    3. 刷新权限。
      FLUSH PRIVILEGES;
  5. 创建MySQL SourceFunction。
    示例代码如下所示。
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
    import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
    
    public class MySqlBinlogSourceExample {
      public static void main(String[] args) throws Exception {
          Properties debeziumProperties = new Properties();
          debeziumProperties.put("snapshot.locking.mode", "none");// do not use lock
          
          SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
          .hostname("yourHostname")
          .port(yourPort)
          .databaseList("yourDatabaseName") // set captured database
          .tableList("yourDatabaseName.yourTableName") // set captured table
          .username("yourUsername")
          .password("yourPassword")
          .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
          .debeziumProperties(debeziumProperties)
          .build();
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        env
          .addSource(sourceFunction)
          .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
    
        env.execute();
      }
    }
    在构建MySQL SourceFunction时,必须指定以下参数。
    参数 说明
    hostname MySQL数据库的IP地址或者Hostname。
    port MySQL数据库服务的端口号。
    databaseList MySQL数据库名称。
    说明 数据库名称支持正则表达式以读取多个数据库的数据。
    username MySQL数据库服务的用户名。
    password MySQL数据库服务的密码。
    deserializer 反序列化器,将SourceRecord类型记录反序列化到指定类型。参数取值如下:
    • RowDataDebeziumDeserializeSchema:将SourceRecord反序列化为Flink Table/SQL内部数据结构RowData。
    • StringDebeziumDeserializationSchema:将SourceRecord反序列化为String。

上传Connector JAR包到Flink全托管开发控制台后,填写配置信息

  1. 登录实时计算管理控制台
  2. Flink全托管页签,单击目标工作空间操作列下的控制台
  3. 在左侧导航栏,单击资源上传
  4. 单击上传资源,选择您要上传的目标Connector的JAR包。

    您可以上传您自己开发的Connector,也可以上传Flink全托管产品提供的Connector。Flink全托管产品提供的Connector官方JAR包的下载地址,请参见Connector列表

  5. 在目标作业开发页面附加依赖文件项,选择目标Connector的JAR包。