MySQL连接器DataStream调试

更新时间:2025-04-22 05:47:16

本文为您介绍如何调试和运行使用MySQL连接器的 DataStream 作业。

DataStream调试方案

创建DataStream API程序并使用MySqlSource。代码示例如下:

重要

本地调试需要下载相关jar并配置依赖,详情请参见本地运行和调试包含连接器的作业。本文已提供Maven示例项目,详情请参考MysqlCDCDemo.zip

package com.alibaba.realtimecompute;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.configuration.Configuration;

public class MysqlCDCDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("pipeline.classpaths", "file://" + "mysql uber jar绝对路径");  // 配置依赖
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("hostname")
                .port(3306)
                .databaseList("test_db") // 设置捕获的数据库
                .tableList("test_db.test_table") // 设置捕获的表
                .username("username")
                .password("password")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        env.enableCheckpointing(3000);
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(4)
                .print().setParallelism(1);
        env.execute("Print MySQL Snapshot + Binlog");
    }
}

在构建MySqlSource时,代码中必须指定以下参数:

参数

说明

参数

说明

hostname

MySQL数据库的IP地址或者主机名。

port

MySQL数据库服务的端口号。

databaseList

MySQL数据库名称。

说明

数据库名称支持正则表达式以读取多个数据库的数据,您可以使用.*匹配所有数据库。

username

MySQL数据库服务的用户名。

password

MySQL数据库服务的密码。

deserializer

反序列化器,将SourceRecord类型记录反序列化到指定类型。参数取值如下:

  • RowDataDebeziumDeserializeSchema:将SourceRecord转成Flink TableSQL内部数据结构RowData。

  • JsonDebeziumDeserializationSchema:将SourceRecord转成JSON格式的String。

pom依赖

本地调试
实时计算Flink版部署调试

阿里云实时计算Flink版连接器包含商业化的额外内容,与Apache Flink版本有许多差异,本地调试时需对pom做如下修改:

  1. ${flink.version}必须为1.19.0。

  2. 连接器${vvr.version}可选版本如下:1.15-vvr-6.0.7、1.15-vvr-6.0.7-1、1.17-vvr-8.0.4-1、1.17-vvr-8.0.4-3、1.17-vvr-8.0.11-1。

  3. 需要添加kafka连接器依赖。

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>8</java.version>
    <flink.version>1.19.0</flink.version>
    <vvr.version>1.17-vvr-8.0.4-1</vvr.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-mysql</artifactId>
        <version>${vvr.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${vvr.version}</version>
    </dependency>
</dependencies>

在实时计算Flink版启动作业调试时,连接器无版本限制,${flink.version}和${vvr.version}仅需和作业引擎版本相对应即可,对应关系请参见引擎。参考pom如下:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>
  • 本页导读 (1)
  • DataStream调试方案
  • pom依赖
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等