Flink+Hologres 搭建实时数仓
技术解决方案部署
90
https://www.aliyun.com/solution/tech-solution/flink_hologres
方案概览
本方案将以某个电商平台数据为例,引导您使用 Flink+Hologres 搭建实时数仓,实现数据的实时加工清洗和对接上层应用数据查询,形成实时数据的分层和复用,以支撑各业务方的报表查询(交易大屏、行为数据分析、用户画像标签)以及个性化推荐等多个场景。
使用 Flink+Hologres 的优势:
中间层可查:Hologres 的每一层数据都支持高效更新与修正、写入即可查,解决了传统实时数仓方案的中间层数据不易查、不易更新、不易修正的问题。
数据可复用:Hologres 的每一层数据都可单独对外提供服务,使得数据可以高效复用,真正实现数仓分层复用的目标。
模型统一,架构简化:实时 ETL 链路的逻辑均基于 Flink SQL 实现;ODS 层、DWD 层和 DWS 层的数据统一存储在 Hologres 中,可以降低架构复杂度,提高数据处理效率。
本方案的体验预计产生费用不超过 35 元(假设运行时间不超过 1.5 小时)。如果您调整了资源规格、数据量、使用时长,或执行了本方案以外的操作,可能导致费用发生变化,请以控制台显示的实际价格和最终账单为准。
方案架构
本方案的实现原理如下:
Flink 将数据源写入 Hologres,形成 ODS 层。
Flink 订阅 ODS 层的 Binlog 进行加工,形成 DWD 层再次写入 Hologres。
Flink 订阅 DWD 层的 Binlog,通过计算形成 DWS 层,再次写入 Hologres。
最后由 Hologres 对外提供应用查询。
准备资源
25
通过本部分您将创建 1 个专有网络 VPC、1 台交换机、1 个云数据库 RDS MySQL 实例、1 个 Hologres 实例、1 个 Flink 工作空间、1 个 OSS Bucket 存储空间。
请参考表格中的说明和方案默认示例值做详细规划,未提及的规划项请保持默认值。实际部署时可根据您的实际情况进行修改。
RDS MySQL 实例、Flink 工作空间与 Hologres 实例需在同一 VPC 下。
仅 1.3 及以上版本的独享 Hologres 实例支持该实时数仓方案。
1.创建专有网络 VPC 和交换机
登录专有网络管理控制台。
在顶部菜单栏,选择华东 1(杭州)地域(本方案以杭州地域为例)。
在左侧导航栏,单击专有网络,单击创建专有网络。
在创建专有网络页面,按照资源配置创建 1 个专有网络(VPC)和 1 台交换机,然后单击确定。详情请参见创建专有网络和交换机。
2.创建 RDS MySQL 实例
访问RDS 实例创建页面。
按照资源配置完成相应参数配置后,根据页面提示完成支付。
为目标实例创建数据库 order_dw 和具有对应数据库读写权限的普通账号。具体操作请参见创建数据库和账号。
准备 MySQL CDC 数据源。
单击上方的登录数据库。
在登录实例对话框中,填写在步骤 3 中创建的数据库账号名和密码,然后单击登录。
登录成功后,在左侧双击 order_dw 数据库,切换数据库。
在 SQL Console 区域编写三张业务表的建表 DDL 以及插入的数据语句。
CREATE TABLE `orders` ( order_id bigint not null primary key, user_id varchar(50) not null, shop_id bigint not null, product_id bigint not null, buy_fee numeric(20,2) not null, create_time timestamp not null, update_time timestamp not null default now(), state int not null ); CREATE TABLE `orders_pay` ( pay_id bigint not null primary key, order_id bigint not null, pay_platform int not null, create_time timestamp not null ); CREATE TABLE `product_catalog` ( product_id bigint not null primary key, catalog_name varchar(50) not null ); -- 准备数据 INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee'); INSERT INTO orders VALUES (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1), (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1), (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1), (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1), (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1), (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1), (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1); INSERT INTO orders_pay VALUES (2001, 100001, 1, '2023-02-15 17:40:56'), (2002, 100002, 1, '2023-02-15 17:40:56'), (2003, 100003, 0, '2023-02-15 17:40:56'), (2004, 100004, 0, '2023-02-15 17:40:56'), (2005, 100005, 0, '2023-02-15 18:40:56'), (2006, 100006, 0, '2023-02-15 18:40:56'), (2007, 100007, 0, '2023-02-15 18:40:56');
单击执行,单击直接执行。
3.创建独享 Hologres 实例
进入Hologres 产品详情页,单击立即购买。
选择商品类型,按照资源配置完成相应参数配置后,根据页面提示完成开通。
创建成功后,进入管理控制台,单击上面创建的实例名称,单击右上方的登录实例,进入 HoloWeb 页面。
创建名为 order_dw 的数据库,并为用户授权(赋予 admin 权限),使用简单权限模型创建数据库,详情请参见 DB 管理。
说明如果在被授权账号的下拉列表找不到对应的账号,则说明该账号并未添加至当前实例,您需要前往用户管理页面添加用户为 SuperUser。
新增计算组。
您可以通过不同计算组实现资源隔离,使用初始计算组 init_warehouse 用于写入数据,使用
read_warehouse_1
计算组用于服务查询。预留计算资源会全部分配给初始计算组 init_warehouse,需先减少计算组资源,再新增计算组。详情请参见场景 1:创建全新的计算组实例。
单击
,确认实例名为目标实例名称。单击已有资源组 init_warehouse 操作列下的调整配置,本示例减少资源至 32 CU,单击确认。
单击新增计算组,新增名称为 read_warehouse_1,资源为 32 CU 的计算组,单击确认。
4.创建 OSS Bucket
OSS Bucket 用于存储 Flink 作业系统检查点、作业快照、日志和 JAR 包等信息,创建时请参见注意事项。
登录OSS管理控制台,在左侧导航栏,单击 Bucket 列表 ,然后单击创建 Bucket。
在创建页面,填写 Bucket 名称(本方案默认示例 flink-test-oss),选择您的云服务部署的地域(本方案默认示例华东 1(杭州)),单击完成创建后,单击我知道了,确认创建。
5.开通 Flink 全托管
登录实时计算控制台。单击立即购买。
在购买页面,按照资源配置完成相应参数配置后,根据页面提示完成开通。开通详情请参见开通实时计算 Flink 版。
搭建实时数仓
40
1.创建 Catalog
在 Flink 开发控制台的 SQL 开发页面,通过 SQL 方式分别创建 Hologres 和 MySQL 的 Catalog。
登录实时计算控制台,单击目标工作空间操作列下的控制台。
在左侧导航栏,单击 Session 管理,单击创建 Session 集群,为后续创建 Catalog 和查询脚本提供执行环境,详情请参见Session 集群注意事项。
填写名称,并将状态设置为 Running。
选择引擎版本,本方案示例选择最新版本。
其他参数本方案示例保持默认,单击创建 Session 集群。
在左侧导航栏,单击数据开发 > 数据查询 > 查询脚本,单击新建。
创建 Hologres Catalog。
在文本编辑区域,输入右侧配置 Hologres Catalog 的命令,创建名称为 dw 的 Hologres Catalog。其中,您需要修改以下参数取值为您实际 Hologres 服务信息。关于 Hologres Catalog 的更多信息,详情请参见管理 Hologres Catalog。
参数
说明
备注
endpoint
Hologres的 Endpoint 地址。
本示例使用指定 VPC 域名,详情请参见实例配置。
username
阿里云账号的 AccessKey。
当前配置的 AccessKey 对应的用户需要能够访问所有的 Hologres 数据库,Hologres 数据库权限请参见 Hologres 权限模型。
password
阿里云账号的 AccessSecret。
CREATE CATALOG dw WITH ( 'type' = 'hologres', 'endpoint' = '<ENDPOINT>', 'username' = '<USERNAME>', 'password' = '<PASSWORD>', 'dbname' = 'order_dw@init_warehouse', --数据库名称,并指定连接init_warehouse计算组 'binlog' = 'true', -- 创建catalog时可以设置源表、维表和结果表支持的with参数,之后在使用此catalog下的表时会默认添加这些默认参数。 'sdkMode' = 'jdbc', -- 推荐使用jdbc模式。 'cdcmode' = 'true', 'connectionpoolname' = 'the_conn_pool', 'ignoredelete' = 'true', -- 宽表merge需要开启,防止回撤。 'partial-insert.enabled' = 'true', -- 宽表merge需要开启此参数,实现部分列更新。 'mutateType' = 'insertOrUpdate', -- 宽表merge需要开启此参数,实现部分列更新。 'table_property.binlog.level' = 'replica', --也可以在创建catalog时传入持久化的hologres表属性,之后创建表时,默认都开启binlog。 'table_property.binlog.ttl' = '259200' );
选中创建 Catalog 的代码后,单击左侧代码行数上的运行。
返回
The following statement has been executed successfully!
表示创建成功。创建 MySQL Catalog。
输入右侧的命令创建 MySQL Catalog,您需要修改表格中的参数取值为您实际的 MySQL 服务信息。选中创建 MySQL Catalog 的代码后,单击左侧代码行数上的运行。关于 MySQL Catalog 的更多信息,详情请参见管理 MySQL Catalog。
参数
说明
hostname
MySQL 数据库的 IP 地址或者 Hostname。
port
MySQL 数据库服务的端口号,默认值为 3306。
username
MySQL 数据库服务的用户名。
本示例为具有数据库 order_dw 读写权限的账号名。
password
MySQL 数据库服务的密码。
CREATE CATALOG mysqlcatalog WITH( 'type' = 'mysql', 'hostname' = '<hostname>', 'port' = '<port>', 'username' = '<username>', 'password' = '<password>', 'default-database' = 'order_dw' );
2.构建 ODS 层:业务数据库实时入仓
基于 Catalog 的 CREATE DATABASE AS(CDAS)语句功能,可以一次性把 ODS 层建出来。ODS 层一般不直接做 OLAP 或 SERVING(KV 点查),主要作为流式作业的事件驱动,开启 binlog 即可满足需求。Binlog 是 Hologres 的核心能力之一,Hologres 连接器也支持先全量读取再增量消费 Binlog 的全增量模式。
创建 CDAS 同步作业。
在实时计算控制台上,创建新的 SQL 流作业草稿,并将如下代码拷贝到 SQL 编辑器。
CREATE DATABASE IF NOT EXISTS dw.order_dw -- 创建catalog时设置了table_property.binlog.level参数,因此通过CDAS创建的所有表都开启了binlog。 AS DATABASE mysqlcatalog.order_dw INCLUDING all tables -- 可以根据需要选择上游数据库需要入仓的表。本案例需选择前面创建了三张业务表的数据库。 /*+ OPTIONS('server-id'='8001-8004') */ ; -- 指定mysql-cdc实例server-id范围。
单击右上方的部署,单击确定,进行作业部署。
单击左侧导航栏的作业运维,单击刚刚部署的 ODS 作业操作列的启动,选择无状态启动后单击启动。
向计算组加载数据。
Table Group 是 Hologres 中数据的载体。使用 read_warehouse_1 查询 order_dw 数据库中 Table Group(本示例为 order_dw_tg_default)的数据时,为计算组 read_warehouse_1 加载 order_dw_tg_default,以实现使用
init_warehouse
计算组写入数据,使用read_warehouse_1
计算组进行服务查询。在 HoloWeb 开发页单击 SQL 编辑器,确认实例名和数据库名称后,执行右侧命令。更多详情请参见场景 1:创建全新的计算组实例。加载后,可以查看到 read_warehouse_1 已经加载了 order_dw_tg_default Table Group 的数据。
--查看当前数据库有哪些Table Group SELECT tablegroup_name FROM hologres.hg_table_group_properties GROUP BY tablegroup_name; --为计算组加载Table Group CALL hg_table_group_load_to_warehouse ('order_dw.order_dw_tg_default', 'read_warehouse_1', 1); --查看计算组加载Table Group的情况 select * from hologres.hg_warehouse_table_groups;
右上角切换计算组为 read_warehouse_1,后续使用 read_warehouse_1 进行查询分析。
在 HoloWeb 中执行右侧命令,查看 MySQL 同步到 Hologres 的 3 张表数据。
---查orders中的数据。 SELECT * FROM orders; ---查orders_pay中的数据。 SELECT * FROM orders_pay; ---查product_catalog中的数据。 SELECT * FROM product_catalog;
3.构建 DWD 层:实时主题宽表
构建 DWD 层用到了 Hologres 连接器特有的部分列更新能力,可以使用 INSERT DML 方便地表达部分列更新的语义。作业中需要对不同的维表进行查询,是基于 Hologres 行存以及行列共存表提供的高性能的点查能力。同时,Hologres 资源强隔离的架构,可以保证写入、读取、分析等作业之间互不干扰。
通过 Flink Catalog 功能在 Hologres 中创建 DWD 层的宽表 dwd_orders。
将代码拷贝到 Flink 的查询脚本中,选中目标片段后单击左侧代码行上的运行。
-- 宽表字段要nullable,因为不同的流写入到同一张结果表,每一列都可能出现null的情况。 CREATE TABLE dw.order_dw.dwd_orders ( order_id bigint not null, order_user_id string, order_shop_id bigint, order_product_id bigint, order_product_catalog_name string, order_fee numeric(20,2), order_create_time timestamp, order_update_time timestamp, order_state int, pay_id bigint, pay_platform int comment 'platform 0: phone, 1: pc', pay_create_time timestamp, PRIMARY KEY(order_id) NOT ENFORCED ); -- 支持通过catalog修改Hologres物理表属性。 ALTER TABLE dw.order_dw.dwd_orders SET ( 'table_property.binlog.ttl' = '604800' --修改binlog的超时时间为一周。 );
实现实时消费 ODS 层 orders、orders_pay 表的 binlog。
创建新的 Flink SQL 流作业草稿,并将代码拷贝到 SQL 编辑器后,部署并启动作业。通过如下 SQL 作业,orders 表会与 product_catalog 表进行维表关联,将最终结果写入 dwd_orders 表中,实现数据的实时打宽。
BEGIN STATEMENT SET; INSERT INTO dw.order_dw.dwd_orders ( order_id, order_user_id, order_shop_id, order_product_id, order_fee, order_create_time, order_update_time, order_state, order_product_catalog_name ) SELECT o.*, dim.catalog_name FROM dw.order_dw.orders AS o LEFT JOIN dw.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim ON o.product_id = dim.product_id; INSERT INTO dw.order_dw.dwd_orders (pay_id, order_id, pay_platform, pay_create_time) SELECT * FROM dw.order_dw.orders_pay; END;
查看宽表 dwd_orders 数据。
在 HoloWeb 开发页面的 SQL 编辑器执行右侧所示命令。
SELECT * FROM dwd_orders;
4.构建 DWS 层:实时指标计算
通过 Flink Catalog 功能,在 Hologres 中创建 DWS 层的聚合 dws_users 以及 dws_shops。
将代码拷贝到 Flink 的查询脚本中,选中目标片段后单击左侧代码行上的运行。
-- 用户维度聚合指标表。 CREATE TABLE dw.order_dw.dws_users ( user_id string not null, ds string not null comment 'pay_create_date', paied_buy_fee_sum numeric(20,2) not null comment '当日完成支付的总金额', primary key(user_id,ds) NOT ENFORCED ); -- 商户维度聚合指标表。 CREATE TABLE dw.order_dw.dws_shops ( shop_id bigint not null, ds string not null comment 'pay_create_date', paied_buy_fee_sum numeric(20,2) not null comment '当日完成支付的总金额', primary key(shop_id,ds) NOT ENFORCED );
实时消费 DWD 层的宽表 dw.order_dw.dwd_orders,在 Flink 中做聚合计算,写入 Hologres 中的 DWS 表。
创建新的 Flink SQL 作业草稿,并将右侧代码拷贝到 SQL 编辑器后,部署并启动作业。
BEGIN STATEMENT SET; INSERT INTO dw.order_dw.dws_users SELECT order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds, SUM (order_fee) FROM dw.order_dw.dwd_orders c WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- 订单流和支付流数据都已写入宽表。 GROUP BY order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd'); INSERT INTO dw.order_dw.dws_shops SELECT order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds, SUM (order_fee) FROM dw.order_dw.dwd_orders c WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- 订单流和支付流数据都已写入宽表。 GROUP BY order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd'); END;
查看 DWS 层的聚合结果,其结果会根据上游数据的变更实时更新。
在 HoloWeb 开发页面的 SQL 编辑器执行所示命令,查询 dws_users 表结果,dws_shops 表结果查询类似。
SELECT * FROM dws_users;
在 RDS 控制台向 orders 和 orders_pay 表中分别插入 1 条新数据。
INSERT INTO orders VALUES (100008, 'user_003', 12345, 5, 6000.02, '2023-02-15 09:40:56', '2023-02-15 18:42:56', 1); INSERT INTO orders_pay VALUES (2008, 100008, 1, '2023-02-15 19:40:56');
在 Hologres 控制台查看变更后的表结果(以 dwd_orders 为例)。
SELECT * FROM dwd_orders;
方案验证
10
清理资源
15
在本方案中,您创建了 1 台交换机、1 个专有网络 VPC、1 个云数据库 RDS MySQL 实例、1 个 Hologres 实例、1 个 Flink 项目空间、1 个 OSS Bucket 存储空间。体验完方案后,建议及时释放不需要的资源,避免继续产生费用。
释放 1 个云数据库 RDS MySQL 实例:
登录RDS 管理控制台在实例列表页面,找到目标实例,然后在操作列选择更多>释放实例,根据界面提示释放实例。
释放 1 个 Hologres 实例:
登录Hologres 管理控制台,在实例列表页面释放目标 Hologres 实例,详情请参见删除实例。
释放 1 个 Flink 工作空间:
登录实时计算控制台,在 Flink 全托管页签,单击目标工作空间操作列的更多 > 释放资源,按照界面提示释放工作空间。
删除 OSS Bucket 存储空间:
登录OSS管理控制台,删除上述步骤在 OSS Bucket 存储空间下创建的目录及上传的文件。单击 Bucket 列表,然后单击目标 Bucket 名称。在左侧导航栏,单击删除 Bucket,并按照页面指引完成删除操作。
释放 1 台交换机:
登录专有网络管理控制台,在交换机页面,找到目标交换机,然后在操作列单击删除,按照界面提示释放实例。
释放 1 个专有网络 VPC:
登录专有网络管理控制台,在专有网络页面,找到目标 VPC,然后在操作列单击删除,按照界面提示释放实例。