Table/SQL JAR迁移至JAR

本文为您介绍如何将自建Flink集群上的Table/SQL JAR作业,迁移至实时计算Flink全托管的JAR作业。

背景信息

本文将使用开源JDBC Connector写阿里云RDS的方式进行作业迁移,因此需要额外配置附加依赖文件选项。实时计算Flink全托管版集群内置了商业化的RDS Connector,您也可以替换为开源JDBC Connector。商业化的RDS Connector详情请参见云数据库RDS MySQL结果表

本文介绍的迁移场景如下图所示。场景2

前提条件

  • 本地已安装Maven 3.x。

  • 已在Maven资源中心下载了开源JDBC Connector包,包括mysql-connector-java-8.0.27.jarflink-connector-jdbc_2.11-1.13.0.jar

    重要

    依赖文件的版本需要与Flink集群的引擎版本保持一致。

  • 已下载代码示例,下载的flink2vvp-main包中的文件说明如下:

    • Table/SQL类型:TableJobKafka2Rds.java

    • Datastream类型:DataStreamJobKafka2Rds.java

  • 已构建自建集群测试作业并跑通。详情请参见构建自建集群测试作业

    说明

    本文使用EMR-Flink作为用户自建Flink集群来运行迁移前的作业,以此来对比验证自建Flink迁移后的作业运行结果。如果您已经有了自建Flink集群,可以忽略此步骤。

操作步骤

  1. 在RDS控制台,创建sink新表rds_new_table2。

    1. 登录RDS控制台。

    2. 单击登录数据库

    3. 填写实例信息。

      实例信息

    4. 单击登录

    5. 单击复制IP网段

    6. 将复制的IP网段添加到实例白名单中。

      详情请参见添加DMS IP地址

    7. 将以下创建表的命令复制到SQL执行窗口。

      CREATE TABLE `rds_new_table2` 
      ( 
        `window_start` timestamp NOT NULL, 
        `window_end` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
        `order_type` varchar(8) NOT NULL,
        `order_number` bigint NULL,
        `order_value_sum` double NULL,
        PRIMARY KEY ( `window_start`, `window_end`, `order_type` )
      ) ENGINE=InnoDB
        DEFAULT CHARACTER SET=utf8;
    8. 单击执行(F8)

  2. 在Maven中修改以下配置信息,并构建新JAR。

    1. 在IntelliJ IDEA中,选择File > Open,打开下载并解压缩完成的flink2vvp-main包。

    2. 鼠标左键双击打开TableJobKafka2Rds

    3. 修改Kafka和RDS的连接信息。

      链接信息

      类别

      参数

      说明

      Kafka

      topic

      Kafka Topic名称。本示例Topic名称为kafka-order。

      properties.bootstrap.servers

      Kafka Broker地址。

      格式为host:port,host:port,host:port,以英文逗号(,)分割。

      properties.group.id

      Kafka消费组ID。

      说明

      为了避免消费组ID冲突,您可以在Kafka控制台上,创建新的消费组,并在此处使用新的Kafka消费组ID。

      RDS

      url

      云数据库RDS版专有网络VPC地址。

      URL的格式为:jdbc:mysql://<内网地址>/<databaseName>,其中<databaseName>为对应的数据库名称。

      table-name

      表名。本示例表名称为rds_new_table2。

      username

      用户名。

      password

      密码。

    4. (可选)在pom.xml文件中,删除Table/SQL JAR迁移至SQL添加的下述依赖。

      POM依赖

      说明

      如果您是直接打开的新代码示例,没有添加上述该依赖信息,则可以忽略该步骤。

    5. 使用mvn clean package命令构建JAR包。

      mvn命令

      构建成功后可以在target目录下找到相应的JAR包。JAR包

  3. 在Flink开发控制台,新建Flink JAR流作业sql_kafka2rds,详情请参见JAR作业开发

    配置项

    说明

    JAR URI

    上传刚编译好的JAR包或者填写对应的JAR信息。

    Entry Point Class

    指定主类名为com.alibaba.realtimecompute.TableJobKafka2Rds

    附加依赖文件

    上传已下载的mysql-connector-java-8.0.27.jarflink-connector-jdbc_2.11-1.13.0.jar两个JAR包。

  4. 单击目标作业名称,在部署详情页签资源配置区域,配置JM和TM资源信息和并行度。

  5. 运维中心 > 作业运维页面,单击启动

    如果作业的状态变为运行中,则表示作业已正常运行。

  6. 在RDS控制台上,双击rds_new_table2表后,单击执行(F8),查询对应的计算结果。

    newtable2

    说明

    如果上游Kafka有持续的流数据,则5分钟后即可到RDS控制台上查询到对应的计算结果。