实现淘宝母婴订单实时查询和实时大屏

本文将为您介绍如何通过实时计算Flink版实时处理MySQL中的订单和婴儿信息,构建宽表,并将结果写入Elasticsearch。最终通过Kibana实现分组聚合与大屏展示,从而揭示订单数量与婴儿出生的潜在关系。

背景信息

随着“全面二孩”政策的实施和居民可支配收入的稳步增长,中国的母婴消费市场正迎来黄金发展期。与此同时,国民消费升级以及90后宝爸宝妈群体的崛起,推动了母婴消费需求与消费理念的深刻变革。据罗兰贝格最新报告显示,经过16年发展的母婴行业预计到2020年整体规模将达到3.6万亿元,2016-2020年的复合增长率高达17%,行业发展前景十分广阔。在此背景下,母婴人群在消费行为上呈现出哪些特点?哪些消费项目占据支出的主导地位?

在本场景中,订单信息和婴儿信息均存储在MySQL数据库中。为了便于分析,我们将订单表与婴儿信息进行关联,构建一张包含详细信息的宽表,并通过实时计算工具Flink将数据实时写入Elasticsearch。随后,通过Kibana实现分组聚合与大屏可视化动态展示,从而揭示订单数量与婴儿出生的潜在关系。

前提条件

步骤一:创建RDS MySQL表并导入数据

在这个例子中,我们将创建三张数据表,其中一张orders_dataset_tmp是导入数据的临时表,其他两张作为源表,体验淘宝母婴订单实时查询。

  1. 访问RDS实例列表,在上方选择地域,然后单击目标实例ID。

  2. 单击上方的登录数据库,在弹出的DMS页面中,填写数据库账号名和密码,然后单击登录

  3. 单击左侧的数据库实例,在已登录实例列表下,双击目标数据库名称。

  4. 在右侧SQL Console命令区输入以下建表语句,单击执行

    create table orders_dataset_tmp(
        user_id bigint comment '用户身份信息',            
        auction_id bigint comment '购买行为编号',        
        cat_id bigint comment '商品种类序列号',            
        cat1 bigint comment '商品序列号(根类别)',                
        property TEXT comment '商品属性',            
        buy_mount int comment '购买数量',            
        day TEXT comment '购买时间'                
    );
    
    create table orders_dataset(
        order_id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY comment '订单id',
        user_id bigint comment '用户身份信息',            
        auction_id bigint comment '购买行为编号',        
        cat_id bigint comment '商品种类序列号',            
        cat1 bigint comment '商品序列号(根类别)',                
        property TEXT comment '商品属性',            
        buy_mount int comment '购买数量',            
        day TEXT comment '购买时间'                
    );
    
    
    create table baby_dataset(
        user_id bigint NOT NULL PRIMARY KEY,    
        birthday text comment '婴儿生日',
        gender int comment '0 denotes female, 1 denotes male, 2 denotes unknown'
    );
  5. 导入数据。

    分别将电商婴儿用户导入orders_dataset_tmp表,将婴儿信息导入baby_dataset表。

    1. 在顶部菜单栏单击数据导入

    2. 填写导入配置信息。

      配置项

      说明

      数据库

      模糊搜索数据库名后单击目标MySQL实例。

      文件编码

      自动识别。

      导入模式

      极速模式。

      文件类型

      CSV格式。

      目标表

      orders_dataset_tmpbaby_dataset。

      数据位置

      1行为属性。

      写入方式

      INSERT。

      附件

      单击上传文件,导入到表的对应文件。

    3. 单击提交申请,并在第4步需单击执行变更,在弹出的任务设置窗口中选择立即执行后,单击确定执行

  6. 导入完成后,执行以下SQL语句将订单数据导入到订单源表orders_dataset中。

    insert into orders_dataset(user_id,auction_id,cat_id,cat1,property,buy_mount,day)
    select * from orders_dataset_tmp;

步骤二:配置Elasticsearch自动创建索引

  1. 登录阿里云Elasticsearch控制台在顶部菜单栏处,选择资源组和地域。

  2. Elasticsearch实例中单击目标实例ID。

  3. 基本信息页面,单击配置与管理 > ES集群配置

    image

  4. 单击右侧的修改配置,选择允许自动创建索引,单击确定

    image

    重要

    该操作会重启实例,请确认后操作。

