本文为您介绍如何将自建Flink集群上的Datastream JAR作业,迁移至实时计算Flink全托管的JAR作业类型中。

背景信息

本文介绍的迁移场景如下图所示。场景3

前提条件

  • 本地已安装Maven 3.x。
  • 已在Maven资源中心下载了开源JDBC Connector包,包括mysql-connector-java-8.0.27.jarflink-connector-jdbc_2.11-1.13.0.jar
    重要 依赖文件的版本需要与Flink集群的引擎版本保持一致。
  • 已下载了代码示例,下载的flink2vvp-main包中的文件说明如下:
    • Table/SQL类型:TableJobKafka2Rds.java
    • Datastream类型:DataStreamJobKafka2Rds.java
  • 已搭建好了基础环境,详情请参见搭建基础环境
  • 已构建自建集群测试作业并跑通。详情请参见构建自建集群测试作业
    说明 本文使用EMR-Flink作为用户自建Flink集群来运行迁移前的作业,以此来对比验证自建Flink迁移后的作业运行结果。如果您已经有了自建Flink集群,可以忽略此步骤。

自建Flink迁移Flink全托管

  1. 在RDS控制台,创建自建集群的sink表rds_old_table1。
    1. 登录RDS控制台。
      登录方式,请参见搭建基础环境
    2. 单击登录数据库
    3. 填写实例信息。
      实例信息
    4. 单击登录
    5. 单击复制IP网段
    6. 将复制的IP网段添加到实例白名单中。
      详情请参见设置IP白名单
    7. 将以下创建表的命令复制到SQL执行窗口,创建新表rds_new_table3。
      CREATE TABLE `rds_new_table3` ( 
        `window_start` timestamp NOT NULL, 
        `window_end` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
        `order_type` varchar(8) NOT NULL,
        `order_number` bigint NULL,
        `order_value_sum` double NULL,
        PRIMARY KEY ( `window_start`, `window_end`, `order_type` )
      ) ENGINE=InnoDB
      DEFAULT CHARACTER SET=utf8;
    8. 单击执行(F8)
  2. 在Maven中修改以下配置信息并打包。
    1. 在IntelliJ IDEA中,选择File > Open,打开下载并解压缩完成的flink2vvp包。
    2. 双击打开DataStreamJobKafka2Rds
    3. 修改Kafka和RDS的连接信息。
      配置信息
      类别参数说明
      KafkaKAFKA_TOPICKafka Topic名称。本示例Topic名称为kafka-order。
      KAFKA_BOOT_SERVERSKafka Broker地址。

      格式为host:port,host:port,host:port,以英文逗号(,)分割。

      KAFKA_GROUP_IDKafka消费组ID。本示例消费组ID为demo-group_new3。
      说明 为了避免消费组ID冲突,您可以在Kafka控制台上,创建新的消费组,并在此处使用新的Kafka消费组ID。
      RDSRDS_URLURL的格式为:jdbc:mysql://<内网地址>/<databaseName>,其中<databaseName>为对应的数据库名称。

      云数据库RDS版专有网络VPC地址,即内网地址,详情请​参见查看或修改内外网地址和端口

      RDS_USER_NAME用户名。
      RDS_PASSWORD密码。
      RDS_TABLE表名称。本示例填写为rds_new_table3。
    4. 使用mvn clean package命令构建新JAR包。
      mvn命令
      构建成功后可以在target目录下找到相应的JAR包。JAR包
  3. 在Flink全托管开发控制台上,新建Flink JAR流作业。
    1. 登录实时计算Flink开发控制台。
      登录方式,请参见搭建基础环境
    2. 在左侧导航栏,单击作业开发
    3. 单击新建
    4. 新建文件对话框,填写作业配置信息。
      配置项说明
      文件名称示例为ds_kafka3rds。
      文件类型请选择为流作业/JAR。
    5. 单击确认。
    6. 填写作业信息。
      配置项说明
      JAR URI上传刚编译好的JAR包或者填写对应的JAR信息。
      Entry Point Class指定主类名为com.alibaba.realtimecompute.DataStreamJobKafka2Rdsds
      并行度设置为2。
      附加依赖文件上传已下载的mysql-connector-java-8.0.27.jarflink-connector-jdbc_2.11-1.13.0.jar两个JAR包。附加依赖文件
  4. 高级配置页签,配置JM和TM的资源量。
    JM和TM资源量
  5. 单击保存
  6. 在页面右上角,单击上线,将作业提交至集群。
  7. 作业运维页面,单击启动
    如果作业的状态变为运行中,则表示作业已正常运行。
  8. 在RDS控制台上,双击rds_new_table3表后,单击执行(F8),查询对应的计算结果。
    结果3
    说明 如果上游Kafka有持续的流数据,则5分钟后即可到RDS控制台上查询到对应的计算结果。

