首页 Flink+Hologres 搭建实时数仓

Flink+Hologres 搭建实时数仓

更新时间: 2025-01-09 09:47:29

技术解决方案部署

90

https://www.aliyun.com/solution/tech-solution/flink_hologres

方案概览

本方案将以某个电商平台数据为例,引导您使用 Flink+Hologres 搭建实时数仓,实现数据的实时加工清洗和对接上层应用数据查询,形成实时数据的分层和复用,以支撑各业务方的报表查询(交易大屏、行为数据分析、用户画像标签)以及个性化推荐等多个场景。

使用 Flink+Hologres 的优势:

  • 中间层可查:Hologres 的每一层数据都支持高效更新与修正、写入即可查,解决了传统实时数仓方案的中间层数据不易查、不易更新、不易修正的问题。

  • 数据可复用:Hologres 的每一层数据都可单独对外提供服务,使得数据可以高效复用,真正实现数仓分层复用的目标。

  • 模型统一,架构简化:​实时 ETL 链路的逻辑均基于 Flink SQL 实现;ODS 层、DWD 层和 DWS 层的数据统一存储在 Hologres 中,可以降低架构复杂度,提高数据处理效率。

本方案的体验预计产生费用不超过 35 元(假设运行时间不超过 1.5 小时)。如果您调整了资源规格、数据量、使用时长,或执行了本方案以外的操作,可能导致费用发生变化,请以控制台显示的实际价格和最终账单为准。

方案架构

Flink+Hologres搭建实时数仓.jpg

本方案的实现原理如下:

  1. Flink 将数据源写入 Hologres,形成 ODS 层。

  2. Flink 订阅 ODS 层的 Binlog 进行加工,形成 DWD 层再次写入 Hologres。

  3. Flink 订阅 DWD 层的 Binlog,通过计算形成 DWS 层,再次写入 Hologres。

  4. 最后由 Hologres 对外提供应用查询。

准备资源

25

通过本部分您将创建 1 个专有网络 VPC、1 台交换机、1 个云数据库 RDS MySQL 实例、1 个 Hologres 实例、1 个 Flink 工作空间、1 个 OSS Bucket 存储空间。

请参考表格中的说明和方案默认示例值做详细规划,未提及的规划项请保持默认值。实际部署时可根据您的实际情况进行修改。

重要
  • RDS MySQL 实例、Flink 工作空间与 Hologres 实例需在同一 VPC 下。

  • 仅 1.3 及以上版本的独享 Hologres 实例支持该实时数仓方案。

1.创建专有网络 VPC 和交换机

  1. 登录专有网络管理控制台

  2. 在顶部菜单栏,选择华东 1(杭州)地域(本方案以杭州地域为例)。

  3. 在左侧导航栏,单击专有网络,单击创建专有网络

  4. 创建专有网络页面,按照资源配置创建 1 个专有网络(VPC)和 1 台交换机,然后单击确定。详情请参见创建专有网络和交换机

    资源配置

    规划项

    说明

    方案默认示例

    专有网络 VPC

    VPC 名称

    建议您在部署过程中新建一个 VPC 作为本方案的专有网络。部署过程中填写 VPC 名称即可创建对应名称的 VPC。

    长度为 2~128 个字符,以英文大小字母或中文开头,可包含数字、下划线(_)和连字符(-)。

    vpc_flink

    网段

    在创建 VPC 时,您必须按照无类域间路由块(CIDR block)的格式为您的专有网络划分私网网段。阿里云 VPC 支持的网段信息请参见专有网络组成部分

    在网络规划时可以按照管理网段-开发网段-测试网段-生产网段等规则做好规划。网段一旦投入使用,调整过程复杂,因此规划十分重要。

    192.168.0.0/16

    虚拟交换机

    台数

    本方案需要至少 1 台交换机,用来连接云资源实例。

    1

    名称

    建议您在部署过程中在新建的 VPC内创建虚拟交换机。部署过程中填写 vswitch 名称即可创建对应名称的虚拟交换机。

    长度为 2~128 个字符,以英文大小字母或中文开头,可包含数字、下划线(_)和连字符(-)。

    vsw_001

    可用区

    在规划的地域内选择1个可用区,用于部署虚拟交换机。

    可用区 J

    IPv4 网段

    每台虚拟交换机需要一个 IPv4 网段。

    192.168.5.0/16

