本文为您介绍如何将开源自建Flink集群的JAR作业迁移至阿里云实时计算Flink版。
背景信息
本文已准备示例JAR作业,GitHub项目请参见flink2vvp。
作业以统计每5分钟窗口内订单的订单总量和订单总金额的业务场景为例,通过Flink消费云消息队列 Kafka 版数据并写入云数据库RDS实现计算逻辑。迁移至阿里云实时计算Flink版可通过SQL或JAR方式进行部署,迁移场景如下图所示。
前提条件
已搭建迁移环境,详情请参见搭建基础环境。
注意事项
作业中非Connector的依赖范围需添加provided,避免与云上依赖产生冲突。
目前不支持迁移作业的状态数据。
在使用多个Connector时,注意META-INF目录需要合并,即在pom.xml文件中
maven-shade-plugin
插件中添加如下代码:<transformers> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"> <projectName>Apache Flink</projectName> <encoding>UTF-8</encoding> </transformer> </transformers>
准备工作
创建RDS MySQL数据库和表
为目标实例创建
test_db
示例数据库。创建结果表
rds_new_table
。在目标实例详情页面,单击上方的登录数据库。
双击目标数据库,在SQL执行窗口创建结果表。
本文在test_db数据库下创建表如下。
CREATE TABLE `rds_new_table` ( `window_start` timestamp NOT NULL, `window_end` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, `order_type` varchar(8) NOT NULL, `order_number` bigint NULL, `order_value_sum` double NULL, PRIMARY KEY ( `window_start`, `window_end`, `order_type` ) ) ENGINE=InnoDB DEFAULT CHARACTER SET=utf8;
单击执行后,单击直接执行。
准备Kafka测试数据
本文创建名为
kafka-order
的Topic和名为demo-group
的Group,详情请参见创建Topic和Group。编写DDL和DML代码,向Kafka中写入测试数据。
在实时计算控制台,单击目标工作空间操作列下的控制台,进入项目空间。
在
页面,单击新建,单击空白的流作业草稿,并将如下代码拷贝到SQL编辑区域。CREATE TEMPORARY TABLE data_in ( id VARCHAR, order_value FLOAT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '100', 'fields.id.length' = '10', 'fields.order_value.min' = '1.0', 'fields.order_value.max' = '100.0' ); CREATE TEMPORARY TABLE kafka_order ( order_id VARCHAR, order_time TIMESTAMP, order_type VARCHAR, order_value FLOAT ) WITH ( 'connector' = 'kafka', 'topic' = 'kafka-order', 'properties.bootstrap.servers' = 'alikafka-pre-cn-0dw3yxao****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-0dw3yxao****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-0dw3yxao****-3-vpc.alikafka.aliyuncs.com:9092', 'format' = 'csv' ); INSERT INTO kafka_order SELECT id as order_id, CURRENT_TIMESTAMP as order_time, CASE WHEN (substring (id, 1, 1) <= 'z' AND substring (id, 1, 1) >= 'a') and (substring (id, 2, 1) <= 'z' AND substring (id, 2, 1) >= 'a') THEN 'typeA' WHEN (substring (id, 1, 1) <= 'z' AND substring (id, 1, 1) >= 'a') and (substring (id, 2, 1) <= '9' AND substring (id, 2, 1) >= '0') THEN 'typeB' WHEN (substring (id, 1, 1) <= '9' AND substring (id, 1, 1) >= '0') and (substring (id, 2, 1) <= '9' AND substring (id, 2, 1) >= '0') THEN 'typeC' ELSE 'typeD' END as order_type, order_value FROM data_in;
本文使用datagen连接器随机生成数据,实时写入Kafka。WITH参数详情请参见Datagen和消息队列Kafka。
单击右上方的部署,进行作业部署。
单击左侧导航栏的
,单击目标作业操作列的启动,选择无状态启动后单击启动。
自建Flink迁移阿里云Flink
自建Flink作业可通过在实时计算Flink版部署SQL作业或者JAR作业完成迁移。本文使用内置连接器创建了一个Flink SQL或JAR作业,实现自建Flink作业迁移并验证迁移后的作业运行结果。
作业迁移
SQL方式迁移
抽取Table/SQL代码中的SQL语句,使用SQL方式部署。
在实时计算控制台,单击目标工作空间操作列下的控制台,进入项目空间。
在
页面,单击新建,单击空白的流作业草稿。编写DDL和DML代码。
抽取提炼flink2vvp中TableJobKafka2Rds.java代码的SQL语句,SQL示例如下。
CREATE TEMPORARY TABLE kafkatable ( order_id varchar, order_time timestamp (3), order_type varchar, order_value float, WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'kafka-order', 'properties.bootstrap.servers' = 'alikafka-pre-cn-0dw3yxao****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-0dw3yxao****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-0dw3yxao****-3-vpc.alikafka.aliyuncs.com:9092', 'properties.group.id' = 'demo-group', 'scan.startup.mode' = 'latest-offset', 'format' = 'csv' ); CREATE TEMPORARY TABLE rdstable ( window_start timestamp, window_end timestamp, order_type varchar, order_number bigint, order_value_sum double, PRIMARY KEY (window_start, window_end, order_type) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = 'rm-**********.mysql.rds.aliyuncs.com', 'port'='3306', 'database-name' ='test_db', 'table-name'= 'rds_new_table', 'username' = 'flink****', 'password' = '${secret_values.rdspw}' ); INSERT INTO rdstable SELECT TUMBLE_START (order_time, INTERVAL '5' MINUTE) as window_start, TUMBLE_END (order_time, INTERVAL '5' MINUTE) as window_end, order_type, COUNT (1) as order_number, SUM (order_value) as order_value_sum FROM kafkatable GROUP BY TUMBLE (order_time, INTERVAL '5' MINUTE), order_type;
修改Kafka和RDS参数配置信息。
类别
参数
说明
Kafka
topic
Kafka Topic名称。本示例为kafka-order。
properties.bootstrap.servers
Kafka Broker地址。格式为
host:port,host:port,host:port
,以英文逗号(,)分割。您可以在云消息队列Kafka版实例详情页的接入点信息中查看,详情请参见查看接入点。properties.group.id
Kafka消费组ID。本示例为demo-group。
说明为了避免消费组ID冲突,您可以在Kafka控制台上,创建新的消费组,并在此处使用新的Kafka消费组ID。
RDS
connector
使用内置MySQL连接器,固定值为mysql,支持所有兼容MySQL协议的数据库,包括RDS MySQL、PolarDB for MySQL或者自建MySQL,详情请参见MySQL。
hostname
MySQL数据库的IP地址或者Hostname。建议填写专有网络VPC地址。
云数据库RDS版专有网络VPC地址,即内网地址,详情请参见查看和管理实例连接地址和端口。
port
MySQL数据库服务的端口号。
database-name
MySQL数据库名称。本示例为test_db。
table-name
MySQL表名。本示例为rds_new_table。
username
MySQL数据库服务的用户名。
password
MySQL数据库服务的密码。
为了避免密码等信息明文暴露,本文使用了变量功能,详情请参见变量管理。
(可选)如果您使用了开源连接器JAR包或其他相关依赖,需单击页面右侧的更多配置,在附加依赖文件位置上传文件。
说明注意依赖文件的版本需要与Flink作业的引擎版本保持一致。
(可选)单击页面右上角的深度检查和调试进行语法检查与作业逻辑性验证。
调试功能可以模拟作业运行、检查输出结果,验证SELECT或INSERT业务逻辑的正确性,但需要创建Session集群,详情请参见作业调试。
单击页面右上角的部署。
SQL作业操作流程请参见Flink SQL作业快速入门。
Table/SQL JAR方式迁移
编译打包Table/SQL代码生成JAR包,使用JAR方式部署。
在Maven中修改以下配置信息,并构建新JAR。
在IntelliJ IDEA中,选择
,打开下载并解压缩完成的flink2vvp-main包。双击打开TableJobKafka2Rds,修改内置Kafka和RDS连接器需要配置的WITH参数。
类别
参数
说明
Kafka
topic
Kafka Topic名称。本示例为kafka-order。
properties.bootstrap.servers
Kafka Broker地址。格式为
host:port,host:port,host:port
,以英文逗号(,)分割。您可以在云消息队列Kafka版实例详情页的接入点信息中查看,详情请参见查看接入点。properties.group.id
Kafka消费组ID。本示例为demo-group。
说明为了避免消费组ID冲突,您可以在Kafka控制台上,创建新的消费组,并在此处使用新的Kafka消费组ID。
RDS
connector
使用内置MySQL连接器,固定值为mysql,支持所有兼容MySQL协议的数据库,包括RDS MySQL、PolarDB for MySQL或者自建MySQL,详情请参见MySQL。
hostname
MySQL数据库的IP地址或者Hostname。建议填写专有网络VPC地址。
云数据库RDS版专有网络VPC地址,即内网地址,详情请参见查看和管理实例连接地址和端口。
port
MySQL数据库服务的端口号。
database-name
MySQL数据库名称。本示例为test_db。
table-name
MySQL表名。本示例为rds_new_table。
username
MySQL数据库服务的用户名。
password
MySQL数据库服务的密码。
为了避免密码等信息明文暴露,本文使用了变量功能,详情请参见变量管理。
修改pom.xml文件中的相关配置信息。
说明下载的代码示例中pom依赖为Flink 1.13版本,请您根据您实际作业版本情况修改pom信息。
添加Properties
添加
<vvr.version>
标签,设置的版本号需与后续部署的JAR作业版本一致。修改连接器依赖
修改连接器依赖为内置Kafka和MySQL连接器。
<dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-kafka</artifactId> <version>${vvr.version}</version> </dependency> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-mysql</artifactId> <version>${vvr.version}</version> </dependency>
构建配置修改
在
<configuration>
标签内添加<excludes>
部分,构建时忽略DataStreamJobKafka2Rds.java
源文件。<excludes> <exclude>**/DataStreamJobKafka2Rds.java</exclude> </excludes>
使用
mvn clean package
命令构建JAR包。构建成功后可以在target目录下找到相应的JAR包。
部署JAR作业。
JAR作业操作流程请参见Flink JAR作业快速入门。
DataStream JAR方式迁移
编译打包DataStream代码生成JAR包,使用JAR方式部署。
在Maven中修改以下配置信息,并构建新JAR。
在IntelliJ IDEA中,选择
,打开下载并解压缩完成的flink2vvp包。双击打开DataStreamJobKafka2Rds,修改Kafka和RDS的连接信息。
类别
参数
说明
Kafka
KAFKA_TOPIC
Kafka Topic名称。本示例Topic名称为kafka-order。
KAFKA_BOOT_SERVERS
Kafka Broker地址。
格式为
host:port,host:port,host:port
,以英文逗号(,)分割。KAFKA_GROUP_ID
Kafka消费组ID。本示例消费组ID为
demo-group
。说明为了避免消费组ID冲突,您可以在Kafka控制台上,创建新的消费组,并在此处使用新的Kafka消费组ID。
RDS
RDS_URL
URL的格式为:
jdbc:mysql://<内网地址>/<databaseName>
,其中<databaseName>为对应的数据库名称。云数据库RDS版专有网络VPC地址,即内网地址,详情请参见查看和管理实例连接地址和端口。
RDS_USER_NAME
用户名。
RDS_PASSWORD
密码。
RDS_TABLE
表名称。本示例填写为rds_new_table。
使用
mvn clean package
命令构建新JAR包。构建成功后可以在target目录下找到相应的JAR包。
部署JAR作业。
JAR作业操作流程请参见Flink JAR作业快速入门。
修改Flink作业配置并启动作业。
在
页面,单击目标作业名称。在部署详情页签资源配置区域,配置作业并发度。
本示例参考自建Flink集群作业运行命令,设置作业并发度为2。您也可以根据需要配置其他参数。
单击保存。
单击作业详情页面右上方的启动,选择无状态启动后单击启动。详情请参见作业启动。
作业状态变为运行中后,可在RDS控制台上,查询对应的计算结果。
说明如果上游Kafka有持续的流数据,则5分钟后即可到RDS控制台上查询到对应的计算结果。