步骤三:创建Flink SQL流作业

  1. 登录实时计算管理控制台,单击目标工作空间操作列下的控制台

  2. 在左侧导航栏,单击数据开发 > ETL

  3. 单击image后,单击新建流作业,填写文件名称并选择引擎版本,单击创建

    image

    作业参数

    说明

    示例

    文件名称

    作业的名称。

    说明

    作业名称在当前项目中必须保持唯一。

    flink-test

    引擎版本

    当前作业使用的Flink的引擎版本。引擎版本号含义、版本对应关系和生命周期重要时间点详情请参见引擎版本介绍

    vvr-8.0.11-flink-1.17

  4. 编辑Flink SQL流作业代码。

    拷贝如下SQLSQL编辑区域并替换相关参数取值为您实际业务取值。

    本代码定义了两个MySQL 表(orders_dataset 和 baby_dataset)作为数据源,分别存储订单和用户信息,并通过两个Elasticsearch目标表(es_sink1 和 es_sink2)将数据写入同一索引(enriched_orders_view)。通过配置 sink.delete-strategy 为 NON_PK_FIELD_TO_NULL,利用Elasticsearch的部分更新能力,在主键相同时仅更新非主键字段,确保数据一致性。

    CREATE TEMPORARY TABLE orders_dataset (
      `order_id` BIGINT,
      `user_id` bigint,            
      `auction_id` bigint,        
      `cat_id` bigint,            
      `cat1` bigint,                
      `property` varchar,            
      `buy_mount` int,            
      `day` varchar    ,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = 'rm-2zew*******.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'flinkrds***',
      'password' = 'Flink***@1',
      'database-name' = 'ecommerce',
      'table-name' = 'orders_dataset'
    );
    
    CREATE TEMPORARY TABLE baby_dataset (
      `user_id` bigint,
      `birthday` varchar,
      `gender` int,
      PRIMARY KEY(user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = 'rm-2zew*******.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'flinkrds***',
      'password' = 'Flink***@1',
      'database-name' = 'ecommerce',
      'table-name' = 'baby_dataset'
    );
    
    
    CREATE TEMPORARY TABLE es_sink1(
      `order_id` BIGINT,
      `user_id` BIGINT,
      `buy_mount` INT,
      `day` VARCHAR,
      PRIMARY KEY(`user_id`) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-8',
      'hosts' = 'http://192.xx.xx.252:9200',
      'index' = 'enriched_orders_view',
      'username' ='elastic',
      'password' ='Flink***@1',
      'sink.delete-strategy' = 'NON_PK_FIELD_TO_NULL'
    );
    
    
    CREATE TEMPORARY TABLE es_sink2(
      `user_id` BIGINT,
      `birthday` VARCHAR,
      `gender` INT,
      PRIMARY KEY(`user_id`) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-8',
      'hosts' = 'http://192.xx.xx.252:9200',
      'index' = 'enriched_orders_view',
      'username' ='elastic',
      'password' ='Flink***@1',
      'sink.delete-strategy' = 'NON_PK_FIELD_TO_NULL'
    );
    
    BEGIN STATEMENT SET;   
    INSERT INTO es_sink1
    SELECT 
        `order_id`,
        `user_id`,
        `buy_mount`,
        `day`
    FROM orders_dataset;
    
    
    INSERT INTO es_sink2
    SELECT 
        `user_id`,
        `birthday`,
        `gender`
    FROM baby_dataset;
    END;     

    存储类型

    参数

    是否必填

    说明

    MySQL

    connector

    表类型。固定值为mysql

    hostname

    MySQL数据库的IP地址或者Hostname。建议填写专有网络VPC地址。

    port

    MySQL数据库服务的端口号。

    username

    MySQL数据库服务的用户名。

    password

    MySQL数据库服务的密码。

    database-name

    MySQL数据库名称。

    table-name

    MySQL表名。

    Elasticsearch

    connector

    结果表类型。

    hosts

    Elasticsearch服务地址。

    格式为http://host_name:port

    index

    索引名称。

    本示例值为enriched_orders_view。

  5. 单击部署

  6. 作业运维界面,选择无状态启动后,单击启动

步骤四:在Elasticsearch控制台查看数据结果

Elasticsearchenriched_orders_view索引创建成功后,通过以下步骤您就可以看到写入的数据了。

1、准备工作

  1. 配置Kibana公网访问白名单

  2. 重启Elasticsearch实例。

  3. 配置与管理 > 可视化控制页面,在Kibana区域单击公网入口,并填写账号和密码。

    Kibana控制台的用户名默认为elastic,密码为您创建阿里云Elasticsearch实例时设置的密码。

    image

  4. 对写入数据字段类型进行处理。

    为了后续能够顺利使用直方图,需要将字段 day 的数据类型从文本(text)转换为日期(date)类型。您可以在Management > 开发工具中执行如下命令。

    1. 创建新索引(如 enriched_orders_view_new)并定义映射。

      注意将day字段的类型设置为 date,并保留其他字段的映射结构。

      PUT enriched_orders_view_new
      {
        "mappings": {
          "properties": {
            "birthday": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              },
              "fielddata": true
            },
            "buy_mount": {
              "type": "long"
            },
            "day": {
              "type": "date",
              "format": "yyyy-MM-dd" // 指定日期格式,确保与原始数据一致
            },
            "gender": {
              "type": "long"
            },
            "order_id": {
              "type": "long"
            },
            "user_id": {
              "type": "long"
            }
          }
        }
      }
      
    2. 使用 _reindex API 将原始索引中的数据复制到新索引中,并在过程中转换 day 字段的值为日期格式。

      POST _reindex
      {
        "source": {
          "index": "enriched_orders_view"
        },
        "dest": {
          "index": "enriched_orders_view_new"
        },
        "script": {
          "source": """
            if (ctx._source['day'] != null) {
              // 将 'yyyyMMdd' 格式的日期转换为 'yyyy-MM-dd'
              def originalDate = ctx._source['day'];
              if (originalDate.length() == 8) {
                ctx._source['day'] = originalDate.substring(0, 4) + '-' + originalDate.substring(4, 6) + '-' + originalDate.substring(6, 8);
              } else {
                ctx.op = 'noop'; // 如果格式不正确,跳过该文档
              }
            }
          """
        }
      }
      
      
    3. 验证新索引day字段已转化为正确的数据格式(如 yyyy-MM-dd)。

      GET enriched_orders_view_new/_search
      {
        "size": 10
      }
  5. 创建数据视图。

    1. 在左侧导航栏,单击Discover。

      image

    2. 单击创建数据视图,填写名称,索引模式中输入enriched_orders_view_new,时间戳字段选择为day,单击保存数据视图到Kibana

      image

