在进行自建Flink作业迁移前,需要先搭建基础环境。基础环境用于演示作业迁移的过程,如果您已经有了自建Flink环境,则可以不用搭建基础环境。本文为您介绍如何搭建基础环境。
背景信息
CADT是一个云资源创建工具,借助该工具,您可以快速开通各种云资源,搭建好迁移环境。当然,您也可以在各云产品控制台开通云资源,和在CADT上效果一样。本文将按以下步骤为您介绍如何搭建基础环境并测试环境:
步骤一:使用云速搭CADT创建资源
- 登录CADT控制台,创建应用。
- 登录CADT控制台。
- 在顶部菜单栏,选择。
- 在搜索框中,输入Flink迁移后,按回车键。
- 鼠标悬浮在开源Flink迁移实时计算Flink全托管版上,单击基于应用新建。
- 修改EMR-Flink集群密码。
说明 本文使用EMR-Flink作为用户自建Flink集群来运行迁移前的作业,以此来对比验证自建Flink迁移后的作业运行结果。如果您已经有了自建Flink集群,可以忽略此步骤。
- 鼠标左键双击EMR-Flink集群图标。
- 在右侧面板上,修改EMR-Flink集群密码。
- 在页面右上角,单击保存。
- 输入应用名称。
- 单击确认。
- 修改Kafka实例版本值。
- 鼠标左键双击Kafka实例图标。
- 修改部署实例-版本信息。
- 单击保存。
- 部署应用。
- 在页面右上角,单击部署应用。
- 单击下一步:价格清单。
- 单击下一步:确认订单。
- 选中云速搭服务条款。
- 单击下一步:支付并创建。

创建成功后应用如下图。

步骤二:在RDS中,创建数据库账号和测试数据库
- 登录RDS控制台。
- 鼠标左键双击RDS实例图标。
- 单击返回。
- 单击资源名称。
- 创建数据库账号。
- 在左侧导航栏,单击账号管理。
- 单击创建账号。
- 填写账号和密码信息。
- 填写完成后,单击确定。
- 创建测试数据库test_db。
- 在左侧导航栏,单击数据库管理。
- 单击创建数据库。
- 填写数据库信息。
- 单击创建。
步骤三:在Kafka中,准备测试数据
这里将通过Flink全托管往Kafka实时写入数据的方式准备测试数据。
- 登录Kafka控制台。
- 鼠标左键双击Kafka实例图标。
- 单击返回。
- 单击资源名称。
- 创建一个名称为kafka-order的Kafka Topic。
- 在左侧导航栏,单击Topic管理。
- 单击创建Topic。
- 填写Topic信息。
- 单击确定。
- 将Flink全托管所在VPC的网段添加到Kafka白名单中。
- 在左侧导航栏,单击实例详情。
- 在接入点信息区域,单击编辑白名单。
- 将Flink全托管所在VPC的网段信息填入白名单。
- 单击确定。
步骤四:创建Flink SQL作业并上线后测试数据生产
这里使用Flink往Kafka实时写入测试数据,首先创建Flink作业。说明:源表为Kafka中的订单表,topic名为kafka-order,存储格式为 CSV,Schema
如下。

- 登录Flink全托管控制台。
- 鼠标左键双击Flink全托管图标。
- 单击返回。
- 单击资源名称。
- 单击控制台。
- 创建Flink SQL流作业。
- 在左侧导航栏,单击作业开发。
- 单击新建。
- 在新建文件对话框,填写作业配置信息。
- 在作业开发页面,编写DDL和DML代码。
代码示例如下。
CREATE TEMPORARY TABLE data_in (
id VARCHAR,
order_value FLOAT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100',
'fields.id.length' = '10',
'fields.order_value.min' = '1.0',
'fields.order_value.max' = '100.0'
);
CREATE TEMPORARY TABLE kafka_order (
order_id VARCHAR,
order_time TIMESTAMP,
order_type VARCHAR,
order_value FLOAT
) WITH (
'connector' = 'kafka',
'topic' = 'kafka-order',
'properties.bootstrap.servers' = '192.*.*.224:9092,192.*.*.225:9092,192.*.*.226:9092',
'format' = 'csv'
);
INSERT INTO kafka_order
SELECT id as order_id,
CURRENT_TIMESTAMP as order_time,
CASE
WHEN (substring (id, 1, 1) <= 'z' AND substring (id, 1, 1) >= 'a') and (substring (id, 2, 1) <= 'z' AND substring (id, 2, 1) >= 'a') THEN 'typeA'
WHEN (substring (id, 1, 1) <= 'z' AND substring (id, 1, 1) >= 'a') and (substring (id, 2, 1) <= '9' AND substring (id, 2, 1) >= '0') THEN 'typeB'
WHEN (substring (id, 1, 1) <= '9' AND substring (id, 1, 1) >= '0') and (substring (id, 2, 1) <= '9' AND substring (id, 2, 1) >= '0') THEN 'typeC'
ELSE 'typeD'
END as order_type,
order_value
FROM
data_in;
- 修改Kafka参数properties.bootstrap.servers的取值。
请填写为Kafka实例详情的接入点信息中查看到的Kafka接入点信息。
- 单击上线。
- 在作业运维页面,单击启动。
步骤五:在Kafka控制台查看测试数据
- 登录Kafka控制台。
- 鼠标左键双击Kafka实例图标。
- 单击返回。
- 单击资源名称。
- 在Kafka Topic中,查看Flink全托管写入的消息。
- 在左侧导航栏,单击Topic管理。
- 单击kafka_order。
- 在消息查询页签,查询消息。

说明 等Flink作业启动运行后,在Kafka Topic中就可以看到测试数据已生成。