Table/SQL & DataStream JAR作业迁移

本文为您介绍如何将开源自建Flink集群的JAR作业迁移至阿里云实时计算Flink版。

背景信息

说明

本文已准备示例JAR作业,GitHub项目请参见flink2vvp

作业以统计每5分钟窗口内订单的订单总量和订单总金额的业务场景为例,通过Flink消费云消息队列 Kafka 版数据并写入云数据库RDS实现计算逻辑。迁移至阿里云实时计算Flink版可通过SQLJAR方式进行部署,迁移场景如下图所示。

image

前提条件

已搭建迁移环境,详情请参见搭建基础环境

注意事项

  • 作业中非Connector的依赖范围需添加provided,避免与云上依赖产生冲突。

  • 目前不支持迁移作业的状态数据。

  • 在使用多个Connector时,注意META-INF目录需要合并,即在pom.xml文件中maven-shade-plugin插件中添加如下代码:

    <transformers>
      <!-- The service transformer is needed to merge META-INF/services files -->
      <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
      <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
        <projectName>Apache Flink</projectName>
        <encoding>UTF-8</encoding>
      </transformer>
    </transformers>

准备工作

创建RDS MySQL数据库和表

  1. 创建数据库

    为目标实例创建test_db示例数据库。

  2. 创建结果表rds_new_table

    1. 在目标实例详情页面,单击上方的登录数据库

    2. 双击目标数据库,在SQL执行窗口创建结果表。

      本文在test_db数据库下创建表如下。

      CREATE TABLE `rds_new_table`
      ( 
        `window_start` timestamp NOT NULL, 
        `window_end` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
        `order_type` varchar(8) NOT NULL,
        `order_number` bigint NULL,
        `order_value_sum` double NULL,
         PRIMARY KEY ( `window_start`, `window_end`, `order_type` )
      ) ENGINE=InnoDB
      DEFAULT CHARACTER SET=utf8;
    3. 单击执行后,单击直接执行

准备Kafka测试数据

  1. 本文创建名为kafka-orderTopic和名为demo-groupGroup,详情请参见创建TopicGroup

  2. 编写DDLDML代码,向Kafka中写入测试数据。

    1. 实时计算控制台,单击目标工作空间操作列下的控制台,进入项目空间。

    2. 数据开发 > ETL页面,单击新建,单击空白的流作业草稿,并将如下代码拷贝到SQL编辑区域。

      CREATE TEMPORARY TABLE data_in (
        id VARCHAR, 
        order_value FLOAT
      ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '100',
        'fields.id.length' = '10',
        'fields.order_value.min' = '1.0',
        'fields.order_value.max' = '100.0'
      );
      CREATE TEMPORARY TABLE kafka_order (
        order_id VARCHAR,
        order_time TIMESTAMP,
        order_type VARCHAR,
        order_value FLOAT
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'kafka-order',
        'properties.bootstrap.servers' = 'alikafka-pre-cn-0dw3yxao****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-0dw3yxao****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-0dw3yxao****-3-vpc.alikafka.aliyuncs.com:9092',
        'format' = 'csv'
      );
      INSERT INTO kafka_order
      SELECT id as order_id,
        CURRENT_TIMESTAMP as order_time,
        CASE
          WHEN (substring (id, 1, 1) <= 'z' AND substring (id, 1, 1) >= 'a') and (substring (id, 2, 1) <= 'z' AND substring (id, 2, 1) >= 'a') THEN 'typeA'
          WHEN (substring (id, 1, 1) <= 'z' AND substring (id, 1, 1) >= 'a') and (substring (id, 2, 1) <= '9' AND substring (id, 2, 1) >= '0') THEN 'typeB'
          WHEN (substring (id, 1, 1) <= '9' AND substring (id, 1, 1) >= '0') and (substring (id, 2, 1) <= '9' AND substring (id, 2, 1) >= '0') THEN 'typeC'
          ELSE 'typeD'
        END as order_type,
        order_value
      FROM
        data_in;

      本文使用datagen连接器随机生成数据,实时写入Kafka。WITH参数详情请参见Datagen消息队列Kafka

    3. 单击右上方的部署,进行作业部署。

    4. 单击左侧导航栏的运维中心 > 作业运维,单击目标作业操作列的启动,选择无状态启动后单击启动

自建Flink迁移阿里云Flink

