Table/SQL JAR迁移至SQL

本文为您介绍如何将Table/SQL JAR作业迁移至Flink全托管的SQL作业中。

背景信息

本文将使用开源JDBC Connector写阿里云RDS的方式进行Table/SQL JAR迁移至SQL,因此需要额外配置附加依赖文件选项。实时计算Flink全托管版集群内置了商业化的RDS Connector,您也可以替换为开源JDBC Connector。商业化的RDS Connector详情请参见云数据库RDS MySQL结果表

本文介绍的迁移场景如下图所示。场景1

前提条件

  • 本地已安装Maven 3.x。

  • 已在Maven资源中心下载了开源JDBC Connector包,包括mysql-connector-java-8.0.27.jarflink-connector-jdbc_2.11-1.13.0.jar

    重要

    依赖文件的版本需要与Flink集群的引擎版本保持一致。

  • 已下载了代码示例,下载的flink2vvp-main包中的文件说明如下:

    • Table/SQL类型:TableJobKafka2Rds.java

    • Datastream类型:DataStreamJobKafka2Rds.java

  • 已构建自建集群测试作业并跑通。详情请参见构建自建集群测试作业

    说明
    • 本文使用EMR-Flink作为用户自建Flink集群来运行迁移前的作业,以此来对比验证自建Flink迁移后的作业运行结果。如果您已经有了自建Flink集群,可以忽略此步骤。

    • Maven资源中心代码示例属于第三方搭建的网站,访问时可能会存在无法打开或访问延迟的问题。

自建Flink迁移Flink全托管

  1. RDS控制台创建新结果表rds_new_table1。

    1. 在左侧已登录实例下,鼠标左键双击test_db。

    2. 将以下创建表的命令复制到SQL执行窗口。

      CREATE TABLE `rds_new_table1`
      ( 
        `window_start` timestamp NOT NULL, 
        `window_end` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
        `order_type` varchar(8) NOT NULL,
        `order_number` bigint NULL,
        `order_value_sum` double NULL,
         PRIMARY KEY ( `window_start`, `window_end`, `order_type` )
      ) ENGINE=InnoDB
      DEFAULT CHARACTER SET=utf8;
    3. 单击执行(F8)

  2. 新建Flink SQL作业。

    1. Flink开发控制台上,新建Flink SQL流作业sql_kafka1rds,详情请参见SQL作业开发

    2. 在作业开发页面,编写DDLDML代码。

      根据自建集群Table/SQL代码对应的业务逻辑,编写纯SQL代码,代码示例如下。

      CREATE TEMPORARY TABLE kafkatable (
        order_id varchar,
        order_time timestamp (3),
        order_type varchar,
        order_value float,
        WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'kafka-order',
        'properties.bootstrap.servers' = '192.*.*.76:9092,192.*.*.75:9092,192.*.*.74:9092',
        'properties.group.id' = 'demo-group_new1',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'csv'
      );
      
      CREATE TEMPORARY TABLE rdstable (
        window_start timestamp,
        window_end timestamp,
        order_type varchar,
        order_number bigint,
        order_value_sum double,
        PRIMARY KEY (window_start, window_end, order_type) NOT ENFORCED
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://rm-**********.mysql.rds.aliyuncs.com:3306/test_db',
        'table-name' = 'rds_new_table1',
        'driver' = 'com.mysql.jdbc.Driver',
        'username' = 'flinK*****',
        'password' = 'Test****'
      );
      
      INSERT INTO rdstable
      SELECT TUMBLE_START (order_time, INTERVAL '5' MINUTE) as window_start,
        TUMBLE_END (order_time, INTERVAL '5' MINUTE) as window_end,
        order_type,
        COUNT (1) as order_number,
        SUM (order_value) as order_value_sum
      FROM
        kafkatable
      GROUP BY TUMBLE (order_time, INTERVAL '5' MINUTE), order_type; 
    3. 修改KafkaRDS参数配置信息。

      类别

      参数

      说明

      Kafka

      topic

      Kafka Topic名称。本示例Topic名称为kafka-order。

      properties.bootstrap.servers

      Kafka Broker地址。

      格式为host:port,host:port,host:port,以英文逗号(,)分割。

      properties.group.id

      Kafka消费组ID。本示例为消费者IDdemo-group_new1。

      说明

      为了避免消费组ID冲突,您可以在Kafka控制台上,创建新的消费组,并在此处使用新的Kafka消费组ID。

      RDS

      url

      URL的格式为:jdbc:mysql://<内网地址>/<databaseName>,其中<databaseName>为对应的数据库名称。

      云数据库RDS版专有网络VPC地址,即内网地址,详情请​参见查看和管理实例连接地址和端口

      table-name

      表名。本示例表名称为test_db。

      username

      用户名。

      password

      密码。

      请填写为Kafka实例详情的接入点信息中查看到的Kafka接入点信息。

  3. (可选)在页面右上角,单击深度检查,进行语法检查。

  4. 在页面右上角,单击部署

  5. 运维中心 > 作业运维页面,单击目标作业名称。

  6. 修改Flink SQL配置。

    1. 部署详情页签基础配置区域的附加依赖文件选项中,上传已下载的mysql-connector-java-8.0.27.jarflink-connector-jdbc_2.11-1.13.0.jar两个JAR包。

    2. 资源配置区域,配置作业的并发度。

      本示例参考自建Flink集群作业运行命令,设置作业并发度为2。

    3. 单击保存

  7. 运维中心 > 作业运维页面,单击启动

    如果作业的状态变为运行中,则表示作业已正常运行。

  8. RDS控制台上,双击rds_new_table1表后,单击执行(F8),查询对应的计算结果。

    newtable1

    说明

    如果上游Kafka有持续的流数据,则5分钟后即可到RDS控制台上查询到对应的计算结果。

