MySQL连接器DataStream本地调试

本文为您介绍如何在本地IDEA调试和运行MySQL DataStream作业。

DataStream本地调试方案

重要

本地调试需要下载相关jar并配置依赖,详情请参见本地运行和调试包含连接器的作业

创建DataStream API程序并使用MySqlSource。代码及pom依赖项示例如下:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MysqlCdcDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("pipeline.classpaths", "file://" + "mysql uber jar绝对路径");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("hostname")
                .port(3306)
                .databaseList("test_db") // 设置捕获的数据库
                .tableList("test_db.test_table") // 设置捕获的表
                .username("username")
                .password("password")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        env.enableCheckpointing(3000);
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(4)
                .print().setParallelism(1);
        env.execute("Print MySQL Snapshot + Binlog");
    }
}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>

在构建MySqlSource时,代码中必须指定以下参数:

参数

说明

hostname

MySQL数据库的IP地址或者主机名。

port

MySQL数据库服务的端口号。

databaseList

MySQL数据库名称。

说明

数据库名称支持正则表达式以读取多个数据库的数据,您可以使用.*匹配所有数据库。

username

MySQL数据库服务的用户名。

password

MySQL数据库服务的密码。

deserializer

反序列化器,将SourceRecord类型记录反序列化到指定类型。参数取值如下:

  • RowDataDebeziumDeserializeSchema:将SourceRecord转成Flink Table或SQL内部数据结构RowData。

  • JsonDebeziumDeserializationSchema:将SourceRecord转成JSON格式的String。

pom依赖项必须指定以下参数:

参数

说明

${vvr.version}

阿里云实时计算Flink版的引擎版本,例如1.17-vvr-8.0.4-3

${flink.version}

Apache Flink版本,例如1.17.2

重要

请使用阿里云实时计算Flink版的引擎版本对应的Apache Flink版本,避免在作业运行时出现不兼容的问题。版本对应关系详情,请参见引擎

本地调试问题绕行方案

阿里云实时计算Flink版包含商业化的额外内容,与Apache Flink版本有许多差异,导致本地调试运行发生报错。下面详细描述已发布的MySQL连接器不同版本,应该如何绕过本地调试运行报错问题。

连接器版本

绕行方案

1.15-vvr-6.0.2-3及以前的版本

按照已有本地调试方案运行即可。

1.15-vvr-6.0.7

这四个版本需要调整pom.xml配置后,可按本地调试方案运行。

  1. ${flink.version}改为1.19.0。

  2. 删除flink-connector-base依赖。

  3. 添加1.17-vvr-8.0.4-1版本的kafka连接器依赖。

pom.xml修改后如下:

<properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>8</java.version>

        <flink.version>1.19.0</flink.version>
        <vvr.version>xxx</vvr.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-mysql</artifactId>
            <version>${vvr.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-kafka</artifactId>
            <version>1.17-vvr-8.0.4-1</version>
        </dependency>
    </dependencies>
重要

以上修改仅用于本地调试,调试结束后请将修改内容还原后再打包部署。

1.15-vvr-6.0.7-1

1.17-vvr-8.0.4-1

1.17-vvr-8.0.4-3

1.17-vvr-8.0.8

暂无绕行方案,请使用1.17-vvr-8.0.4-3版本调试。