自建Flink作业可通过在实时计算Flink版部署SQL作业或者JAR作业完成迁移。本文使用内置连接器创建了一个Flink SQLJAR作业,实现自建Flink作业迁移并验证迁移后的作业运行结果。

  1. 作业迁移

    SQL方式迁移

    抽取Table/SQL代码中的SQL语句,使用SQL方式部署。

    1. 实时计算控制台,单击目标工作空间操作列下的控制台,进入项目空间。

    2. 数据开发 > ETL页面,单击新建,单击空白的流作业草稿

    3. 编写DDLDML代码。

      抽取提炼flink2vvpTableJobKafka2Rds.java代码的SQL语句,SQL示例如下。

      CREATE TEMPORARY TABLE kafkatable (
        order_id varchar,
        order_time timestamp (3),
        order_type varchar,
        order_value float,
        WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'kafka-order',
        'properties.bootstrap.servers' = 'alikafka-pre-cn-0dw3yxao****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-0dw3yxao****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-0dw3yxao****-3-vpc.alikafka.aliyuncs.com:9092',
        'properties.group.id' = 'demo-group',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'csv'
      );
      
      CREATE TEMPORARY TABLE rdstable (
        window_start timestamp,
        window_end timestamp,
        order_type varchar,
        order_number bigint,
        order_value_sum double,
        PRIMARY KEY (window_start, window_end, order_type) NOT ENFORCED
      ) WITH (
        'connector' = 'mysql',
        'hostname' = 'rm-**********.mysql.rds.aliyuncs.com',
        'port'='3306',
        'database-name' ='test_db',
        'table-name'= 'rds_new_table',
        'username' = 'flink****',
        'password' = '${secret_values.rdspw}'
      );
      
      INSERT INTO rdstable
      SELECT TUMBLE_START (order_time, INTERVAL '5' MINUTE) as window_start,
        TUMBLE_END (order_time, INTERVAL '5' MINUTE) as window_end,
        order_type,
        COUNT (1) as order_number,
        SUM (order_value) as order_value_sum
      FROM
        kafkatable
      GROUP BY TUMBLE (order_time, INTERVAL '5' MINUTE), order_type; 
    4. 修改KafkaRDS参数配置信息。

      类别

      参数

      说明

      Kafka

      topic

      Kafka Topic名称。本示例为kafka-order。

      properties.bootstrap.servers

      Kafka Broker地址。格式为host:port,host:port,host:port,以英文逗号(,)分割。您可以在云消息队列Kafka版实例详情页的接入点信息中查看,详情请参见查看接入点

      properties.group.id

      Kafka消费组ID。本示例为demo-group。

      说明

      为了避免消费组ID冲突,您可以在Kafka控制台上,创建新的消费组,并在此处使用新的Kafka消费组ID。

      RDS

      connector

      使用内置MySQL连接器,固定值为mysql,支持所有兼容MySQL协议的数据库,包括RDS MySQL、PolarDB for MySQL或者自建MySQL,详情请参见MySQL

      hostname

      MySQL数据库的IP地址或者Hostname。建议填写专有网络VPC地址。

      云数据库RDS版专有网络VPC地址,即内网地址,详情请​参见查看和管理实例连接地址和端口

      port

      MySQL数据库服务的端口号。

      database-name

      MySQL数据库名称。本示例为test_db。

      table-name

      MySQL表名。本示例为rds_new_table。

      username

      MySQL数据库服务的用户名。

      password

      MySQL数据库服务的密码。

      为了避免密码等信息明文暴露,本文使用了变量功能,详情请参见变量管理

    5. (可选)如果您使用了开源连接器JAR包或其他相关依赖,需单击页面右侧的更多配置,在附加依赖文件位置上传文件。

      说明

      注意依赖文件的版本需要与Flink作业的引擎版本保持一致。

    6. (可选)单击页面右上角的深度检查调试进行语法检查与作业逻辑性验证。

      调试功能可以模拟作业运行、检查输出结果,验证SELECTINSERT业务逻辑的正确性,但需要创建Session集群,详情请参见作业调试

    7. 单击页面右上角的部署

    SQL作业操作流程请参见Flink SQL作业快速入门

    Table/SQL JAR方式迁移

    编译打包Table/SQL代码生成JAR包,使用JAR方式部署。

    1. Maven中修改以下配置信息,并构建新JAR。

      1. IntelliJ IDEA中,选择File > Open,打开下载并解压缩完成的flink2vvp-main包。

      2. 双击打开TableJobKafka2Rds,修改内置KafkaRDS连接器需要配置的WITH参数。

        image

        类别

        参数

        说明

        Kafka

        topic

        Kafka Topic名称。本示例为kafka-order。

        properties.bootstrap.servers

        Kafka Broker地址。格式为host:port,host:port,host:port,以英文逗号(,)分割。您可以在云消息队列Kafka版实例详情页的接入点信息中查看,详情请参见查看接入点

        properties.group.id

        Kafka消费组ID。本示例为demo-group。

        说明

        为了避免消费组ID冲突,您可以在Kafka控制台上,创建新的消费组,并在此处使用新的Kafka消费组ID。

        RDS

        connector

        使用内置MySQL连接器,固定值为mysql,支持所有兼容MySQL协议的数据库,包括RDS MySQL、PolarDB for MySQL或者自建MySQL,详情请参见MySQL

        hostname

        MySQL数据库的IP地址或者Hostname。建议填写专有网络VPC地址。

        云数据库RDS版专有网络VPC地址,即内网地址,详情请​参见查看和管理实例连接地址和端口

        port

        MySQL数据库服务的端口号。

        database-name

        MySQL数据库名称。本示例为test_db。

        table-name

        MySQL表名。本示例为rds_new_table。

        username

        MySQL数据库服务的用户名。

        password

        MySQL数据库服务的密码。

        为了避免密码等信息明文暴露,本文使用了变量功能,详情请参见变量管理

      3. 修改pom.xml文件中的相关配置信息。

        说明

        下载的代码示例中pom依赖为Flink 1.13版本,请您根据您实际作业版本情况修改pom信息。

        添加Properties

        添加<vvr.version>标签,设置的版本号需与后续部署的JAR作业版本一致。

        image

        修改连接器依赖

        修改连接器依赖为内置KafkaMySQL连接器。

        image

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-kafka</artifactId>
            <version>${vvr.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-mysql</artifactId>
            <version>${vvr.version}</version>
        </dependency>

        构建配置修改

        <configuration>标签内添加<excludes>部分,构建时忽略DataStreamJobKafka2Rds.java源文件。

        image

        <excludes>
            <exclude>**/DataStreamJobKafka2Rds.java</exclude>
        </excludes>
      4. 使用mvn clean package命令构建JAR包。

        构建成功后可以在target目录下找到相应的JAR包。JAR包

    2. 部署JAR作业。

      1. 实时计算控制台,单击目标工作空间操作列下的控制台,进入项目空间。

      2. 运维中心 > 作业运维页面,单击部署作业 > JAR 作业

      3. 填写部署信息,更多参数详情请参见部署JAR作业

        配置项

        说明

        部署名称

        自定义作业名称。

        引擎版本

        需要与依赖文件的版本保持一致。

        JAR URI

        上传刚编译好的JAR包或者填写对应的JAR信息。

        Entry Point Class

        指定主类名为com.alibaba.realtimecompute.TableJobKafka2Rds

        附加依赖文件

        本文使用了内置连接器无需上传依赖文件。如果您使用了开源连接器JAR包或其他相关依赖,请在此处上传。

      4. 单击部署

    JAR作业操作流程请参见Flink JAR作业快速入门

    DataStream JAR方式迁移

    编译打包DataStream代码生成JAR包,使用JAR方式部署。

    1. Maven中修改以下配置信息,并构建新JAR。

      1. IntelliJ IDEA中,选择File > Open,打开下载并解压缩完成的flink2vvp包。

      2. 双击打开DataStreamJobKafka2Rds,修改KafkaRDS的连接信息。

        配置信息

        类别

        参数

        说明

        Kafka

        KAFKA_TOPIC

        Kafka Topic名称。本示例Topic名称为kafka-order。

        KAFKA_BOOT_SERVERS

        Kafka Broker地址。

        格式为host:port,host:port,host:port,以英文逗号(,)分割。

        KAFKA_GROUP_ID

        Kafka消费组ID。本示例消费组IDdemo-group

        说明

        为了避免消费组ID冲突,您可以在Kafka控制台上,创建新的消费组,并在此处使用新的Kafka消费组ID。

        RDS

        RDS_URL

        URL的格式为:jdbc:mysql://<内网地址>/<databaseName>,其中<databaseName>为对应的数据库名称。

        云数据库RDS版专有网络VPC地址,即内网地址,详情请​参见查看和管理实例连接地址和端口

        RDS_USER_NAME

        用户名。

        RDS_PASSWORD

        密码。

        RDS_TABLE

        表名称。本示例填写为rds_new_table。

      3. 使用mvn clean package命令构建新JAR包。

        mvn命令

        构建成功后可以在target目录下找到相应的JAR包。JAR包

    2. 部署JAR作业。

      1. 实时计算控制台,单击目标工作空间操作列下的控制台,进入项目空间。

      2. 运维中心 > 作业运维页面,单击部署作业 > JAR 作业

      3. 填写部署信息,更多参数详情请参见部署JAR作业

        配置项

        说明

        部署名称

        自定义作业名称。

        引擎版本

        需要与依赖文件的版本保持一致。

        JAR URI

        上传刚编译好的JAR包或者填写对应的JAR信息。

        Entry Point Class

        指定主类名为com.alibaba.realtimecompute.DataStreamJobKafka2Rds。

        附加依赖文件

        本文使用了内置连接器无需上传依赖文件。如果您使用了开源连接器JAR包或其他相关依赖,请在此处上传。

      4. 单击部署

    JAR作业操作流程请参见Flink JAR作业快速入门

  2. 修改Flink作业配置并启动作业。

    1. 运维中心 > 作业运维页面,单击目标作业名称。

    2. 部署详情页签资源配置区域,配置作业并发度。

      本示例参考自建Flink集群作业运行命令,设置作业并发度为2。您也可以根据需要配置其他参数。

    3. 单击保存

    4. 单击作业详情页面右上方的启动,选择无状态启动后单击启动。详情请参见作业启动

  3. 作业状态变为运行中后,可在RDS控制台上,查询对应的计算结果。

    image

    说明

    如果上游Kafka有持续的流数据,则5分钟后即可到RDS控制台上查询到对应的计算结果。