本文为您介绍如何调试和运行使用MySQL连接器的 DataStream 作业。
DataStream调试方案
创建DataStream API程序并使用MySqlSource。代码示例如下:
本地调试需要下载相关jar并配置依赖,详情请参见本地运行和调试包含连接器的作业。本文已提供Maven示例项目,详情请参考MysqlCDCDemo.zip。
package com.alibaba.realtimecompute;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.configuration.Configuration;
public class MysqlCDCDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("pipeline.classpaths", "file://" + "mysql uber jar绝对路径");  // 配置依赖
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("hostname")
                .port(3306)
                .databaseList("test_db") // 设置捕获的数据库
                .tableList("test_db.test_table") // 设置捕获的表
                .username("username")
                .password("password")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        env.enableCheckpointing(3000);
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(4)
                .print().setParallelism(1);
        env.execute("Print MySQL Snapshot + Binlog");
    }
}在构建MySqlSource时,代码中必须指定以下参数:
| 参数 | 说明 | 
| hostname | MySQL数据库的IP地址或者主机名。 | 
| port | MySQL数据库服务的端口号。 | 
| databaseList | MySQL数据库名称。 说明  数据库名称支持正则表达式以读取多个数据库的数据,您可以使用 | 
| username | MySQL数据库服务的用户名。 | 
| password | MySQL数据库服务的密码。 | 
| deserializer | 反序列化器,将SourceRecord类型记录反序列化到指定类型。参数取值如下: 
 | 
pom依赖
本地调试
阿里云实时计算Flink版连接器包含商业化的额外内容,与Apache Flink版本有许多差异,本地调试时需对pom做如下修改:
- ${flink.version}必须为1.19.0。 
- 连接器${vvr.version}推荐版本如下:1.17-vvr-8.0.11-6,1.20-vvr-11.1.3-jdk11。其他版本可见Maven Repository。 
- 需要添加kafka连接器依赖。 
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>8</java.version>
    <flink.version>1.19.0</flink.version>
    <vvr.version>1.17-vvr-8.0.4-1</vvr.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <!-- VVR 11.x 版本使用此 Group ID -->
        <groupId>com.alibaba.ververica</groupId>
       <!-- VVR 8.x 版本使用此 Group ID -->
        <!-- <groupId>org.apache.flink</groupId> -->
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-mysql</artifactId>
        <version>${vvr.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${vvr.version}</version>
    </dependency>
</dependencies>实时计算Flink版部署调试
在实时计算Flink版启动作业调试时,连接器无版本限制,${flink.version}和${vvr.version}仅需和作业引擎版本相对应即可,对应关系请参见引擎。参考pom如下:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>