构建自建集群测试作业

说明 本文使用EMR-Flink作为用户自建Flink集群来运行迁移前的作业,以此来对比验证自建Flink迁移后的作业运行结果。如果您已经有了自建Flink集群,可以忽略此步骤。
  1. 登录RDS数据库创建自建集群的sink表rds_old_table3。
    1. 登录RDS数据库。
      登录方式,请参见搭建基础环境
    2. 执行以下创建rds_old_table3表的命令。
      CREATE TABLE `rds_old_table3` ( 
          `window_start` timestamp NOT NULL, 
          `window_end` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
          `order_type` varchar(8) NOT NULL,
          `order_number` bigint NULL,
          `order_value_sum` double NULL,
      PRIMARY KEY ( `window_start`, `window_end`, `order_type` )
      ) ENGINE=InnoDB
      DEFAULT CHARACTER SET=utf8;
      table3
  2. 在Maven中修改以下配置信息并打包。
    1. 在IntelliJ IDEA中,选择File > Open,打开下载并解压缩完成的flink2vvp-main包。
    2. 双击打开DataStreamJobKafka2Rds
    3. 修改Kafka和RDS的连接信息。
      消费组
      类别参数说明
      KafkaKAFKA_TOPICKafka Topic名称。本示例Topic名称为kafka-order。
      KAFKA_BOOT_SERVERSKafka Broker地址。

      格式为host:port,host:port,host:port,以英文逗号(,)分割。

      KAFKA_GROUP_IDKafka消费组ID。
      说明 为了避免消费组ID冲突,您可以在Kafka控制台上,创建新的消费组,并在此处使用新的Kafka消费组ID。
      RDSRDS_URLURL的格式为:jdbc:mysql://<内网地址>/<databaseName>,其中<databaseName>为对应的数据库名称。

      云数据库RDS版专有网络VPC地址,即内网地址,详情请​参见查看或修改内外网地址和端口

      RDS_USER_NAME用户名。
      RDS_PASSWORD密码。
      RDS_TABLE表名称。本示例填写为rds_old_table3。
    4. 使用mvn clean package命令构建新JAR包。
      mvn命令
      构建成功后可以在target目录下找到相应的JAR包。JAR包
  3. 拷贝新构建的JAR包到自建EMR-Flink集群。
    1. 鼠标左键双击EMR-Flink图标。
    2. 查看publp信息,即自建flink master ip。
      emr IP
    3. 使用以下命令拷贝构建的新JAR包到EMR-Flink集群。
      scp {jar包路径} root@{自建flink master ip}:/
      拷贝包到集群

      上个步骤获取的publp即为自建flink master ip。

  4. 连接EMR-Flink集群后,执行以下命令运行Flink作业。
    连接方式详情请参见 连接方式概述 ECS远程连接操作指南
    cd /
    flink  run -d -t yarn-per-job -p 2 -ynm 'tablejobkafka2rds' -yjm 1024m -ytm 2048m -yD yarn.appmaster.vcores=1 -yD yarn.containers.vcores=1 -yD state.checkpoints.dir=hdfs:///flink/flink-checkpoints -yD execution.checkpointing.interval="180s" -c com.alibaba.realtimecompute.TableJobKafka2Rds ./flink2vvp-1.0-SNAPSHOT.jar
    运行Flink作业
  5. 在DMS控制台,查看写入RDS的结果。
    oldtable3

    由上图可见,自建Flink作业正常消费Kafka数据并成功写入RDS结果表。