2、查看数据写入情况

  1. 在页面左上角,单击Analytics > Discover

  2. 数据视图切换为刚才新建的数据视图。

  3. 单击搜索整个时间范围

    image

  4. 查看数据写入情况。

    image

3、配置可视化图表

  1. 单击day字段,单击Visualize

    image

  2. 在页面右侧,设置垂直条形图的水平轴和垂直轴。

    设置完其中一个后,单击关闭,再设置另外一个。

    配置项

    配置说明

    图示

    水平轴

    • 函数选择为Date Histogram

    • 字段选择为day

    • 名称填写为year_month

    image

    垂直轴

    • 函数选择为求和

    • 字段选择为buy_mount

    • 名称填写为buy_num

    • 轴侧填写为左

    image

  3. 在页面右侧,设置折线图的水平轴和垂直轴。

    右下角单击添加图层,可视化选择为折线图,然后配置水平轴和垂直轴。设置完其中一个后,单击关闭,再设置另外一个。

    配置项

    配置说明

    图示

    水平轴

    • 函数选择为Date Histogram

    • 字段选择为day

    • 名称填写为year_month

    image

    垂直轴

    • 函数选择为计数

    • 字段选择为birthday

    • 名称填写为baby_num

    • 轴侧填写为右

    image

4、保存与查看可视化结果

单击页面右上角的保存,即可保存此折线图与柱状图的复合图表。

image

相关文档

  • Elasticsearch连接器的语法结构、WITH参数及使用示例详情,请参见Elasticsearch

  • RDS MySQL连接器的语法结构、WITH参数及使用示例详情,请参见云数据库RDS MySQL