本文为您介绍如何将自建Flink集群上的Table/SQL JAR作业,迁移至实时计算Flink全托管的JAR作业。
背景信息
本文将使用开源JDBC Connector写阿里云RDS的方式进行作业迁移,因此需要额外配置附加依赖文件选项。实时计算Flink全托管版集群内置了商业化的RDS Connector,您也可以替换为开源JDBC Connector。商业化的RDS Connector详情请参见云数据库RDS MySQL结果表。
本文介绍的迁移场景如下图所示。
前提条件
本地已安装Maven 3.x。
已在Maven资源中心下载了开源JDBC Connector包,包括mysql-connector-java-8.0.27.jar和flink-connector-jdbc_2.11-1.13.0.jar。
重要依赖文件的版本需要与Flink集群的引擎版本保持一致。
已下载代码示例,下载的flink2vvp-main包中的文件说明如下:
Table/SQL类型:TableJobKafka2Rds.java
Datastream类型:DataStreamJobKafka2Rds.java
已构建自建集群测试作业并跑通。详情请参见构建自建集群测试作业。
说明本文使用EMR-Flink作为用户自建Flink集群来运行迁移前的作业,以此来对比验证自建Flink迁移后的作业运行结果。如果您已经有了自建Flink集群,可以忽略此步骤。
操作步骤
在RDS控制台,创建sink新表rds_new_table2。
登录RDS控制台。
单击登录数据库。
填写实例信息。
单击登录。
单击复制IP网段。
将复制的IP网段添加到实例白名单中。
详情请参见添加DMS IP地址。
将以下创建表的命令复制到SQL执行窗口。
CREATE TABLE `rds_new_table2` ( `window_start` timestamp NOT NULL, `window_end` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, `order_type` varchar(8) NOT NULL, `order_number` bigint NULL, `order_value_sum` double NULL, PRIMARY KEY ( `window_start`, `window_end`, `order_type` ) ) ENGINE=InnoDB DEFAULT CHARACTER SET=utf8;
单击执行(F8)。
在Maven中修改以下配置信息,并构建新JAR。
在IntelliJ IDEA中,选择 ,打开下载并解压缩完成的flink2vvp-main包。
鼠标左键双击打开TableJobKafka2Rds。
修改Kafka和RDS的连接信息。
类别
参数
说明
Kafka
topic
Kafka Topic名称。本示例Topic名称为kafka-order。
properties.bootstrap.servers
Kafka Broker地址。
格式为
host:port,host:port,host:port
,以英文逗号(,)分割。properties.group.id
Kafka消费组ID。
说明为了避免消费组ID冲突,您可以在Kafka控制台上,创建新的消费组,并在此处使用新的Kafka消费组ID。
RDS
url
云数据库RDS版专有网络VPC地址。
URL的格式为:
jdbc:mysql://<内网地址>/<databaseName>
,其中<databaseName>为对应的数据库名称。table-name
表名。本示例表名称为rds_new_table2。
username
用户名。
password
密码。
(可选)在pom.xml文件中,删除Table/SQL JAR迁移至SQL添加的下述依赖。
说明如果您是直接打开的新代码示例,没有添加上述该依赖信息,则可以忽略该步骤。
使用
mvn clean package
命令构建JAR包。构建成功后可以在target目录下找到相应的JAR包。
在Flink开发控制台,新建Flink JAR流作业sql_kafka2rds,详情请参见JAR作业开发。
配置项
说明
JAR URI
上传刚编译好的JAR包或者填写对应的JAR信息。
Entry Point Class
指定主类名为com.alibaba.realtimecompute.TableJobKafka2Rds。
附加依赖文件
上传已下载的mysql-connector-java-8.0.27.jar和flink-connector-jdbc_2.11-1.13.0.jar两个JAR包。
单击目标作业名称,在部署详情页签资源配置区域,配置JM和TM资源信息和并行度。
在
页面,单击启动。如果作业的状态变为运行中,则表示作业已正常运行。
在RDS控制台上,双击rds_new_table2表后,单击执行(F8),查询对应的计算结果。
说明如果上游Kafka有持续的流数据,则5分钟后即可到RDS控制台上查询到对应的计算结果。