本文为您介绍如何在本地IDEA调试和运行MySQL DataStream作业。
DataStream本地调试方案
本地调试需要下载相关jar并配置依赖,详情请参见本地运行和调试包含连接器的作业。
创建DataStream API程序并使用MySqlSource。代码及pom依赖项示例如下:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MysqlCdcDemo {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.setString("pipeline.classpaths", "file://" + "mysql uber jar绝对路径");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("hostname")
.port(3306)
.databaseList("test_db") // 设置捕获的数据库
.tableList("test_db.test_table") // 设置捕获的表
.username("username")
.password("password")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
env.enableCheckpointing(3000);
env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
.setParallelism(4)
.print().setParallelism(1);
env.execute("Print MySQL Snapshot + Binlog");
}
}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>
在构建MySqlSource时,代码中必须指定以下参数:
参数 | 说明 |
hostname | MySQL数据库的IP地址或者主机名。 |
port | MySQL数据库服务的端口号。 |
databaseList | MySQL数据库名称。 说明 数据库名称支持正则表达式以读取多个数据库的数据,您可以使用 |
username | MySQL数据库服务的用户名。 |
password | MySQL数据库服务的密码。 |
deserializer | 反序列化器,将SourceRecord类型记录反序列化到指定类型。参数取值如下:
|
pom依赖项必须指定以下参数:
参数 | 说明 |
${vvr.version} | 阿里云实时计算Flink版的引擎版本,例如 |
${flink.version} | Apache Flink版本,例如 重要 请使用阿里云实时计算Flink版的引擎版本对应的Apache Flink版本,避免在作业运行时出现不兼容的问题。版本对应关系详情,请参见引擎。 |
本地调试问题绕行方案
阿里云实时计算Flink版包含商业化的额外内容,与Apache Flink版本有许多差异,导致本地调试运行发生报错。下面详细描述已发布的MySQL连接器不同版本,应该如何绕过本地调试运行报错问题。
连接器版本 | 绕行方案 |
1.15-vvr-6.0.2-3及以前的版本 | 按照已有本地调试方案运行即可。 |
1.15-vvr-6.0.7 | 这四个版本需要调整pom.xml配置后,可按本地调试方案运行。
pom.xml修改后如下:
重要 以上修改仅用于本地调试,调试结束后请将修改内容还原后再打包部署。 |
1.15-vvr-6.0.7-1 | |
1.17-vvr-8.0.4-1 | |
1.17-vvr-8.0.4-3 | |
1.17-vvr-8.0.8 | 暂无绕行方案,请使用1.17-vvr-8.0.4-3版本调试。 |