2.创建 RDS MySQL 实例

  1. 访问RDS 实例创建页面

  2. 按照资源配置完成相应参数配置后,根据页面提示完成支付。

    资源配置

    规划项

    说明

    方案默认示例

    计费方式

    RDS 实例的计费方式。详情请参见第一步:快捷创建 RDS MySQL 实例与配置数据库

    按量付费

    地域

    您的云服务部署的地域。选择地域的基本原则请参见地域和可用区

    华东 1(杭州)

    引擎

    引擎及版本。

    MySQL8.0

    产品系列

    实例系列。

    高可用系列

    主节点可用区

    本方案要求 RDS 实例与虚拟交换机位于同一可用区。

    杭州 可用区 J

    部署方案

    单可用区指主备节点位于同一可用区。多可用区指主备节点位于不同可用区,实现跨可用区容灾。请您根据实际需求进行选择。

    单可用区部署

    实例规格

    可通过测试业务负载来确认需要的实例规格。

    mysql.n2.small.2c

    VPC

    选择上述已创建的 VPC。

    vpc_flink

    主节点交换机

    选择上述已创建的交换机。

    vsw_001

  3. 为目标实例创建数据库 order_dw 和具有对应数据库读写权限的普通账号。具体操作请参见创建数据库和账号

  4. 准备 MySQL CDC 数据源。

    1. 单击上方的登录数据库

    2. 登录实例对话框中,填写在步骤 3 中创建的数据库账号名和密码,然后单击登录

    3. 登录成功后,在左侧双击 order_dw 数据库,切换数据库。

    4. 在 SQL Console 区域编写三张业务表的建表 DDL 以及插入的数据语句。

      CREATE TABLE `orders` (
        order_id bigint not null primary key,
        user_id varchar(50) not null,
        shop_id bigint not null,
        product_id bigint not null,
        buy_fee numeric(20,2) not null,   
        create_time timestamp not null,
        update_time timestamp not null default now(),
        state int not null 
      );
      
      
      CREATE TABLE `orders_pay` (
        pay_id bigint not null primary key,
        order_id bigint not null,
        pay_platform int not null,
        create_time timestamp not null
      );
      
      
      CREATE TABLE `product_catalog` (
        product_id bigint not null primary key,
        catalog_name varchar(50) not null
      );
      
      -- 准备数据
      INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
      
      INSERT INTO orders VALUES
      (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
      (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
      (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
      (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
      (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
      (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
      (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
      
      INSERT INTO orders_pay VALUES
      (2001, 100001, 1, '2023-02-15 17:40:56'),
      (2002, 100002, 1, '2023-02-15 17:40:56'),
      (2003, 100003, 0, '2023-02-15 17:40:56'),
      (2004, 100004, 0, '2023-02-15 17:40:56'),
      (2005, 100005, 0, '2023-02-15 18:40:56'),
      (2006, 100006, 0, '2023-02-15 18:40:56'),
      (2007, 100007, 0, '2023-02-15 18:40:56');
    5. 单击执行,单击直接执行

3.创建独享 Hologres 实例

  1. 进入Hologres 产品详情页,单击立即购买

  2. 选择商品类型,按照资源配置完成相应参数配置后,根据页面提示完成开通。

    资源配置

    规划项

    说明

    方案默认示例

    商品类型

    Hologres 实例的计费方式。计费的详细信息请参见计费概述

    独享实例(按量付费)

    地域

    选择与上述服务相同的地域。

    华东 1(杭州)

    实例类型

    计算组实例是主从实例的升级模式,支持将计算资源分解为不同的计算组,可支撑读写分离、资源隔离、业务隔离等场景,详情请参见计算组实例快速入门

    计算组型

    可用区

    本方案要求 Hologres 实例与虚拟交换机位于同一可用区。

    可用区 J

    计算组预留计算资源

    需要确保预留资源大于等于实例所有计算组的总计算资源。计算组默认最小资源为 32 CU,本方案需要 2 个计算组。

    64 CU

    专有网络(VPC)

    选择上述已创建的 VPC。

    vpc_flink

    专有网络交换机

    选择上述已创建的交换机。

    vsw_001

    实例名称

    自定义填写。

    hologres_test

  3. 创建成功后,进入管理控制台,单击上面创建的实例名称,单击右上方的登录实例,进入 HoloWeb 页面。

  4. 创建名为 order_dw 的数据库,并为用户授权(赋予 admin 权限),使用简单权限模型创建数据库,详情请参见 DB 管理

    说明

    如果在被授权账号的下拉列表找不到对应的账号,则说明该账号并未添加至当前实例,您需要前往用户管理页面添加用户为 SuperUser。

  5. 新增计算组。

    您可以通过不同计算组实现资源隔离,使用初始计算组 init_warehouse 用于写入数据,使用read_warehouse_1计算组用于服务查询。

    预留计算资源会全部分配给初始计算组 init_warehouse,需先减少计算组资源,再新增计算组。详情请参见场景 1:创建全新的计算组实例

    1. 单击安全中心 > 计算组管理,确认实例名为目标实例名称。

    2. 单击已有资源组 init_warehouse 操作列下的调整配置,本示例减少资源至 32 CU,单击确认

    3. 单击新增计算组,新增名称为 read_warehouse_1,资源为 32 CU 的计算组,单击确认

4.创建 OSS Bucket

OSS Bucket 用于存储 Flink 作业系统检查点、作业快照、日志和 JAR 包等信息,创建时请参见注意事项

  1. 登录OSS管理控制台在左侧导航栏,单击 Bucket 列表 ,然后单击创建 Bucket

  2. 在创建页面,填写 Bucket 名称(本方案默认示例 flink-test-oss),选择您的云服务部署的地域(本方案默认示例华东 1(杭州)),单击完成创建后,单击我知道了,确认创建

5.开通 Flink 全托管

  1. 登录实时计算控制台。单击立即购买

  2. 在购买页面,按照资源配置完成相应参数配置后,根据页面提示完成开通。开通详情请参见开通实时计算 Flink 版

    资源配置

    规划项

    说明

    方案默认示例

    付费模式

    Flink 工作空间的计费方式。计费详情请参见产品计费

    按量付费

    地域

    选择与上述服务相同的地域。

    华东 1(杭州)

    可用区

    本方案要求 Flink 工作空间与虚拟交换机位于同一可用区。

    可用区 J

    专有网络

    选择上述已创建的 VPC。

    vpc_flink

    虚拟交换机

    选择上述已创建的交换机。

    vsw_001

    工作空间名称

    开通成功后,不可修改名称。

    flink-test

    OSS 存储

    用于存储作业系统检查点、作业快照、日志和 JAR 包等信息。需要与Flink 工作空间服务在同一地域,且Flink 开通成功后,不可修改 OSS 存储。详情请参见开通实时计算 Flink 版

    flink-test-oss

    监控服务

    支持选择免费监控服务按量付费的 Prometheus 监控服务。具体信息和计费说明请参见开通实时计算 Flink 版

    免费监控服务

搭建实时数仓

40

1.创建 Catalog

在 Flink 开发控制台的 SQL 开发页面,通过 SQL 方式分别创建 Hologres 和 MySQL 的 Catalog。

  1. 登录实时计算控制台,单击目标工作空间操作列下的控制台

  2. 在左侧导航栏,单击 Session 管理,单击创建 Session 集群,为后续创建 Catalog 和查询脚本提供执行环境,详情请参见Session 集群注意事项

    1. 填写名称,并将状态设置为 Running

    2. 选择引擎版本,本方案示例选择最新版本。

    3. 其他参数本方案示例保持默认,单击创建 Session 集群

  3. 在左侧导航栏,单击数据开发 > 数据查询 > 查询脚本,单击新建

    image

  4. 创建 Hologres Catalog。

    在文本编辑区域,输入右侧配置 Hologres Catalog 的命令,创建名称为 dw 的 Hologres Catalog。其中,您需要修改以下参数取值为您实际 Hologres 服务信息。关于 Hologres Catalog 的更多信息,详情请参见管理 Hologres Catalog

    参数

    说明

    备注

    endpoint

    Hologres的 Endpoint 地址。

    本示例使用指定 VPC 域名,详情请参见实例配置

    username

    阿里云账号的 AccessKey。

    当前配置的 AccessKey 对应的用户需要能够访问所有的 Hologres 数据库,Hologres 数据库权限请参见 Hologres 权限模型

    password

    阿里云账号的 AccessSecret。

    CREATE CATALOG dw WITH (
      'type' = 'hologres',
      'endpoint' = '<ENDPOINT>', 
      'username' = '<USERNAME>',
      'password' = '<PASSWORD>',
      'dbname' = 'order_dw@init_warehouse', --数据库名称,并指定连接init_warehouse计算组
      'binlog' = 'true', -- 创建catalog时可以设置源表、维表和结果表支持的with参数,之后在使用此catalog下的表时会默认添加这些默认参数。
      'sdkMode' = 'jdbc', -- 推荐使用jdbc模式。
      'cdcmode' = 'true',
      'connectionpoolname' = 'the_conn_pool',
      'ignoredelete' = 'true',  -- 宽表merge需要开启,防止回撤。
      'partial-insert.enabled' = 'true', -- 宽表merge需要开启此参数,实现部分列更新。
      'mutateType' = 'insertOrUpdate', -- 宽表merge需要开启此参数,实现部分列更新。
      'table_property.binlog.level' = 'replica', --也可以在创建catalog时传入持久化的hologres表属性,之后创建表时,默认都开启binlog。
      'table_property.binlog.ttl' = '259200'
    );
  5. 选中创建 Catalog 的代码后,单击左侧代码行数上的运行

    返回The following statement has been executed successfully!表示创建成功。

  6. 创建 MySQL Catalog。

    输入右侧的命令创建 MySQL Catalog,您需要修改表格中的参数取值为您实际的 MySQL 服务信息。选中创建 MySQL Catalog 的代码后,单击左侧代码行数上的运行。关于 MySQL Catalog 的更多信息,详情请参见管理 MySQL Catalog

    参数

    说明

    hostname

    MySQL 数据库的 IP 地址或者 Hostname。

    port

    MySQL 数据库服务的端口号,默认值为 3306。

    username

    MySQL 数据库服务的用户名。

    本示例为具有数据库 order_dw 读写权限的账号名。

    password

    MySQL 数据库服务的密码。

    CREATE CATALOG mysqlcatalog WITH(
      'type' = 'mysql',
      'hostname' = '<hostname>',
      'port' = '<port>',
      'username' = '<username>',
      'password' = '<password>',
      'default-database' = 'order_dw'
    );

2.构建 ODS 层:业务数据库实时入仓

基于 Catalog 的 CREATE DATABASE AS(CDAS)语句功能,可以一次性把 ODS 层建出来。ODS 层一般不直接做 OLAP 或 SERVING(KV 点查),主要作为流式作业的事件驱动,开启 binlog 即可满足需求。Binlog 是 Hologres 的核心能力之一,Hologres 连接器也支持先全量读取再增量消费 Binlog 的全增量模式。

  1. 创建 CDAS 同步作业。

    1. 实时计算控制台上,创建新的 SQL 流作业草稿,并将如下代码拷贝到 SQL 编辑器。

      CREATE DATABASE IF NOT EXISTS dw.order_dw   -- 创建catalog时设置了table_property.binlog.level参数,因此通过CDAS创建的所有表都开启了binlog。
      AS DATABASE mysqlcatalog.order_dw INCLUDING all tables -- 可以根据需要选择上游数据库需要入仓的表。本案例需选择前面创建了三张业务表的数据库。
      /*+ OPTIONS('server-id'='8001-8004') */ ;   -- 指定mysql-cdc实例server-id范围。
    2. 单击右上方的部署,单击确定,进行作业部署。

    3. 单击左侧导航栏的作业运维,单击刚刚部署的 ODS 作业操作列的启动,选择无状态启动后单击启动

  2. 向计算组加载数据。

    Table Group 是 Hologres 中数据的载体。使用 read_warehouse_1 查询 order_dw 数据库中 Table Group(本示例为 order_dw_tg_default)的数据时,为计算组 read_warehouse_1 加载 order_dw_tg_default,以实现使用init_warehouse计算组写入数据,使用read_warehouse_1计算组进行服务查询。

    HoloWeb 开发页单击 SQL 编辑器,确认实例名和数据库名称后,执行右侧命令。更多详情请参见场景 1:创建全新的计算组实例。加载后,可以查看到 read_warehouse_1 已经加载了 order_dw_tg_default Table Group 的数据。

    --查看当前数据库有哪些Table Group
    SELECT tablegroup_name FROM hologres.hg_table_group_properties GROUP BY tablegroup_name;
    
    --为计算组加载Table Group
    CALL hg_table_group_load_to_warehouse ('order_dw.order_dw_tg_default', 'read_warehouse_1', 1);
    
    --查看计算组加载Table Group的情况
    select * from hologres.hg_warehouse_table_groups;
  3. 右上角切换计算组为 read_warehouse_1,后续使用 read_warehouse_1 进行查询分析。

    image

  4. HoloWeb 中执行右侧命令,查看 MySQL 同步到 Hologres 的 3 张表数据。

    ---查orders中的数据。
    SELECT * FROM orders;
    
    ---查orders_pay中的数据。
    SELECT * FROM orders_pay;
    
    ---查product_catalog中的数据。
    SELECT * FROM product_catalog;

    image

3.构建 DWD 层:实时主题宽表

构建 DWD 层用到了 Hologres 连接器特有的部分列更新能力,可以使用 INSERT DML 方便地表达部分列更新的语义。作业中需要对不同的维表进行查询,是基于 Hologres 行存以及行列共存表提供的高性能的点查能力。同时,Hologres 资源强隔离的架构,可以保证写入、读取、分析等作业之间互不干扰。

  1. 通过 Flink Catalog 功能在 Hologres 中创建 DWD 层的宽表 dwd_orders。

    将代码拷贝到 Flink 的查询脚本中,选中目标片段后单击左侧代码行上的运行

    -- 宽表字段要nullable,因为不同的流写入到同一张结果表,每一列都可能出现null的情况。
    CREATE TABLE dw.order_dw.dwd_orders (
      order_id bigint not null,
      order_user_id string,
      order_shop_id bigint,
      order_product_id bigint,
      order_product_catalog_name string,
      order_fee numeric(20,2),
      order_create_time timestamp,
      order_update_time timestamp,
      order_state int,
      pay_id bigint,
      pay_platform int comment 'platform 0: phone, 1: pc', 
      pay_create_time timestamp,
      PRIMARY KEY(order_id) NOT ENFORCED
    );
    
    -- 支持通过catalog修改Hologres物理表属性。
    ALTER TABLE dw.order_dw.dwd_orders SET (
      'table_property.binlog.ttl' = '604800' --修改binlog的超时时间为一周。
    );
  2. 实现实时消费 ODS 层 orders、orders_pay 表的 binlog。

    创建新的 Flink SQL 流作业草稿,并将代码拷贝到 SQL 编辑器后,部署启动作业。通过如下 SQL 作业,orders 表会与 product_catalog 表进行维表关联,将最终结果写入 dwd_orders 表中,实现数据的实时打宽。

    BEGIN STATEMENT SET;
    
    INSERT INTO dw.order_dw.dwd_orders 
     (
       order_id,
       order_user_id,
       order_shop_id,
       order_product_id,
       order_fee,
       order_create_time,
       order_update_time,
       order_state,
       order_product_catalog_name
     ) 
     SELECT o.*, dim.catalog_name FROM dw.order_dw.orders AS o
     LEFT JOIN dw.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim
     ON o.product_id = dim.product_id;
    
    
    INSERT INTO dw.order_dw.dwd_orders 
     (pay_id, order_id, pay_platform, pay_create_time)
     SELECT * FROM dw.order_dw.orders_pay;
    
    END;
  3. 查看宽表 dwd_orders 数据。

    HoloWeb 开发页面的 SQL 编辑器执行右侧所示命令。

    SELECT * FROM dwd_orders;

    image

4.构建 DWS 层:实时指标计算

  1. 通过 Flink Catalog 功能,在 Hologres 中创建 DWS 层的聚合 dws_users 以及 dws_shops。

    将代码拷贝到 Flink 的查询脚本中,选中目标片段后单击左侧代码行上的运行

    -- 用户维度聚合指标表。
    CREATE TABLE dw.order_dw.dws_users (
      user_id string not null,
      ds string not null comment 'pay_create_date',
      paied_buy_fee_sum numeric(20,2) not null comment '当日完成支付的总金额', 
      primary key(user_id,ds)  NOT ENFORCED
    );
    
    -- 商户维度聚合指标表。
    CREATE TABLE dw.order_dw.dws_shops (
      shop_id bigint not null,
      ds string not null comment 'pay_create_date',
      paied_buy_fee_sum numeric(20,2) not null comment '当日完成支付的总金额',
      primary key(shop_id,ds)  NOT ENFORCED
    );
  2. 实时消费 DWD 层的宽表 dw.order_dw.dwd_orders,在 Flink 中做聚合计算,写入 Hologres 中的 DWS 表。

    创建新的 Flink SQL 作业草稿,并将右侧代码拷贝到 SQL 编辑器后,部署启动作业。

    BEGIN STATEMENT SET;
    
    INSERT INTO dw.order_dw.dws_users
      SELECT 
        order_user_id,
        DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
        SUM (order_fee)
        FROM dw.order_dw.dwd_orders c
        WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- 订单流和支付流数据都已写入宽表。
        GROUP BY order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');
    
    INSERT INTO dw.order_dw.dws_shops
      SELECT 
        order_shop_id,
        DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
        SUM (order_fee)
       FROM dw.order_dw.dwd_orders c
       WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- 订单流和支付流数据都已写入宽表。
       GROUP BY order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');
    END;
  3. 查看 DWS 层的聚合结果,其结果会根据上游数据的变更实时更新。

    1. 在 HoloWeb 开发页面的 SQL 编辑器执行所示命令,查询 dws_users 表结果,dws_shops 表结果查询类似。

    SELECT * FROM dws_users;

    image

    1. 在 RDS 控制台向 orders 和 orders_pay 表中分别插入 1 条新数据。

    INSERT INTO orders VALUES
    
    (100008, 'user_003', 12345, 5, 6000.02, '2023-02-15 09:40:56', '2023-02-15 18:42:56', 1);
    
    
    INSERT INTO orders_pay VALUES
    
    (2008, 100008, 1, '2023-02-15 19:40:56');
    1. 在 Hologres 控制台查看变更后的表结果(以 dwd_orders 为例)。

    SELECT * FROM dwd_orders;

    p782966.png

方案验证

10

一、通过监控业务延时,验证数据处理实时性

  1. 登录实时计算控制台,打开工作空间,找到作业运维,可以看到之前创建的 3 个 SQL 流作业,当前作业的业务延时都在毫秒级。

image (1)

  1. 点击 dws 层的作业名称查看详情,找到自动调优标签,若开启智能调优功能,可以根据作业延迟情况自动调整作业的并发度,以应对流量变化和优化延迟表现。(具体操作参考文档配置自动调优

image (2)

二、通过执行中间层查询作业,验证数据探查功能

在 Flink 作业运维详情页的血缘关系标签,可以查看完整的数据流动层次结构。

image (3)

如果对中间结果需要即席(Ad-hoc)性质的业务数据探查,或者对最终计算结果进行数据正确性排查,此方案的每一层数据都实现了持久化,可以便捷地探查中间过程。

  1. 流模式探查

将代码拷贝到 Flink 的查询脚本中,选中目标片段后单击左侧代码行上的运行

-- 流模式探查
SET 'execution.runtime-mode' = 'streaming';
SELECT *
FROM dw.order_dw.dwd_orders /*+ OPTIONS('startTime'='2023-02-15 12:00:00') */ --这里的startTime是binlog生成的时间
WHERE order_user_id = 'user_001';

image

  1. 批模式探查

将代码拷贝到 Flink 的查询脚本中,选中目标片段后单击左侧代码行上的运行。批模式探查是获取当前时刻的终态数据,结果如图所示。

SELECT *
FROM dw.order_dw.dwd_orders /*+ OPTIONS('binlog'='false') */ 
WHERE order_user_id = 'user_001' and order_create_time > '2023-02-15 12:00:00'; --批量模式支持filter下推,提升批作业执行效率。

image

三、通过创建实时报表,实现数据应用

前面已经体验了通过 Flink Catalog,可以仅在 Flink 侧搭建一个基于 Flink 和 Hologres 的 Streaming Warehouse 实时分层数仓。接下来将展示在数仓搭建完成后,一个简易的实时报表应用场景

  1. 创建实时报表

基于 DWD 层宽表数据展示实时报表,支持秒级响应。在 HoloWeb 开发页面,查询 23 年 2 月内每个品类的订单总量和订单总金额代码示例如下。

--holo sql
SELECT
  TO_CHAR(order_create_time, 'YYYYMMDD') AS order_create_date,
  order_product_catalog_name,
  COUNT(*),
  SUM(order_fee)
FROM
  dwd_orders
WHERE
  order_create_time >= '2023-02-01 00:00:00'  and order_create_time < '2023-03-01 00:00:00'
GROUP BY
  order_create_date, order_product_catalog_name
ORDER BY
  order_create_date, order_product_catalog_name;

image

  1. 验证增量数据实时统计

在 RDS 控制台向 orders 和 orders_pay 表中分别插入新的订单数据。

   -- 插入新订单数据
   INSERT INTO orders (order_id, order_user_id, order_shop_id, order_product_id, order_fee, order_create_time, order_update_time, order_state)
   VALUES (100009, 'user_003', 12349, 2, 3000.00, '2023-02-25 10:00:00', '2023-02-25 11:00:00', 1);

   -- 插入新支付数据
   INSERT INTO orders_pay (pay_id, order_id, pay_platform, pay_create_time)
   VALUES (2009, 100009, 1, '2023-02-25 11:30:00');

再次查询 23 年 2 月内每个品类的订单总量和订单总金额,可以看到数据已经被实时统计到报表中。

image (1)

清理资源

15

在本方案中,您创建了 1 台交换机、1 个专有网络 VPC、1 个云数据库 RDS MySQL 实例、1 个 Hologres 实例、1 个 Flink 项目空间、1 个 OSS Bucket 存储空间。体验完方案后,建议及时释放不需要的资源,避免继续产生费用。

  1. 释放 1 个云数据库 RDS MySQL 实例:

    登录RDS 管理控制台实例列表页面,找到目标实例,然后在操作列选择更多>释放实例,根据界面提示释放实例。

  2. 释放 1 个 Hologres 实例:

    登录Hologres 管理控制台,在实例列表页面释放目标 Hologres 实例,详情请参见删除实例

  3. 释放 1 个 Flink 工作空间:

    登录实时计算控制台,在 Flink 全托管页签,单击目标工作空间操作列的更多 > 释放资源,按照界面提示释放工作空间。

  4. 删除 OSS Bucket 存储空间:

    登录OSS管理控制台,删除上述步骤在 OSS Bucket 存储空间下创建的目录及上传的文件。单击 Bucket 列表,然后单击目标 Bucket 名称。在左侧导航栏,单击删除 Bucket,并按照页面指引完成删除操作。

  5. 释放 1 台交换机:

    登录专有网络管理控制台,在交换机页面,找到目标交换机,然后在操作列单击删除,按照界面提示释放实例。

  6. 释放 1 个专有网络 VPC:

    登录专有网络管理控制台,在专有网络页面,找到目标 VPC,然后在操作列单击删除,按照界面提示释放实例。