在进行自建Flink作业迁移之前,需首先搭建基础环境。本文将为您介绍基础环境的搭建方法。
背景信息
CADT是一个云资源创建工具,借助该工具,您可以快速开通各种云资源,搭建好迁移环境。当然,您也可以在各云产品控制台开通云资源,和在CADT上效果一样。
本文将按以下步骤为您介绍如何搭建基础环境并测试环境:
步骤一:使用云速搭CADT创建基础环境
登录CADT控制台,创建应用。
登录CADT控制台。
在顶部菜单栏,选择
。在搜索框中,输入Flink迁移后,按回车键。
鼠标悬浮在开源Flink迁移实时计算Flink全托管版上,单击基于方案新建。
(可选)修改部署地域。
鼠标左键双击外边框。
在右侧面板上选择区域。
修改区域后单击确认修改。
修改Flink集群密码。
鼠标左键双击My-FLINK集群图标。
在右侧面板上,点击清除后修改密码。
修改Kafka实例配置。
鼠标左键双击kafka实例图标。
修改部署实例-版本,选用2.6.2最新版本。
修改创建Topic。
本文修改为kafka-order。
修改云数据库RDS配置。
鼠标左键双击flink-rds实例图标。
修改创建账号。
填写数据库账号、账号类型、密码信息。
修改创建数据库。
填写数据库名称,选择授权账号和数据库授权模式。
部署应用。
在页面右上角,单击保存。输入应用名称后点击确认。
单击切换到编辑模式。
确认后模板会变成到浏览模式,此时需要切回编辑模式才能部署应用。
单击部署应用。
单击下一步:价格清单。
单击下一步:确认订单。
选中云速搭服务条款,单击下一步:支付并创建。
创建成功后应用如下图。
步骤二:创建Flink SQL作业并上线后测试数据生产
这里使用Flink往Kafka实时写入测试数据,首先创建Flink作业。说明:源表为Kafka中的订单表,topic名为kafka-order,存储格式为 CSV,Schema 如下。
登录实时计算控制台。
鼠标左键双击flink-on-cloud图标。
单击前往控制台。
单击目标工作空间操作列下的控制台。
创建Flink SQL流作业。
在左侧导航栏,单击
。单击
后,单击新建流作业,填写文件名称并选择引擎版本,单击创建。
在SQL作业开发页面,编写DDL和DML代码。
代码示例如下。
CREATE TEMPORARY TABLE data_in ( id VARCHAR, order_value FLOAT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '100', 'fields.id.length' = '10', 'fields.order_value.min' = '1.0', 'fields.order_value.max' = '100.0' ); CREATE TEMPORARY TABLE kafka_order ( order_id VARCHAR, order_time TIMESTAMP, order_type VARCHAR, order_value FLOAT ) WITH ( 'connector' = 'kafka', 'topic' = 'kafka-order', 'properties.bootstrap.servers' = 'alikafka-****.aliyuncs.com:9092,alikafka-****.aliyuncs.com:9092', 'format' = 'csv' ); INSERT INTO kafka_order SELECT id as order_id, CURRENT_TIMESTAMP as order_time, CASE WHEN (substring (id, 1, 1) <= 'z' AND substring (id, 1, 1) >= 'a') and (substring (id, 2, 1) <= 'z' AND substring (id, 2, 1) >= 'a') THEN 'typeA' WHEN (substring (id, 1, 1) <= 'z' AND substring (id, 1, 1) >= 'a') and (substring (id, 2, 1) <= '9' AND substring (id, 2, 1) >= '0') THEN 'typeB' WHEN (substring (id, 1, 1) <= '9' AND substring (id, 1, 1) >= '0') and (substring (id, 2, 1) <= '9' AND substring (id, 2, 1) >= '0') THEN 'typeC' ELSE 'typeD' END as order_type, order_value FROM data_in;
修改SQL中
properties.bootstrap.servers
取值,改为Kafka实例的接入点信息。在云速搭CADT模板界面中,鼠标左键双击kafka实例图标。
单击前往控制台,查看接入点信息。
在SQL编辑区域右上方,单击部署,在部署新版本对话框,单击确定。
在
页面,单击目标作业操作列中的启动。在作业启动对话框中,默认选择无状态启动,单击启动。
步骤三:在Kafka控制台查看测试数据
登录Kafka控制台。
在云速搭CADT模板界面中,鼠标左键双击kafka实例图标。
单击前往控制台。
在Kafka Topic中,查看Flink全托管写入的消息。
在左侧导航栏,单击Topic管理。
单击kafka_order。
在消息查询页签,查询消息。
说明等Flink作业启动运行后,在Kafka Topic中就可以看到测试数据已生成。