搭建基础环境

更新时间:
复制为 MD 格式

在进行自建Flink作业迁移之前,需首先搭建基础环境。本文将为您介绍基础环境的搭建方法。

背景信息

CADT是一个云资源创建工具,借助该工具,您可以快速开通各种云资源,搭建好迁移环境。当然,您也可以在各云产品控制台开通云资源,和在CADT上效果一样。

本文将按以下步骤为您介绍如何搭建基础环境并测试环境:

步骤一:使用云速搭CADT创建基础环境

步骤二:创建Flink SQL作业并上线后测试数据生产

步骤三:在Kafka控制台查看测试数据

步骤一:使用云速搭CADT创建基础环境

  1. 登录CADT控制台,创建应用。

    1. 登录CADT控制台

    2. 在顶部菜单栏,选择新建 > 官方解决方案 > 模板

    3. 在搜索框中,输入Flink迁移后,按回车键。

    4. 鼠标悬浮在开源Flink迁移实时计算Flink全托管版上,单击基于方案新建

  2. (可选)修改部署地域。

    1. 鼠标左键双击外边框。

    2. 在右侧面板上选择区域。

      面板标题为Region单可用区 详情,在区域下拉列表中选择目标地域,例如华东2(上海)

    3. 修改区域后单击确认修改。此时弹出确认对话框,提示修改后需重新配置该区域下虚拟交换机的可用区。

  3. 修改Flink集群密码。

    1. 鼠标左键双击My-FLINK集群图标。

    2. 在右侧面板上,点击清除后修改密码。

      E-MapReduce on ECS 详情面板中,将登录方式设置为设置密码,然后单击登录密码确认登录密码右侧的清除按钮,重新输入新密码。

  4. 修改Kafka实例配置。

    1. 鼠标左键双击kafka实例图标。

    2. 修改部署实例-版本,选用2.6.2最新版本。

      消息队列 Kafka 版 详情面板中,找到部署实例-版本下拉列表并选择2.6.2

    3. 修改创建Topic。

      本文修改为kafka-order。

      同时将Topic描述也设置为kafka-order分区数设置为12存储引擎消息类型保持默认值。

  5. 修改云数据库RDS配置。

    1. 鼠标左键双击flink-rds实例图标。

    2. 修改创建账号。

      填写数据库账号、账号类型、密码信息。

      例如将数据库账号设置为flink_rds账号类型选择普通账号,然后设置密码。

    3. 修改创建数据库。

      填写数据库名称,选择授权账号和数据库授权模式。

      例如将数据库名称设置为test_db支持字符集选择utf8授权账号选择flink_rds数据库授权设置为读写

  6. 部署应用。

    1. 在页面右上角,单击保存。输入应用名称后点击确认

    2. 单击切换到编辑模式

      确认后模板会变成到浏览模式,此时需要切回编辑模式才能部署应用。

    3. 单击部署应用。

      页面顶部显示资源验证资源询价资源部署三个步骤的部署进度。

    4. 单击下一步:价格清单

    5. 单击下一步:确认订单

    6. 选中云速搭服务条款,单击下一步:支付并创建

      确认订单页面列出待创建的资源清单,包括My-FLINK(E-MapReduce on ECS)、kafka(消息队列 Kafka 版)、flink-rds(云数据库RDS)和oss-flink-cadt(对象存储 OSS),确认费用信息后进行支付。

      创建成功后,页面左上角显示部署成功标识,CADT架构图中各组件(kafka、My-FLINK、flink-rds、flink-on-cloud、oss-flink-namech等)均显示绿色对勾,表示所有云资源已成功创建并部署。

步骤二:创建Flink SQL作业并上线后测试数据生产

这里使用FlinkKafka实时写入测试数据,首先创建Flink作业。说明:源表为Kafka中的订单表,topic名为kafka-order,存储格式为 CSV,Schema 如下。Schema

  1. 登录实时计算控制台。

    1. 鼠标左键双击flink-on-cloud图标。

      CADT架构图中,flink-on-cloud组件位于底部区域,为实时计算Flink全托管实例。

    2. 单击前往控制台

    3. 单击目标工作空间操作列下的控制台

      实时计算控制台Flink 全托管页签中,可以看到名为flink-on-cloud的工作空间,其工作空间状态运行中处理器架构X86付费类型按量付费

  2. 创建Flink SQL流作业。

    1. 在左侧导航栏,单击数据开发 > ETL

    2. 单击image后,单击新建流作业,填写文件名称并选择引擎版本,单击创建

    3. SQL作业开发页面,编写DDLDML代码。

      代码示例如下。

      CREATE TEMPORARY TABLE data_in (
        id VARCHAR, 
        order_value FLOAT
      ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '100',
        'fields.id.length' = '10',
        'fields.order_value.min' = '1.0',
        'fields.order_value.max' = '100.0'
      );
      
      CREATE TEMPORARY TABLE kafka_order (
        order_id VARCHAR,
        order_time TIMESTAMP,
        order_type VARCHAR,
        order_value FLOAT
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'kafka-order',
        'properties.bootstrap.servers' = 'alikafka-****.aliyuncs.com:9092,alikafka-****.aliyuncs.com:9092',
        'format' = 'csv'
      );
      
      INSERT INTO kafka_order
      SELECT id as order_id,
        CURRENT_TIMESTAMP as order_time,
        CASE
          WHEN (substring (id, 1, 1) <= 'z' AND substring (id, 1, 1) >= 'a') and (substring (id, 2, 1) <= 'z' AND substring (id, 2, 1) >= 'a') THEN 'typeA'
          WHEN (substring (id, 1, 1) <= 'z' AND substring (id, 1, 1) >= 'a') and (substring (id, 2, 1) <= '9' AND substring (id, 2, 1) >= '0') THEN 'typeB'
          WHEN (substring (id, 1, 1) <= '9' AND substring (id, 1, 1) >= '0') and (substring (id, 2, 1) <= '9' AND substring (id, 2, 1) >= '0') THEN 'typeC'
          ELSE 'typeD'
        END as order_type,
        order_value
      FROM
        data_in;
  3. 修改SQLproperties.bootstrap.servers取值,改为Kafka实例的接入点信息。

    1. 在云速搭CADT模板界面中,鼠标左键双击kafka实例图标。

    2. 单击前往控制台,查看接入点信息

      Kafka实例的实例信息页面,找到接入点信息区域。默认接入点的网络VPC协议PLAINTEXT,复制域名接入点地址用于配置SQL中的properties.bootstrap.servers参数。

  4. SQL编辑区域右上方,单击部署,在部署新版本对话框,单击确定

  5. 运维中心 > 作业运维页面,单击目标作业操作列中的启动

  6. 作业启动对话框中,默认选择无状态启动,单击启动

步骤三:在Kafka控制台查看测试数据

  1. 登录Kafka控制台。

    1. 在云速搭CADT模板界面中,鼠标左键双击kafka实例图标。

    2. 单击前往控制台

      选中kafka实例后,CADT页面底部显示该实例的资源信息。

  2. Kafka Topic中,查看Flink全托管写入的消息。

    1. 在左侧导航栏,单击Topic管理

    2. 单击kafka_order

    3. 消息查询页签,查询消息。

      选择按时间点查询方式,分区选择全部分区,设置查询时间点后单击查询按钮。查询结果表格中显示分区位点KeyValue消息创建时间等字段,Value列中的CSV格式数据即为Flink写入的测试消息。

      说明

      Flink作业启动运行后,在Kafka Topic中就可以看到测试数据已生成。