本文为您介绍如何将Table/SQL JAR作业迁移至Flink全托管的SQL作业中。

背景信息

本文将使用开源JDBC Connector写阿里云RDS的方式进行Table/SQL JAR迁移至SQL,因此需要额外配置附加依赖文件选项。实时计算Flink全托管版集群内置了商业化的RDS Connector,您也可以替换为开源JDBC Connector。商业化的RDS Connector详情请参见云数据库RDS MySQL结果表

本文介绍的迁移场景如下图所示。场景1

前提条件

  • 本地已安装Maven 3.x。
  • 已在Maven资源中心下载了开源JDBC Connector包,包括mysql-connector-java-8.0.27.jarflink-connector-jdbc_2.11-1.13.0.jar
    重要 依赖文件的版本需要与Flink集群的引擎版本保持一致。
  • 已下载了代码示例,下载的flink2vvp-main包中的文件说明如下:
    • Table/SQL类型:TableJobKafka2Rds.java
    • Datastream类型:DataStreamJobKafka2Rds.java
  • 已搭建好了基础环境,详情请参见搭建基础环境
  • 已构建自建集群测试作业并跑通。详情请参见构建自建集群测试作业
    说明 本文使用EMR-Flink作为用户自建Flink集群来运行迁移前的作业,以此来对比验证自建Flink迁移后的作业运行结果。如果您已经有了自建Flink集群,可以忽略此步骤。

自建Flink迁移Flink全托管

  1. 在RDS控制台创建新结果表rds_new_table1。
    1. 在左侧已登录实例下,鼠标左键双击test_db。
    2. 将以下创建表的命令复制到SQL执行窗口。
      CREATE TABLE `rds_new_table1`
      ( 
        `window_start` timestamp NOT NULL, 
        `window_end` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
        `order_type` varchar(8) NOT NULL,
        `order_number` bigint NULL,
        `order_value_sum` double NULL,
         PRIMARY KEY ( `window_start`, `window_end`, `order_type` )
      ) ENGINE=InnoDB
      DEFAULT CHARACTER SET=utf8;
    3. 单击执行(F8)
  2. 在Flink全托管开发控制台上,新建Flink SQL流作业。
    1. 登录实时计算Flink开发控制台。
      登录方式,请参见搭建基础环境
    2. 在左侧导航栏,单击作业开发
    3. 单击新建
    4. 新建文件对话框,填写作业配置信息。
      作业名称示例为sql_kafka1rds,文件类型选择为流作业/SQL。
    5. 在作业开发页面,编写DDL和DML代码。
      根据自建集群Table/SQL代码对应的业务逻辑,编写纯SQL代码,代码示例如下。
      CREATE TEMPORARY TABLE kafkatable (
        order_id varchar,
        order_time timestamp (3),
        order_type varchar,
        order_value float,
        WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'kafka-order',
        'properties.bootstrap.servers' = '192.*.*.76:9092,192.*.*.75:9092,192.*.*.74:9092',
        'properties.group.id' = 'demo-group_new1',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'csv'
      );
      
      CREATE TEMPORARY TABLE rdstable (
        window_start timestamp,
        window_end timestamp,
        order_type varchar,
        order_number bigint,
        order_value_sum double,
        PRIMARY KEY (window_start, window_end, order_type) NOT ENFORCED
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://rm-**********.mysql.rds.aliyuncs.com:3306/test_db',
        'table-name' = 'rds_new_table1',
        'driver' = 'com.mysql.jdbc.Driver',
        'username' = 'flinK*****',
        'password' = 'Test****'
      );
      
      INSERT INTO rdstable
      SELECT TUMBLE_START (order_time, INTERVAL '5' MINUTE) as window_start,
        TUMBLE_END (order_time, INTERVAL '5' MINUTE) as window_end,
        order_type,
        COUNT (1) as order_number,
        SUM (order_value) as order_value_sum
      FROM
        kafkatable
      GROUP BY TUMBLE (order_time, INTERVAL '5' MINUTE), order_type; 
    6. 修改Kafka和RDS参数配置信息。
      类别参数说明
      KafkatopicKafka Topic名称。本示例Topic名称为kafka-order。
      properties.bootstrap.serversKafka Broker地址。

      格式为host:port,host:port,host:port,以英文逗号(,)分割。

      properties.group.idKafka消费组ID。本示例为消费者ID为demo-group_new1。
      说明 为了避免消费组ID冲突,您可以在Kafka控制台上,创建新的消费组,并在此处使用新的Kafka消费组ID。
      RDSurlURL的格式为:jdbc:mysql://<内网地址>/<databaseName>,其中<databaseName>为对应的数据库名称。

      云数据库RDS版专有网络VPC地址,即内网地址,详情请​参见查看或修改内外网地址和端口

      table-name表名。本示例表名称为test_db。
      username用户名。
      password密码。
      请填写为Kafka实例详情的接入点信息中查看到的Kafka接入点信息。
  3. 修改Flink SQL作业配置。
    1. 高级配置页签的附加依赖文件选项中,上传已下载的mysql-connector-java-8.0.27.jarflink-connector-jdbc_2.11-1.13.0.jar两个JAR包。
      上传JAR包
    2. 资源配置页签,配置作业的并发度。
      本示例参考自建Flink集群作业运行命令,设置作业并发度为2。并发度
    3. 单击保存
  4. 可选:在页面右上角,单击验证,进行语法检查。
  5. 在页面右上角,单击上线,将作业提交至集群。
  6. 作业运维页面,单击启动
    如果作业的状态变为运行中,则表示作业已正常运行。
  7. 在RDS控制台上,双击rds_new_table1表后,单击执行(F8),查询对应的计算结果。
    newtable1
    说明 如果上游Kafka有持续的流数据,则5分钟后即可到RDS控制台上查询到对应的计算结果。

