本文为您介绍如何调试和运行使用MySQL连接器的 DataStream 作业。
DataStream调试方案
创建DataStream API程序并使用MySqlSource。代码示例如下:
本地调试需要下载相关jar并配置依赖,详情请参见本地运行和调试包含连接器的作业。本文已提供Maven示例项目,详情请参考MysqlCDCDemo.zip。
package com.alibaba.realtimecompute;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.configuration.Configuration;
public class MysqlCDCDemo {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setString("pipeline.classpaths", "file://" + "mysql uber jar绝对路径"); // 配置依赖
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("hostname")
.port(3306)
.databaseList("test_db") // 设置捕获的数据库
.tableList("test_db.test_table") // 设置捕获的表
.username("username")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
env.enableCheckpointing(3000);
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
.setParallelism(4)
.print().setParallelism(1);
env.execute("Print MySQL Snapshot + Binlog");
}
}
在构建MySqlSource时,代码中必须指定以下参数:
参数 | 说明 |
参数 | 说明 |
hostname | MySQL数据库的IP地址或者主机名。 |
port | MySQL数据库服务的端口号。 |
databaseList | MySQL数据库名称。 数据库名称支持正则表达式以读取多个数据库的数据,您可以使用 |
username | MySQL数据库服务的用户名。 |
password | MySQL数据库服务的密码。 |
deserializer | 反序列化器,将SourceRecord类型记录反序列化到指定类型。参数取值如下:
|
pom依赖
阿里云实时计算Flink版连接器包含商业化的额外内容,与Apache Flink版本有许多差异,本地调试时需对pom做如下修改:
${flink.version}必须为1.19.0。
连接器${vvr.version}可选版本如下:1.15-vvr-6.0.7、1.15-vvr-6.0.7-1、1.17-vvr-8.0.4-1、1.17-vvr-8.0.4-3、1.17-vvr-8.0.11-1。
需要添加kafka连接器依赖。
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>8</java.version>
<flink.version>1.19.0</flink.version>
<vvr.version>1.17-vvr-8.0.4-1</vvr.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-kafka</artifactId>
<version>${vvr.version}</version>
</dependency>
</dependencies>
在实时计算Flink版启动作业调试时,连接器无版本限制,${flink.version}和${vvr.version}仅需和作业引擎版本相对应即可,对应关系请参见引擎。参考pom如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>
- 本页导读 (1)
- DataStream调试方案
- pom依赖