本文为您介绍如何将Table/SQL JAR作业迁移至Flink全托管的SQL作业中。
背景信息
本文将使用开源JDBC Connector写阿里云RDS的方式进行Table/SQL JAR迁移至SQL,因此需要额外配置附加依赖文件选项。实时计算Flink全托管版集群内置了商业化的RDS Connector,您也可以替换为开源JDBC Connector。商业化的RDS Connector详情请参见云数据库RDS MySQL结果表。
本文介绍的迁移场景如下图所示。
前提条件
- 本地已安装Maven 3.x。
- 已在Maven资源中心下载了开源JDBC Connector包,包括mysql-connector-java-8.0.27.jar和flink-connector-jdbc_2.11-1.13.0.jar。重要 依赖文件的版本需要与Flink集群的引擎版本保持一致。
- 已下载了代码示例,下载的flink2vvp-main包中的文件说明如下:
- Table/SQL类型:TableJobKafka2Rds.java
- Datastream类型:DataStreamJobKafka2Rds.java
- 已搭建好了基础环境,详情请参见搭建基础环境。
- 已构建自建集群测试作业并跑通。详情请参见构建自建集群测试作业。说明 本文使用EMR-Flink作为用户自建Flink集群来运行迁移前的作业,以此来对比验证自建Flink迁移后的作业运行结果。如果您已经有了自建Flink集群,可以忽略此步骤。
自建Flink迁移Flink全托管
- 在RDS控制台创建新结果表rds_new_table1。
- 在Flink全托管开发控制台上,新建Flink SQL流作业。
- 修改Flink SQL作业配置。
- 可选:在页面右上角,单击验证,进行语法检查。
- 在页面右上角,单击上线,将作业提交至集群。
- 在作业运维页面,单击启动。如果作业的状态变为运行中,则表示作业已正常运行。
- 在RDS控制台上,双击rds_new_table1表后,单击执行(F8),查询对应的计算结果。说明 如果上游Kafka有持续的流数据,则5分钟后即可到RDS控制台上查询到对应的计算结果。
构建自建集群测试作业
说明 本文使用EMR-Flink作为用户自建Flink集群来运行迁移前的作业,以此来对比验证自建Flink迁移后的作业运行结果。如果您已经有了自建Flink集群,可以忽略此步骤。
- 在RDS控制台,创建自建集群的sink表rds_old_table1。
- 在Maven中修改以下配置信息并构建新JAR。
- 拷贝新构建的JAR包到自建EMR-Flink集群。
- 连接EMR-Flink集群后,执行以下命令运行Flink作业。连接方式详情请参见 连接方式概述 ECS远程连接操作指南 。
cd / flink run -d -t yarn-per-job -p 2 -ynm 'tablejobkafka2rds' -yjm 1024m -ytm 2048m -yD yarn.appmaster.vcores=1 -yD yarn.containers.vcores=1 -yD state.checkpoints.dir=hdfs:///flink/flink-checkpoints -yD execution.checkpointing.interval="180s" -c com.alibaba.realtimecompute.TableJobKafka2Rds ./flink2vvp-1.0-SNAPSHOT.jar
- 在DMS控制台查看写入RDS的结果。
由上图可见,自建Flink作业能正常消费Kafka数据并成功写入RDS结果表。