本文为您介绍如何将自建Flink集群上的Table/SQL JAR作业,迁移至实时计算Flink全托管的JAR作业。

背景信息

本文将使用开源JDBC Connector写阿里云RDS的方式进行作业迁移,因此需要额外配置附加依赖文件选项。实时计算Flink全托管版集群内置了商业化的RDS Connector,您也可以替换为开源JDBC Connector。商业化的RDS Connector详情请参见云数据库RDS MySQL结果表

本文介绍的迁移场景如下图所示。场景2

前提条件

  • 本地已安装Maven 3.x。
  • 已在Maven资源中心下载了开源JDBC Connector包,包括mysql-connector-java-8.0.27.jarflink-connector-jdbc_2.11-1.13.0.jar
    重要 依赖文件的版本需要与Flink集群的引擎版本保持一致。
  • 已下载代码示例,下载的flink2vvp-main包中的文件说明如下:
    • Table/SQL类型:TableJobKafka2Rds.java
    • Datastream类型:DataStreamJobKafka2Rds.java
  • 已搭建好了基础环境,详情请参见搭建基础环境
  • 已构建自建集群测试作业并跑通。详情请参见构建自建集群测试作业
    说明 本文使用EMR-Flink作为用户自建Flink集群来运行迁移前的作业,以此来对比验证自建Flink迁移后的作业运行结果。如果您已经有了自建Flink集群,可以忽略此步骤。

操作步骤

  1. 在RDS控制台,创建sink新表rds_new_table2。
    1. 登录RDS控制台。
      登录方式,请参见搭建基础环境
    2. 单击登录数据库
    3. 填写实例信息。
      实例信息
    4. 单击登录
    5. 单击复制IP网段
    6. 将复制的IP网段添加到实例白名单中。
      详情请参见设置IP白名单
    7. 将以下创建表的命令复制到SQL执行窗口。
      CREATE TABLE `rds_new_table2` 
      ( 
        `window_start` timestamp NOT NULL, 
        `window_end` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
        `order_type` varchar(8) NOT NULL,
        `order_number` bigint NULL,
        `order_value_sum` double NULL,
        PRIMARY KEY ( `window_start`, `window_end`, `order_type` )
      ) ENGINE=InnoDB
        DEFAULT CHARACTER SET=utf8;
    8. 单击执行(F8)
  2. 在Maven中修改以下配置信息,并构建新JAR。
    1. 在IntelliJ IDEA中,选择File > Open,打开下载并解压缩完成的flink2vvp-main包。
    2. 鼠标左键双击打开TableJobKafka2Rds
    3. 修改Kafka和RDS的连接信息。
      链接信息
      类别参数说明
      KafkatopicKafka Topic名称。本示例Topic名称为kafka-order。
      properties.bootstrap.serversKafka Broker地址。

      格式为host:port,host:port,host:port,以英文逗号(,)分割。

      properties.group.idKafka消费组ID。
      说明 为了避免消费组ID冲突,您可以在Kafka控制台上,创建新的消费组,并在此处使用新的Kafka消费组ID。
      RDSurl云数据库RDS版专有网络VPC地址。

      URL的格式为:jdbc:mysql://<内网地址>/<databaseName>,其中<databaseName>为对应的数据库名称。

      table-name表名。本示例表名称为rds_new_table2。
      username用户名。
      password密码。
    4. 可选:在pom.xml文件中,删除Table/SQL JAR迁移至SQL添加的下述依赖。
      POM依赖
      说明 如果您是直接打开的新代码示例,没有添加上述该依赖信息,则可以忽略该步骤。
    5. 使用mvn clean package命令构建JAR包。
      mvn命令
      构建成功后可以在target目录下找到相应的JAR包。JAR包
  3. 在Flink全托管开发控制台上,新建Flink SQL作业。
    1. 登录实时计算Flink开发控制台。
    2. 在左侧导航栏,单击作业开发
    3. 单击新建
    4. 新建文件对话框,填写作业配置信息。
      配置项说明
      文件名称示例为sql_kafka2rds。
      文件类型请选择为流作业/JAR。
    5. 单击确认
    6. 填写作业信息。
      配置项说明
      JAR URI上传刚编译好的JAR包或者填写对应的JAR信息。
      Entry Point Class指定主类名为com.alibaba.realtimecompute.TableJobKafka2Rdsclass
      并行度设置为2。
      附加依赖文件上传已下载的mysql-connector-java-8.0.27.jarflink-connector-jdbc_2.11-1.13.0.jar两个JAR包。附加依赖文件
  4. 高级配置页签,配置JM和TM的资源量。
    sql-kafka2rds
  5. 单击保存
  6. 在页面右上角,单击上线,将作业提交至集群。
  7. 作业运维页面,单击启动
    如果作业的状态变为运行中,则表示作业已正常运行。
  8. 在RDS控制台上,双击rds_new_table2表后,单击执行(F8),查询对应的计算结果。
    newtable2
    说明 如果上游Kafka有持续的流数据,则5分钟后即可到RDS控制台上查询到对应的计算结果。