使用Flink+Hologres搭建实时数仓可以充分利用Flink强大的实时处理能力和Hologres提供的Binlog、行列共存和资源强隔离等能力,实现高效、可扩展的实时数据处理和分析,帮助您更好地应对不断增长的数据量和实时业务需求。本文介绍如何通过实时计算Flink版和实时数仓Hologres搭建实时数仓。
背景信息
随着社会数字化发展,企业对数据时效性的需求越来越强烈。除传统的面向海量数据加工场景设计的离线场景外,大量业务需要解决面向实时加工、实时存储、实时分析的实时场景问题。传统离线数仓搭建的方法论比较明确,通过定时调度实现数仓分层(ODS->DWD->DWS->ADS);但对于实时数仓的搭建,目前缺乏明确的方法体系。基于Streaming Warehouse理念,实现数仓分层之间实时数据的高效流动,可以解决实时数仓分层问题。
方案架构
实时计算Flink版是强大的流式计算引擎,支持对海量实时数据高效处理。Hologres是一站式实时数仓,支持数据实时写入与更新,实时数据写入即可查。Hologres与Flink深度集成,能够提供一体化的实时数仓联合解决方案。本文基于Flink+Hologres搭建实时数仓的方案架构如下:
Flink将数据源写入Hologres,形成ODS层。
Flink订阅ODS层的Binlog进行加工,形成DWD层再次写入Hologres。
Flink订阅DWD层的Binlog,通过计算形成DWS层,再次写入Hologres。
最后由Hologres对外提供应用查询。
该方案有如下优势:
Hologres的每一层数据都支持高效更新与修正、写入即可查,解决了传统实时数仓解决方案的中间层数据不易查、不易更新、不易修正的问题。
Hologres的每一层数据都可单独对外提供服务,数据的高效复用,真正实现数仓分层复用的目标。
模型统一,架构简化。实时ETL链路的逻辑是基于Flink SQL实现的;ODS层、DWD层和DWS层的数据统一存储在Hologres中,可以降低架构复杂度,提高数据处理效率。
该方案依赖于Hologres的3个核心能力,详情如下表所示。
Hologres核心能力 | 详情 |
Binlog | Hologres提供Binlog能力,用于驱动Flink进行实时计算,以此作为流式计算的上游。Hologres的Binlog能力详情请参见订阅Hologres Binlog。 |
行列共存 | Hologres支持行列共存的存储格式。一张表同时存储行存数据和列存数据,并且两份数据强一致。该特性保证中间层表不仅可以作为Flink的源表,也可以作为Flink的维表进行主键点查与维表Join,还可以供其他应用(OLAP、线上服务等)查询。Hologres的行列共存能力详情请参见表存储格式:列存、行存、行列共存。 |
资源强隔离 | Hologres实例的负载较高时,可能影响中间层的点查性能。Hologres支持通过主从实例读写分离部署(共享存储)或计算组实例架构实现资源强隔离,从而保证Flink对Hologres Binlog的数据拉取不影响线上服务。 |
实践场景
本文以某个电商平台为例,通过搭建一套实时数仓,实现数据的实时加工清洗和对接上层应用数据查询,形成实时数据的分层和复用,支撑各个业务方的报表查询(交易大屏、行为数据分析、用户画像标签)以及个性化推荐等多个业务场景。
构建ODS层:业务数据库实时入仓
MySQL有orders(订单表),orders_pay(订单支付表),product_catalog(商品类别字典表)3张业务表,这3张表通过Flink实时同步到Hologres中作为ODS层。
构建DWD层:实时主题宽表
将订单表、商品类别字典表、订单支付表进行实时打宽,生成DWD层宽表。
构建DWS层:实时指标计算
实时消费宽表的binlog,事件驱动地聚合出相应的DWS层指标表。
前提条件
实时计算Flink版、RDS MySQL和Hologres需要在同一VPC。如果不在同一VPC,需要先打通跨VPC的网络或者使用公网的形式访问,详情请参见如何访问跨VPC的其他服务?和Flink全托管如何访问公网?。
通过RAM用户或RAM角色等身份访问Hologres和RDS MySQL资源时,需要其具备对应资源的权限。
已开通实时计算Flink版,详情请参见开通实时计算Flink版。
已购买独享通用型Hologres实例,详情请参见购买Hologres。
购买实例后,需要创建order_dw数据库和用户(为用户赋予admin权限),推荐使用简单权限模型创建数据库,详情请参见简单权限模型的使用和DB管理。
如果在被授权账号的下拉列表找不到对应的账号,则说明该账号并未添加至当前实例,您需要前往用户管理页面添加用户为SuperUser。
说明Hologres1.3版本在创建完数据库后,需要执行create extension hg_binlog命令才能开启binlog扩展。
Hologres2.0之后版本默认开启binlog扩展,无需手动执行。
已创建RDS MySQL实例,并准备MySQL CDC数据源(order_dw数据库中的三张业务表的建表DDL以及插入的数据如下),详情请参见创建RDS MySQL实例和管理数据库。
CREATE TABLE `orders` ( order_id bigint not null primary key, user_id varchar(50) not null, shop_id bigint not null, product_id bigint not null, buy_fee numeric(20,2) not null, create_time timestamp not null, update_time timestamp not null default now(), state int not null ); CREATE TABLE `orders_pay` ( pay_id bigint not null primary key, order_id bigint not null, pay_platform int not null, create_time timestamp not null ); CREATE TABLE `product_catalog` ( product_id bigint not null primary key, catalog_name varchar(50) not null ); -- 准备数据 INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee'); INSERT INTO orders VALUES (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1), (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1), (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1), (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1), (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1), (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1), (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1); INSERT INTO orders_pay VALUES (2001, 100001, 1, '2023-02-15 17:40:56'), (2002, 100002, 1, '2023-02-15 17:40:56'), (2003, 100003, 0, '2023-02-15 17:40:56'), (2004, 100004, 0, '2023-02-15 17:40:56'), (2005, 100005, 0, '2023-02-15 18:40:56'), (2006, 100006, 0, '2023-02-15 18:40:56'), (2007, 100007, 0, '2023-02-15 18:40:56');
使用限制
仅实时计算引擎VVR 6.0.7及以上版本支持该实时数仓方案。
仅1.3及以上版本的Hologres支持该实时数仓方案。
构建实时数仓
创建Catalog
创建Hologres Catalog。
在实时计算控制台SQL开发页面的查询脚本页签,将如下代码拷贝到查询脚本,并修改目标参数取值,选中目标片段后单击左侧代码行上的运行。
CREATE CATALOG dw WITH ( 'type' = 'hologres', 'endpoint' = '<ENDPOINT>', 'username' = '<USERNAME>', 'password' = '${secret_values.ak_holo}', 'dbname' = 'order_dw', 'binlog' = 'true', -- 创建catalog时可以设置源表、维表和结果表支持的with参数,之后在使用此catalog下的表时会默认添加这些默认参数。 'sdkMode' = 'jdbc', -- 推荐使用jdbc模式。 'cdcmode' = 'true', 'connectionpoolname' = 'the_conn_pool', 'ignoredelete' = 'true', -- 宽表merge需要开启,防止回撤。 'partial-insert.enabled' = 'true', -- 宽表merge需要开启此参数,实现部分列更新。 'mutateType' = 'insertOrUpdate', -- 宽表merge需要开启此参数,实现部分列更新。 'table_property.binlog.level' = 'replica', --也可以在创建catalog时传入持久化的hologres表属性,之后创建表时,默认都开启binlog。 'table_property.binlog.ttl' = '259200' );
您需要修改以下参数取值为您实际Hologres服务信息。
参数
说明
备注
endpoint
Hologres的Endpoint地址。
详情请参见实例配置。
username
阿里云账号的AccessKey ID。
当前配置的AccessKey对应的用户需要能够访问所有的Hologres数据库,Hologres数据库权限请参见Hologres权限模型概述。
为了避免您的AK信息泄露,本示例通过使用名为ak_holo密钥的方式填写AccessKey Secret取值,详情请参见变量和密钥管理。
password
阿里云账号的AccessKey Secret。
说明创建Catalog时可以设置默认的源表、维表和结果表的WITH参数,也可以设置创建Hologres物理表的默认属性,例如上方table_property开头的参数。详情请参见管理Hologres Catalog和实时数仓Hologres WITH参数。
创建MySQL Catalog。
将如下代码拷贝到查询脚本,并修改目标参数取值,选中目标片段后单击左侧代码行上的运行。
CREATE CATALOG mysqlcatalog WITH( 'type' = 'mysql', 'hostname' = '<hostname>', 'port' = '<port>', 'username' = '<username>', 'password' = '${secret_values.mysql_pw}', 'default-database' = 'order_dw' );
您需要修改以下参数取值为您实际的MySQL服务信息。
参数
说明
hostname
MySQL数据库的IP地址或者Hostname。
port
MySQL数据库服务的端口号,默认值为3306。
username
MySQL数据库服务的用户名。
password
MySQL数据库服务的密码。
本示例通过使用名为mysql_pw密钥的方式填写密码取值,避免信息泄露,详情请参见变量和密钥管理。
构建ODS层:业务数据库实时入仓
基于Catalog的CREATE DATABASE AS(CDAS)语句功能,可以一次性把ODS层建出来。ODS层一般不直接做OLAP或SERVING(KV点查),主要作为流式作业的事件驱动,开启binlog即可满足需求。Binlog是Hologres的核心能力之一,Hologres连接器也支持先全量读取再增量消费Binlog的全增量模式。
创建CDAS同步作业ODS。
在实时计算控制台上,新建名为ODS的SQL流作业,并将如下代码拷贝到SQL编辑器。
CREATE DATABASE IF NOT EXISTS dw.order_dw -- 创建catalog时设置了table_property.binlog.level参数,因此通过CDAS创建的所有表都开启了binlog。 AS DATABASE mysqlcatalog.order_dw INCLUDING all tables -- 可以根据需要选择上游数据库需要入仓的表。 /*+ OPTIONS('server-id'='8001-8004') */ ; -- 指定mysql-cdc源表。
说明本示例默认将数据同步到数据库order_dw的Public Schema下。您也可以将数据同步到Hologres目标库的指定Schema中,详情请参见作为CDAS的目标端Catalog,指定后使用Catalog时的表名格式也会发生变化,详情请参见使用Hologres Catalog。
单击右上方的部署,进行作业部署。
单击左侧导航栏的作业运维,单击刚刚部署的ODS作业操作列的启动,选择无状态启动启动作业。
查看MySQL同步到Hologres的3张表数据。
在HoloWeb开发页面连接Hologres实例并登录目标数据库后,在SQL编辑器上执行如下命令。
---查orders中的数据。 SELECT * FROM orders; ---查orders_pay中的数据。 SELECT * FROM orders_pay; ---查product_catalog中的数据。 SELECT * FROM product_catalog;
构建DWD层:实时主题宽表
构建DWD层用到了Hologres连接器特有的部分列更新能力,可以使用INSERT DML方便地表达部分列更新的语义。作业中需要对不同的维表进行查询,是基于Hologres行存以及行列共存表提供的高性能的点查能力。同时,Hologres资源强隔离的架构,可以保证写入、读取、分析等作业之间互不干扰。
通过Flink Catalog功能在Hologres中建DWD层的宽表dwd_orders。
在实时计算控制台SQL开发页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击左侧代码行上的运行。
-- 宽表字段要nullable,因为不同的流写入到同一张结果表,每一列都可能出现null的情况。 CREATE TABLE dw.order_dw.dwd_orders ( order_id bigint not null, order_user_id string, order_shop_id bigint, order_product_id bigint, order_product_catalog_name string, order_fee numeric(20,2), order_create_time timestamp, order_update_time timestamp, order_state int, pay_id bigint, pay_platform int comment 'platform 0: phone, 1: pc', pay_create_time timestamp, PRIMARY KEY(order_id) NOT ENFORCED ); -- 支持通过catalog修改Hologres物理表属性。 ALTER TABLE dw.order_dw.dwd_orders SET ( 'table_property.binlog.ttl' = '604800' --修改binlog的超时时间为一周。 );
实现实时消费ODS层orders、orders_pay表的binlog。
在实时计算控制台上,新建名为DWD的SQL作业,并将如下代码拷贝到SQL编辑器后,部署并启动作业。通过如下SQL作业,orders表会与product_catalog表进行维表关联,将最终结果写入dwd_orders表中,实现数据的实时打宽。
BEGIN STATEMENT SET; INSERT INTO dw.order_dw.dwd_orders ( order_id, order_user_id, order_shop_id, order_product_id, order_fee, order_create_time, order_update_time, order_state, order_product_catalog_name ) SELECT o.*, dim.catalog_name FROM dw.order_dw.orders as o LEFT JOIN dw.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim ON o.product_id = dim.product_id; INSERT INTO dw.order_dw.dwd_orders (pay_id, order_id, pay_platform, pay_create_time) SELECT * FROM dw.order_dw.orders_pay; END;
查看宽表dwd_orders数据。
在HoloWeb开发页面连接Hologres实例并登录目标数据库后,在SQL编辑器上执行如下命令。
SELECT * FROM dwd_orders;
构建DWS层:实时指标计算
通过Flink Catalog功能,在Hologres中创建dws层的聚合dws_users以及dws_shops。
在实时计算控制台SQL开发页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击左侧代码行上的运行。
-- 用户维度聚合指标表。 CREATE TABLE dw.order_dw.dws_users ( user_id string not null, ds string not null, paied_buy_fee_sum numeric(20,2) not null comment '当日完成支付的总金额', primary key(user_id,ds) NOT ENFORCED ); -- 商户维度聚合指标表。 CREATE TABLE dw.order_dw.dws_shops ( shop_id bigint not null, ds string not null, paied_buy_fee_sum numeric(20,2) not null comment '当日完成支付总金额', primary key(shop_id,ds) NOT ENFORCED );
实时消费DWD层的宽表dw.order_dw.dwd_orders,在Flink中做聚合计算,最终写入Hologres中的DWS表。
在实时计算控制台上,新建名为DWS的SQL流作业,并将如下代码拷贝到SQL编辑器后,部署并启动作业。
BEGIN STATEMENT SET; INSERT INTO dw.order_dw.dws_users SELECT order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds, SUM (order_fee) FROM dw.order_dw.dwd_orders c WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- 订单流和支付流数据都已写入宽表。 GROUP BY order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd'); INSERT INTO dw.order_dw.dws_shops SELECT order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds, SUM (order_fee) FROM dw.order_dw.dwd_orders c WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- 订单流和支付流数据都已写入宽表。 GROUP BY order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd'); END;
查看DWS层的聚合结果,其结果会根据上游数据的变更实时更新。
在HoloWeb开发页面连接Hologres实例并登录目标数据库后,在SQL编辑器上执行如下命令。
查询dws_users表结果。
SELECT * FROM dws_users;
查询dws_shops表结果。
SELECT * FROM dws_shops;
数据探查
因为开启了Binlog,所以可直接探查到数据的变化情况。如果对中间结果需要即席(Ad-hoc)性质的业务数据探查,或者对最终计算结果进行数据正确性排查,此方案的每一层数据都实现了持久化,可以便捷地探查中间过程。
流模式探查
新建并启动数据探查流作业。
在实时计算控制台上,新建名为Data-exploration的SQL流作业,并将如下代码拷贝到SQL编辑器后,部署并启动作业。
-- 流模式探查,打印到print可以看到数据的变化情况。 CREATE TEMPORARY TABLE print_sink( order_id bigint not null, order_user_id string, order_shop_id bigint, order_product_id bigint, order_product_catalog_name string, order_fee numeric(20,2), order_create_time timestamp, order_update_time timestamp, order_state int, pay_id bigint, pay_platform int, pay_create_time timestamp, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'print' ); INSERT INTO print_sink SELECT * FROM dw.order_dw.dwd_orders /*+ OPTIONS('startTime'='2023-02-15 12:00:00') */ --这里的startTime是binlog生成的时间 WHERE order_user_id = 'user_001';
查看数据探查结果。
在作业运维详情页面,单击目标作业名称,在作业探查页签下左侧运行日志页签,单击运行Task Managers页签下的Path, ID。在Stdout页面搜索user_001相关的日志信息。
批模式探查
在实时计算控制台上,创建SQL流作业,并将如下代码拷贝到SQL编辑器后,单击调试。详情请参见作业调试。
批模式探查是获取当前时刻的终态数据,在Flink作业开发界面调试结果如下图所示。
SELECT * FROM dw.order_dw.dwd_orders /*+ OPTIONS('binlog'='false') */ WHERE order_user_id = 'user_001' and order_create_time > '2023-02-15 12:00:00'; --批量模式支持filter下推,提升批作业执行效率。
使用实时数仓
上一小节展示了通过Flink Catalog,可以仅在Flink侧搭建一个基于Flink和Hologres的Streaming Warehouse实时分层数仓。本节则展示数仓搭建完成之后的一些简单应用场景。
Key-Value服务
根据主键查询DWS层的聚合指标表,支持百万级RPS。
在HoloWeb开发页面查询指定用户指定日期的消费额的代码示例如下。
-- holo sql
SELECT * FROM dws_users WHERE user_id ='user_001' AND ds = '20230215';
明细查询
对DWD层宽表进行OLAP分析。
在HoloWeb开发页面查询某个客户23年2月特定支付平台支付的订单明细的代码示例如下。
-- holo sql
SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00' and order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'
AND pay_platform = 0
ORDER BY order_create_time LIMIT 100;
实时报表
基于DWD层宽表数据展示实时报表,Hologres的行列共存以及列存表有非常优秀的OLAP分析能力,支持秒级响应。
在HoloWeb开发页面查询23年2月内每个品类的订单总量和订单总金额的代码示例如下。
-- holo sql
SELECT
TO_CHAR(order_create_time, 'YYYYMMDD') AS order_create_date,
order_product_catalog_name,
COUNT(*),
SUM(order_fee)
FROM
dwd_orders
WHERE
order_create_time >= '2023-02-01 00:00:00' and order_create_time < '2023-03-01 00:00:00'
GROUP BY
order_create_date, order_product_catalog_name
ORDER BY
order_create_date, order_product_catalog_name;
相关文档
Hologres的Binlog能力详情,请参见订阅Hologres Binlog。
Flink支持在一个作业中写入多个INSERT INTO语句,语法请参见INSERT INTO语句。
实时计算Flink版支持丰富的连接器,详情请参见支持的连接器。