构建自建集群测试作业

说明

本文使用EMR-Flink作为用户自建Flink集群来运行迁移前的作业,以此来对比验证自建Flink迁移后的作业运行结果。如果您已经有了自建Flink集群,可以忽略此步骤。

  1. RDS控制台,创建自建集群的sinkrds_old_table1。

    1. 登录RDS控制台。

    2. 单击登录数据库

    3. 填写实例信息。

      实例信息

    4. 单击登录

    5. 单击复制IP网段

    6. 将复制的IP网段添加到实例白名单中。

      详情请参见添加DMS IP地址

    7. 鼠标左键双击test_db

    8. 将以下创建表的命令复制到SQL执行窗口。

      CREATE TABLE `rds_old_table1` ( 
        `window_start` timestamp NOT NULL, 
        `window_end` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
        `order_type` varchar(8) NOT NULL,
        `order_number` bigint NULL,
        `order_value_sum` double NULL,
        PRIMARY KEY ( `window_start`, `window_end`, `order_type` )
      ) ENGINE=InnoDB
      DEFAULT CHARACTER SET=utf8;

      RDS

    9. 单击执行(F8)

  2. Maven中修改以下配置信息并构建新JAR。

    1. IntelliJ IDEA中,选择File > Open,打开下载并解压缩完成的flink2vvp-main包。

    2. 鼠标左键双击打开TableJobKafka2Rds

    3. 修改KafkaRDS的连接信息。

      链接信息

      类别

      参数

      说明

      Kafka

      topic

      Kafka Topic名称。本示例Topic名称为kafka-order。

      properties.bootstrap.servers

      Kafka Broker地址。

      格式为host:port,host:port,host:port,以英文逗号(,)分割。

      properties.group.id

      Kafka消费组ID。

      说明

      为了避免消费组ID冲突,您可以在Kafka控制台上,创建新的消费组,并在此处使用新的Kafka消费组ID。

      RDS

      url

      URL的格式为:jdbc:mysql://<内网地址>/<databaseName>,其中<databaseName>为对应的数据库名称。

      云数据库RDS版专有网络VPC地址,即内网地址,详情请​参见查看和管理实例连接地址和端口

      table-name

      表名。本示例表名称为rds_old_table1。

      username

      用户名。

      password

      密码。

    4. pom.xml文件中,添加以下两部分依赖信息。

      POM依赖

      • <exclusions>
           <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
           </exclusion>
        </exclusions>
      • <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-core</artifactId>
           <version>${flink.version}</version>
           <scope>provided</scope>
        </dependency>
    5. 使用mvn clean package命令构建JAR包。

      mvn命令

      构建成功后可以在target目录下找到相应的JAR包。JAR包

  3. 拷贝新构建的JAR包到自建EMR-Flink集群。

    1. 鼠标左键双击EMR-Flink图标。

    2. 查看publp信息,即自建flink master ip。

      emr IP

    3. 使用以下命令拷贝构建的新JAR包到EMR-Flink集群。

      scp {jar包路径} root@{自建flink master ip}:/

      拷贝包到集群

      上个步骤获取的publp即为自建flink master ip。

  4. 连接EMR-Flink集群后,执行以下命令运行Flink作业。

    连接方式详情请参见ECS远程连接方式概述

    cd /
    flink  run -d -t yarn-per-job -p 2 -ynm 'tablejobkafka2rds' -yjm 1024m -ytm 2048m -yD yarn.appmaster.vcores=1 -yD yarn.containers.vcores=1 -yD state.checkpoints.dir=hdfs:///flink/flink-checkpoints -yD execution.checkpointing.interval="180s" -c com.alibaba.realtimecompute.TableJobKafka2Rds ./flink2vvp-1.0-SNAPSHOT.jar

    运行Flink作业

  5. DMS控制台,查看写入RDS的结果。

    oldtable1

    由上图可见,自建Flink作业能正常消费Kafka数据并成功写入RDS结果表。