本文为您介绍如何将Table/SQL JAR作业迁移至Flink全托管的SQL作业中。
背景信息
本文将使用开源JDBC Connector写阿里云RDS的方式进行Table/SQL JAR迁移至SQL,因此需要额外配置附加依赖文件选项。实时计算Flink全托管版集群内置了商业化的RDS Connector,您也可以替换为开源JDBC Connector。商业化的RDS Connector详情请参见云数据库RDS MySQL结果表。
本文介绍的迁移场景如下图所示。
前提条件
本地已安装Maven 3.x。
已在Maven资源中心下载了开源JDBC Connector包,包括mysql-connector-java-8.0.27.jar和flink-connector-jdbc_2.11-1.13.0.jar。
重要依赖文件的版本需要与Flink集群的引擎版本保持一致。
已下载了代码示例,下载的flink2vvp-main包中的文件说明如下:
Table/SQL类型:TableJobKafka2Rds.java
Datastream类型:DataStreamJobKafka2Rds.java
已构建自建集群测试作业并跑通。详情请参见构建自建集群测试作业。
自建Flink迁移Flink全托管
在RDS控制台创建新结果表rds_new_table1。
在左侧已登录实例下,鼠标左键双击test_db。
将以下创建表的命令复制到SQL执行窗口。
CREATE TABLE `rds_new_table1` ( `window_start` timestamp NOT NULL, `window_end` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, `order_type` varchar(8) NOT NULL, `order_number` bigint NULL, `order_value_sum` double NULL, PRIMARY KEY ( `window_start`, `window_end`, `order_type` ) ) ENGINE=InnoDB DEFAULT CHARACTER SET=utf8;
单击执行(F8)。
新建Flink SQL作业。
在Flink开发控制台上,新建Flink SQL流作业sql_kafka1rds,详情请参见SQL作业开发。
在作业开发页面,编写DDL和DML代码。
根据自建集群Table/SQL代码对应的业务逻辑,编写纯SQL代码,代码示例如下。
CREATE TEMPORARY TABLE kafkatable ( order_id varchar, order_time timestamp (3), order_type varchar, order_value float, WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'kafka-order', 'properties.bootstrap.servers' = '192.*.*.76:9092,192.*.*.75:9092,192.*.*.74:9092', 'properties.group.id' = 'demo-group_new1', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); CREATE TEMPORARY TABLE rdstable ( window_start timestamp, window_end timestamp, order_type varchar, order_number bigint, order_value_sum double, PRIMARY KEY (window_start, window_end, order_type) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://rm-**********.mysql.rds.aliyuncs.com:3306/test_db', 'table-name' = 'rds_new_table1', 'driver' = 'com.mysql.jdbc.Driver', 'username' = 'flinK*****', 'password' = 'Test****' ); INSERT INTO rdstable SELECT TUMBLE_START (order_time, INTERVAL '5' MINUTE) as window_start, TUMBLE_END (order_time, INTERVAL '5' MINUTE) as window_end, order_type, COUNT (1) as order_number, SUM (order_value) as order_value_sum FROM kafkatable GROUP BY TUMBLE (order_time, INTERVAL '5' MINUTE), order_type;
修改Kafka和RDS参数配置信息。
类别
参数
说明
Kafka
topic
Kafka Topic名称。本示例Topic名称为kafka-order。
properties.bootstrap.servers
Kafka Broker地址。
格式为
host:port,host:port,host:port
,以英文逗号(,)分割。properties.group.id
Kafka消费组ID。本示例为消费者ID为demo-group_new1。
说明为了避免消费组ID冲突,您可以在Kafka控制台上,创建新的消费组,并在此处使用新的Kafka消费组ID。
RDS
url
URL的格式为:
jdbc:mysql://<内网地址>/<databaseName>
,其中<databaseName>为对应的数据库名称。云数据库RDS版专有网络VPC地址,即内网地址,详情请参见查看和管理实例连接地址和端口。
table-name
表名。本示例表名称为test_db。
username
用户名。
password
密码。
请填写为Kafka实例详情的接入点信息中查看到的Kafka接入点信息。
(可选)在页面右上角,单击深度检查,进行语法检查。
在页面右上角,单击部署。
在
页面,单击目标作业名称。修改Flink SQL配置。
在部署详情页签基础配置区域的附加依赖文件选项中,上传已下载的mysql-connector-java-8.0.27.jar和flink-connector-jdbc_2.11-1.13.0.jar两个JAR包。
在资源配置区域,配置作业的并发度。
本示例参考自建Flink集群作业运行命令,设置作业并发度为2。
单击保存。
在
页面,单击启动。如果作业的状态变为运行中,则表示作业已正常运行。
在RDS控制台上,双击rds_new_table1表后,单击执行(F8),查询对应的计算结果。
说明如果上游Kafka有持续的流数据,则5分钟后即可到RDS控制台上查询到对应的计算结果。
构建自建集群测试作业
本文使用EMR-Flink作为用户自建Flink集群来运行迁移前的作业,以此来对比验证自建Flink迁移后的作业运行结果。如果您已经有了自建Flink集群,可以忽略此步骤。
在RDS控制台,创建自建集群的sink表rds_old_table1。
登录RDS控制台。
单击登录数据库。
填写实例信息。
单击登录。
单击复制IP网段。
将复制的IP网段添加到实例白名单中。
详情请参见添加DMS IP地址。
鼠标左键双击test_db。
将以下创建表的命令复制到SQL执行窗口。
CREATE TABLE `rds_old_table1` ( `window_start` timestamp NOT NULL, `window_end` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, `order_type` varchar(8) NOT NULL, `order_number` bigint NULL, `order_value_sum` double NULL, PRIMARY KEY ( `window_start`, `window_end`, `order_type` ) ) ENGINE=InnoDB DEFAULT CHARACTER SET=utf8;
单击执行(F8)。
在Maven中修改以下配置信息并构建新JAR。
在IntelliJ IDEA中,选择 ,打开下载并解压缩完成的flink2vvp-main包。
鼠标左键双击打开TableJobKafka2Rds。
修改Kafka和RDS的连接信息。
类别
参数
说明
Kafka
topic
Kafka Topic名称。本示例Topic名称为kafka-order。
properties.bootstrap.servers
Kafka Broker地址。
格式为
host:port,host:port,host:port
,以英文逗号(,)分割。properties.group.id
Kafka消费组ID。
说明为了避免消费组ID冲突,您可以在Kafka控制台上,创建新的消费组,并在此处使用新的Kafka消费组ID。
RDS
url
URL的格式为:
jdbc:mysql://<内网地址>/<databaseName>
,其中<databaseName>为对应的数据库名称。云数据库RDS版专有网络VPC地址,即内网地址,详情请参见查看和管理实例连接地址和端口。
table-name
表名。本示例表名称为rds_old_table1。
username
用户名。
password
密码。
在pom.xml文件中,添加以下两部分依赖信息。
<exclusions> <exclusion> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> </exclusion> </exclusions>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
使用
mvn clean package
命令构建JAR包。构建成功后可以在target目录下找到相应的JAR包。
拷贝新构建的JAR包到自建EMR-Flink集群。
鼠标左键双击EMR-Flink图标。
查看publp信息,即自建flink master ip。
使用以下命令拷贝构建的新JAR包到EMR-Flink集群。
scp {jar包路径} root@{自建flink master ip}:/
上个步骤获取的publp即为自建flink master ip。
连接EMR-Flink集群后,执行以下命令运行Flink作业。
连接方式详情请参见ECS远程连接方式概述。
cd / flink run -d -t yarn-per-job -p 2 -ynm 'tablejobkafka2rds' -yjm 1024m -ytm 2048m -yD yarn.appmaster.vcores=1 -yD yarn.containers.vcores=1 -yD state.checkpoints.dir=hdfs:///flink/flink-checkpoints -yD execution.checkpointing.interval="180s" -c com.alibaba.realtimecompute.TableJobKafka2Rds ./flink2vvp-1.0-SNAPSHOT.jar
在DMS控制台,查看写入RDS的结果。
由上图可见,自建Flink作业能正常消费Kafka数据并成功写入RDS结果表。