本文为您介绍如何将自建Flink集群上的Datastream JAR作业,迁移至实时计算Flink全托管的JAR作业类型中。
背景信息
本文介绍的迁移场景如下图所示。
前提条件
- 本地已安装Maven 3.x。
- 已在Maven资源中心下载了开源JDBC Connector包,包括mysql-connector-java-8.0.27.jar和flink-connector-jdbc_2.11-1.13.0.jar。重要 依赖文件的版本需要与Flink集群的引擎版本保持一致。
- 已下载了代码示例,下载的flink2vvp-main包中的文件说明如下:
- Table/SQL类型:TableJobKafka2Rds.java
- Datastream类型:DataStreamJobKafka2Rds.java
- 已搭建好了基础环境,详情请参见搭建基础环境。
- 已构建自建集群测试作业并跑通。详情请参见构建自建集群测试作业。说明 本文使用EMR-Flink作为用户自建Flink集群来运行迁移前的作业,以此来对比验证自建Flink迁移后的作业运行结果。如果您已经有了自建Flink集群,可以忽略此步骤。
自建Flink迁移Flink全托管
- 在RDS控制台,创建自建集群的sink表rds_old_table1。
- 在Maven中修改以下配置信息并打包。
- 在Flink全托管开发控制台上,新建Flink JAR流作业。
- 在高级配置页签,配置JM和TM的资源量。
- 单击保存。
- 在页面右上角,单击上线,将作业提交至集群。
- 在作业运维页面,单击启动。如果作业的状态变为运行中,则表示作业已正常运行。
- 在RDS控制台上,双击rds_new_table3表后,单击执行(F8),查询对应的计算结果。说明 如果上游Kafka有持续的流数据,则5分钟后即可到RDS控制台上查询到对应的计算结果。
构建自建集群测试作业
说明 本文使用EMR-Flink作为用户自建Flink集群来运行迁移前的作业,以此来对比验证自建Flink迁移后的作业运行结果。如果您已经有了自建Flink集群,可以忽略此步骤。
- 登录RDS数据库创建自建集群的sink表rds_old_table3。
- 在Maven中修改以下配置信息并打包。
- 拷贝新构建的JAR包到自建EMR-Flink集群。
- 连接EMR-Flink集群后,执行以下命令运行Flink作业。连接方式详情请参见 连接方式概述 ECS远程连接操作指南 。
cd / flink run -d -t yarn-per-job -p 2 -ynm 'tablejobkafka2rds' -yjm 1024m -ytm 2048m -yD yarn.appmaster.vcores=1 -yD yarn.containers.vcores=1 -yD state.checkpoints.dir=hdfs:///flink/flink-checkpoints -yD execution.checkpointing.interval="180s" -c com.alibaba.realtimecompute.TableJobKafka2Rds ./flink2vvp-1.0-SNAPSHOT.jar
- 在DMS控制台,查看写入RDS的结果。
由上图可见,自建Flink作业正常消费Kafka数据并成功写入RDS结果表。