构建自建集群测试作业

说明 本文使用EMR-Flink作为用户自建Flink集群来运行迁移前的作业,以此来对比验证自建Flink迁移后的作业运行结果。如果您已经有了自建Flink集群,可以忽略此步骤。
  1. 在RDS控制台,创建自建集群的sink表rds_old_table1。
    1. 登录RDS控制台。
      登录方式,请参见搭建基础环境
    2. 单击登录数据库
    3. 填写实例信息。
      实例信息
    4. 单击登录
    5. 单击复制IP网段
    6. 将复制的IP网段添加到实例白名单中。
      详情请参见设置IP白名单
    7. 鼠标左键双击test_db
    8. 将以下创建表的命令复制到SQL执行窗口。
      CREATE TABLE `rds_old_table1` ( 
        `window_start` timestamp NOT NULL, 
        `window_end` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
        `order_type` varchar(8) NOT NULL,
        `order_number` bigint NULL,
        `order_value_sum` double NULL,
        PRIMARY KEY ( `window_start`, `window_end`, `order_type` )
      ) ENGINE=InnoDB
      DEFAULT CHARACTER SET=utf8;
      RDS
    9. 单击执行(F8)
  2. 在Maven中修改以下配置信息并构建新JAR。
    1. 在IntelliJ IDEA中,选择File > Open,打开下载并解压缩完成的flink2vvp-main包。
    2. 鼠标左键双击打开TableJobKafka2Rds
    3. 修改Kafka和RDS的连接信息。
      链接信息
      类别参数说明
      KafkatopicKafka Topic名称。本示例Topic名称为kafka-order。
      properties.bootstrap.serversKafka Broker地址。

      格式为host:port,host:port,host:port,以英文逗号(,)分割。

      properties.group.idKafka消费组ID。
      说明 为了避免消费组ID冲突,您可以在Kafka控制台上,创建新的消费组,并在此处使用新的Kafka消费组ID。
      RDSurlURL的格式为:jdbc:mysql://<内网地址>/<databaseName>,其中<databaseName>为对应的数据库名称。

      云数据库RDS版专有网络VPC地址,即内网地址,详情请​参见查看或修改内外网地址和端口

      table-name表名。本示例表名称为rds_old_table1。
      username用户名。
      password密码。
    4. 在pom.xml文件中,添加以下两部分依赖信息。
      POM依赖
      • <exclusions>
           <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
           </exclusion>
        </exclusions>
      • <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-core</artifactId>
           <version>${flink.version}</version>
           <scope>provided</scope>
        </dependency>
    5. 使用mvn clean package命令构建JAR包。
      mvn命令
      构建成功后可以在target目录下找到相应的JAR包。JAR包
  3. 拷贝新构建的JAR包到自建EMR-Flink集群。
    1. 鼠标左键双击EMR-Flink图标。
    2. 查看publp信息,即自建flink master ip。
      emr IP
    3. 使用以下命令拷贝构建的新JAR包到EMR-Flink集群。
      scp {jar包路径} root@{自建flink master ip}:/
      拷贝包到集群

      上个步骤获取的publp即为自建flink master ip。

  4. 连接EMR-Flink集群后,执行以下命令运行Flink作业。
    连接方式详情请参见 连接方式概述 ECS远程连接操作指南
    cd /
    flink  run -d -t yarn-per-job -p 2 -ynm 'tablejobkafka2rds' -yjm 1024m -ytm 2048m -yD yarn.appmaster.vcores=1 -yD yarn.containers.vcores=1 -yD state.checkpoints.dir=hdfs:///flink/flink-checkpoints -yD execution.checkpointing.interval="180s" -c com.alibaba.realtimecompute.TableJobKafka2Rds ./flink2vvp-1.0-SNAPSHOT.jar
    运行Flink作业
  5. 在DMS控制台查看写入RDS的结果。
    oldtable1

    由上图可见,自建Flink作业能正常消费Kafka数据并成功写入RDS结果表。