本文将为您介绍如何通过实时计算Flink版实时处理MySQL中的订单和婴儿信息,构建宽表,并将结果写入Elasticsearch。最终通过Kibana实现分组聚合与大屏展示,从而揭示订单数量与婴儿出生的潜在关系。
背景信息
随着“全面二孩”政策的实施和居民可支配收入的稳步增长,中国的母婴消费市场正迎来黄金发展期。与此同时,国民消费升级以及90后宝爸宝妈群体的崛起,推动了母婴消费需求与消费理念的深刻变革。据罗兰贝格最新报告显示,经过16年发展的母婴行业预计到2020年整体规模将达到3.6万亿元,2016-2020年的复合增长率高达17%,行业发展前景十分广阔。在此背景下,母婴人群在消费行为上呈现出哪些特点?哪些消费项目占据支出的主导地位?
在本场景中,订单信息和婴儿信息均存储在MySQL数据库中。为了便于分析,我们将订单表与婴儿信息进行关联,构建一张包含详细信息的宽表,并通过实时计算工具Flink将数据实时写入Elasticsearch。随后,通过Kibana实现分组聚合与大屏可视化动态展示,从而揭示订单数量与婴儿出生的潜在关系。
前提条件
已创建Flink工作空间。具体操作,请参见开通实时计算Flink版。
已创建云数据库RDS MySQL版实例、数据库和账号。具体操作,请参见快速创建RDS MySQL实例和创建数据库和账号。
已创建Elasticsearch实例,本文以8.17.0版本为例。具体操作,请参见创建阿里云Elasticsearch实例。
重要Flink工作空间、RDS MySQL实例和Elasticsearch实例需要同一个VPC下,且RDS MySQL和ES已设置IP白名单。具体请参见RDS MySQL白名单配置和ES白名单配置。
步骤一:创建RDS MySQL表并导入数据
在这个例子中,我们将创建三张数据表,其中一张orders_dataset_tmp是导入数据的临时表,其他两张作为源表,体验淘宝母婴订单实时查询。
访问RDS实例列表,在上方选择地域,然后单击目标实例ID。
单击上方的登录数据库,在弹出的DMS页面中,填写数据库账号名和密码,然后单击登录。
单击左侧的数据库实例,在已登录实例列表下,双击目标数据库名称。
在右侧SQL Console命令区输入以下建表语句,单击执行。
create table orders_dataset_tmp( user_id bigint comment '用户身份信息', auction_id bigint comment '购买行为编号', cat_id bigint comment '商品种类序列号', cat1 bigint comment '商品序列号(根类别)', property TEXT comment '商品属性', buy_mount int comment '购买数量', day TEXT comment '购买时间' ); create table orders_dataset( order_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY comment '订单id', user_id bigint comment '用户身份信息', auction_id bigint comment '购买行为编号', cat_id bigint comment '商品种类序列号', cat1 bigint comment '商品序列号(根类别)', property TEXT comment '商品属性', buy_mount int comment '购买数量', day TEXT comment '购买时间' ); create table baby_dataset( user_id bigint NOT NULL PRIMARY KEY, birthday text comment '婴儿生日', gender int comment '0 denotes female, 1 denotes male, 2 denotes unknown' );
导入数据。
分别将电商婴儿用户导入orders_dataset_tmp表,将婴儿信息导入baby_dataset表。
在顶部菜单栏单击数据导入。
填写导入配置信息。
配置项
说明
数据库
模糊搜索数据库名后单击目标MySQL实例。
文件编码
自动识别。
导入模式
极速模式。
文件类型
CSV格式。
目标表
orders_dataset_tmp或baby_dataset。
数据位置
第1行为属性。
写入方式
INSERT。
附件
单击上传文件,导入到表的对应文件。
单击提交申请,并在第4步需单击执行变更,在弹出的任务设置窗口中选择立即执行后,单击确定执行。
导入完成后,执行以下SQL语句将订单数据导入到订单源表orders_dataset中。
insert into orders_dataset(user_id,auction_id,cat_id,cat1,property,buy_mount,day) select * from orders_dataset_tmp;
步骤二:配置Elasticsearch自动创建索引
登录阿里云Elasticsearch控制台,在顶部菜单栏处,选择资源组和地域。
在Elasticsearch实例中单击目标实例ID。
在基本信息页面,单击
。单击右侧的修改配置,选择允许自动创建索引,单击确定。
重要该操作会重启实例,请确认后操作。
步骤三:创建Flink SQL流作业
登录实时计算管理控制台,单击目标工作空间操作列下的控制台。
在左侧导航栏,单击
。单击
后,单击新建流作业,填写文件名称并选择引擎版本,单击创建。
作业参数
说明
示例
文件名称
作业的名称。
说明作业名称在当前项目中必须保持唯一。
flink-test
引擎版本
当前作业使用的Flink的引擎版本。引擎版本号含义、版本对应关系和生命周期重要时间点详情请参见引擎版本介绍。
vvr-8.0.11-flink-1.17
编辑Flink SQL流作业代码。
拷贝如下SQL到SQL编辑区域并替换相关参数取值为您实际业务取值。
本代码定义了两个MySQL 表(
orders_dataset
和baby_dataset
)作为数据源,分别存储订单和用户信息,并通过两个Elasticsearch目标表(es_sink1
和es_sink2
)将数据写入同一索引(enriched_orders_view
)。通过配置sink.delete-strategy
为NON_PK_FIELD_TO_NULL
,利用Elasticsearch的部分更新能力,在主键相同时仅更新非主键字段,确保数据一致性。CREATE TEMPORARY TABLE orders_dataset ( `order_id` BIGINT, `user_id` bigint, `auction_id` bigint, `cat_id` bigint, `cat1` bigint, `property` varchar, `buy_mount` int, `day` varchar , PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = 'rm-2zew*******.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'flinkrds***', 'password' = 'Flink***@1', 'database-name' = 'ecommerce', 'table-name' = 'orders_dataset' ); CREATE TEMPORARY TABLE baby_dataset ( `user_id` bigint, `birthday` varchar, `gender` int, PRIMARY KEY(user_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = 'rm-2zew*******.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'flinkrds***', 'password' = 'Flink***@1', 'database-name' = 'ecommerce', 'table-name' = 'baby_dataset' ); CREATE TEMPORARY TABLE es_sink1( `order_id` BIGINT, `user_id` BIGINT, `buy_mount` INT, `day` VARCHAR, PRIMARY KEY(`user_id`) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-8', 'hosts' = 'http://192.xx.xx.252:9200', 'index' = 'enriched_orders_view', 'username' ='elastic', 'password' ='Flink***@1', 'sink.delete-strategy' = 'NON_PK_FIELD_TO_NULL' ); CREATE TEMPORARY TABLE es_sink2( `user_id` BIGINT, `birthday` VARCHAR, `gender` INT, PRIMARY KEY(`user_id`) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-8', 'hosts' = 'http://192.xx.xx.252:9200', 'index' = 'enriched_orders_view', 'username' ='elastic', 'password' ='Flink***@1', 'sink.delete-strategy' = 'NON_PK_FIELD_TO_NULL' ); BEGIN STATEMENT SET; INSERT INTO es_sink1 SELECT `order_id`, `user_id`, `buy_mount`, `day` FROM orders_dataset; INSERT INTO es_sink2 SELECT `user_id`, `birthday`, `gender` FROM baby_dataset; END;
存储类型
参数
是否必填
说明
MySQL
connector
是
表类型。固定值为
mysql
。hostname
是
MySQL数据库的IP地址或者Hostname。建议填写专有网络VPC地址。
port
否
MySQL数据库服务的端口号。
username
是
MySQL数据库服务的用户名。
password
是
MySQL数据库服务的密码。
database-name
是
MySQL数据库名称。
table-name
是
MySQL表名。
Elasticsearch
connector
是
结果表类型。
hosts
是
Elasticsearch服务地址。
格式为
http://host_name:port
。index
是
索引名称。
本示例值为enriched_orders_view。
单击部署。
在作业运维界面,选择无状态启动后,单击启动。
步骤四:在Elasticsearch控制台查看数据结果
Elasticsearch的enriched_orders_view
索引创建成功后,通过以下步骤您就可以看到写入的数据了。
1、准备工作
重启Elasticsearch实例。
在
页面,在Kibana区域单击公网入口,并填写账号和密码。Kibana控制台的用户名默认为elastic,密码为您创建阿里云Elasticsearch实例时设置的密码。
对写入数据字段类型进行处理。
为了后续能够顺利使用直方图,需要将字段
day
的数据类型从文本(text)转换为日期(date)类型。您可以在 中执行如下命令。创建新索引(如
enriched_orders_view_new
)并定义映射。注意将
day
字段的类型设置为date
,并保留其他字段的映射结构。PUT enriched_orders_view_new { "mappings": { "properties": { "birthday": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } }, "fielddata": true }, "buy_mount": { "type": "long" }, "day": { "type": "date", "format": "yyyy-MM-dd" // 指定日期格式,确保与原始数据一致 }, "gender": { "type": "long" }, "order_id": { "type": "long" }, "user_id": { "type": "long" } } } }
使用
_reindex
API 将原始索引中的数据复制到新索引中,并在过程中转换day
字段的值为日期格式。POST _reindex { "source": { "index": "enriched_orders_view" }, "dest": { "index": "enriched_orders_view_new" }, "script": { "source": """ if (ctx._source['day'] != null) { // 将 'yyyyMMdd' 格式的日期转换为 'yyyy-MM-dd' def originalDate = ctx._source['day']; if (originalDate.length() == 8) { ctx._source['day'] = originalDate.substring(0, 4) + '-' + originalDate.substring(4, 6) + '-' + originalDate.substring(6, 8); } else { ctx.op = 'noop'; // 如果格式不正确,跳过该文档 } } """ } }
验证新索引day字段已转化为正确的数据格式(如
yyyy-MM-dd
)。GET enriched_orders_view_new/_search { "size": 10 }
创建数据视图。
在左侧导航栏,单击Discover。
单击创建数据视图,填写名称,索引模式中输入
enriched_orders_view_new
,时间戳字段选择为day,单击保存数据视图到Kibana。
2、查看数据写入情况
在页面左上角,单击
。数据视图切换为刚才新建的数据视图。
单击搜索整个时间范围。
查看数据写入情况。
3、配置可视化图表
单击day字段,单击Visualize。
在页面右侧,设置垂直条形图的水平轴和垂直轴。
设置完其中一个后,单击关闭,再设置另外一个。
配置项
配置说明
图示
水平轴
函数选择为Date Histogram
字段选择为day
名称填写为year_month
垂直轴
函数选择为求和
字段选择为buy_mount
名称填写为buy_num
轴侧填写为左
在页面右侧,设置折线图的水平轴和垂直轴。
右下角单击添加图层,可视化选择为折线图,然后配置水平轴和垂直轴。设置完其中一个后,单击关闭,再设置另外一个。
配置项
配置说明
图示
水平轴
函数选择为Date Histogram
字段选择为day
名称填写为year_month
垂直轴
函数选择为计数
字段选择为birthday
名称填写为baby_num
轴侧填写为右
4、保存与查看可视化结果
单击页面右上角的保存,即可保存此折线图与柱状图的复合图表。
相关文档
Elasticsearch连接器的语法结构、WITH参数及使用示例详情,请参见Elasticsearch。
RDS MySQL连接器的语法结构、WITH参数及使用示例详情,请参见云数据库RDS MySQL版。