StarRocks提供了开箱即用的数据湖查询功能,非常适用于对湖中的数据进行探查式查询分析。通过使用异步物化视图,您可以为数据湖中的报表和应用实现更高的并发,以及更好的性能。本文为您介绍如何使用StarRocks的异步物化视图来优化数据湖中的查询性能。
适用场景
StarRocks支持基于External Catalog,如Hive Catalog、Iceberg Catalog、Hudi Catalog、JDBC Catalog和Paimon Catalog构建异步物化视图。基于External Catalog的物化视图适用于以下场景。
数据湖报表的透明加速
为了确保数据湖报表的查询性能,数据工程师通常需要与数据分析师紧密合作,研究报告加速层的构建逻辑。如果加速层需求更新,他们必须相应地更新构建逻辑、执行计划和查询语句。通过物化视图的查询改写能力,可以使用户不感知报表加速过程。当识别出慢查询时,数据工程师可以分析慢查询的模式并按需创建物化视图。然后,上层查询会被智能改写,并通过物化视图透明加速,从而实现在不修改业务应用的逻辑或查询语句情况下,快速改善查询性能。
实时数据与离线数据关联的增量计算
假设您的业务应用需要将StarRocks本地表中的实时数据与数据湖中的历史数据关联起来以进行增量计算。在这种情况下,物化视图可以提供一个简单的解决方案。例如,如果实时事实表是StarRocks中的本地表,而维度表存储在数据湖中,您可以通过构建物化视图将本地表与外部数据源中的表关联起来,轻松进行增量计算。
指标层的快速搭建
在处理高维度数据时,计算和处理指标可能会遇到挑战。您可以使用物化视图进行数据预聚合和上卷,以创建一个相对轻量级的指标层。此外,您还可以利用物化视图自动刷新的特性,进一步降低指标计算的复杂性。
功能对比
物化视图、Data Cache和StarRocks中的本地表都是实现显著查询性能提升的有效方法。下表比较了它们的主要区别。
对比项 | Data Cache | 物化视图 | 本地表 |
数据导入和更新 | 查询会自动触发数据缓存 | 自动触发刷新任务 | 支持各种导入方法,但需要手动维护导入任务 |
数据缓存粒度 |
| 存储预计算的查询结果 | 基于表定义存储数据 |
查询性能 | Data Cache ≤ 物化视图 = 本地表 | ||
查询语句 |
|
| 需要修改查询语句以查询本地表 |
与直接查询数据湖数据或将数据导入到本地表中相比,物化视图提供了几个独特的优势:
本地存储加速:物化视图可以利用StarRocks的本地存储加速优势,如索引、分区分桶和Colocate Group,从而相较直接从数据湖查询数据具有更好的查询性能。
无需维护加载任务:物化视图通过自动刷新任务透明地更新数据,无需维护导入任务。此外,基于Hive、Iceberg和Paimon Catalog的物化视图可以检测数据更改并在分区级别执行增量刷新。
智能查询改写:查询可以被透明改写至物化视图,无需修改应用使用的查询语句即可加速查询。
使用建议
建议在以下情况下使用物化视图:
在启用了Data Cache的情况下,查询性能仍不符合您对查询延迟和并发性的要求。
查询涉及可复用的部分,如固定的聚合方式、Join模式。
数据以分区方式组织,而查询聚合度较高(例如按天聚合)。
在以下情况下,建议优先通过Data Cache来实现加速:
查询没有大量可复用的部分,并且可能涉及数据湖中的任何数据。
远程存储存在显著的波动或不稳定性,可能会对访问产生潜在影响。
创建基于External Catalog的物化视图
在External Catalog中的表上创建物化视图与在StarRocks本地表上创建物化视图类似。您只需根据正在使用的数据源设置合适的刷新策略,并手动启用External Catalog物化视图的查询改写功能。
选择合适的刷新策略
目前,StarRocks无法检测Hudi Catalog中的分区级别数据更改。因此,一旦触发刷新任务,将执行全量刷新。
对于Hive Catalog、Iceberg Catalog(从v3.1.4版本起)、JDBC Catalog(从v3.1.4版本起,且仅支持MySQL Range分区)和Paimon Catalog(从v3.2.1版本起),StarRocks支持检测分区级别数据更改。从而,StarRocks可以:
仅刷新数据有更改的分区,避免全量刷新,减少刷新导致的资源消耗。
在查询改写期间在一定程度上确保数据一致性。如果数据湖中的基表发生数据更改,查询将不会被改写为使用物化视图。
您仍然可以选择在创建物化视图时通过设置属性mv_rewrite_staleness_second
来容忍一定程度的数据不一致。
请注意,如需按照分区刷新,物化视图的分区键必须包含在基表的分区键中。
从v3.2.3版本开始,StarRocks支持在使用Partition Transforms
的Iceberg表上创建分区物化视图,物化视图将根据变换后的列进行分区。目前,仅支持使用identity
、year
、month
、day
或hour
Transform的Iceberg表。
以下示例展示了一个使用day
Transform的Iceberg表的定义,并在该表上创建了一个分区对齐的物化视图:
-- Iceberg表定义。
CREATE TABLE spark_catalog.test_db.iceberg_sample_datetime_day (
id BIGINT,
data STRING,
category STRING,
ts TIMESTAMP)
USING iceberg
PARTITIONED BY (days(ts))
-- 基于以上Iceberg表创建物化视图。
CREATE MATERIALIZED VIEW `test_iceberg_datetime_day_mv` (`id`, `data`, `category`, `ts`)
PARTITION BY (`ts`)
DISTRIBUTED BY HASH(`id`)
REFRESH MANUAL
AS
SELECT
`iceberg_sample_datetime_day`.`id`,
`iceberg_sample_datetime_day`.`data`,
`iceberg_sample_datetime_day`.`category`,
`iceberg_sample_datetime_day`.`ts`
FROM `iceberg`.`test`.`iceberg_sample_datetime_day`;
对于Hive Catalog,您可以启用Hive元数据缓存刷新功能,允许StarRocks在分区级别检测数据更改。启用此功能后,StarRocks定期访问Hive元数据存储服务(HMS)或AWS Glue,以检查最近查询的热数据的元数据信息。
配置项
要启用Hive元数据缓存刷新功能,您可以使用ADMIN SET FRONTEND CONFIG
设置以下FE动态配置项。语法如下。
ADMIN SET FRONTEND CONFIG ("key" = "value")
配置名称 | 默认值 | 描述 |
|
| 是否开启Hive元数据缓存周期性刷新。开启后,StarRocks会轮询Hive集群的元数据服务(HMS或AWS Glue),并刷新经常访问的Hive外部数据目录的元数据缓存,以感知数据更新。 |
| 600000(10分钟) | 接连两次Hive元数据缓存刷新之间的间隔。单位:毫秒。 |
| 86400(24小时) | Hive元数据缓存刷新任务过期时间。对于已被访问过的Hive Catalog,如果超过该时间没有被访问,则停止刷新其元数据缓存。对于未被访问过的Hive Catalog,StarRocks不会刷新其元数据缓存。单位:秒。 |
对于Iceberg Catalog,从v3.1.4版本开始,StarRocks支持检测分区级别的数据更改,当前只支持Iceberg V1表。
启用External Catalog物化视图的查询改写
由于不保证数据的强一致性,StarRocks默认禁用Hudi和JDBC Catalog物化视图的查询改写功能。您可以通过在创建物化视图时将Propertyforce_external_table_query_rewrite
设置为true
来启用此功能。对于基于Hive Catalog中的表创建的物化视图,查询改写功能默认开启。在涉及查询改写的情况下,如果您使用非常复杂的查询语句来构建物化视图,我们建议您拆分查询语句并以嵌套方式构建多个简单的物化视图。嵌套的物化视图更加灵活,可以适应更广泛的查询模式。
示例如下。
CREATE MATERIALIZED VIEW ex_mv_par_tbl
PARTITION BY emp_date
DISTRIBUTED BY hash(empid)
PROPERTIES (
"force_external_table_query_rewrite" = "true"
)
AS
SELECT empid, deptno, emp_date
FROM `hudi_catalog`.`emp_db`.`emps_par_tbl`
WHERE empid < 5;
最佳实践
在实际业务场景中,您可以通过分析Audit Log或大查询日志来识别执行较慢、资源消耗较高的查询。您还可以使用Query Profile
来精确定位查询缓慢的特定阶段。以下为通过物化视图提高数据湖查询性能的说明和示例。
案例一:加速数据湖中的Join计算
您可以使用物化视图来加速数据湖中的Join查询。
假设以下Hive catalog上的查询为慢查询。
--Q1
SELECT SUM(lo_extendedprice * lo_discount) AS REVENUE
FROM hive.ssb_1g_csv.lineorder, hive.ssb_1g_csv.dates
WHERE
lo_orderdate = d_datekey
AND d_year = 1993
AND lo_discount BETWEEN 1 AND 3
AND lo_quantity < 25;
--Q2
SELECT SUM(lo_extendedprice * lo_discount) AS REVENUE
FROM hive.ssb_1g_csv.lineorder, hive.ssb_1g_csv.dates
WHERE
lo_orderdate = d_datekey
AND d_yearmonth = 'Jan1994'
AND lo_discount BETWEEN 4 AND 6
AND lo_quantity BETWEEN 26 AND 35;
--Q3
SELECT SUM(lo_revenue), d_year, p_brand
FROM hive.ssb_1g_csv.lineorder, hive.ssb_1g_csv.dates, hive.ssb_1g_csv.part, hive.ssb_1g_csv.supplier
WHERE
lo_orderdate = d_datekey
AND lo_partkey = p_partkey
AND lo_suppkey = s_suppkey
AND p_brand BETWEEN 'MFGR#2221' AND 'MFGR#2228'
AND s_region = 'ASIA'
GROUP BY d_year, p_brand
ORDER BY d_year, p_brand;
通过分析查询概要,您可能会注意到查询执行时间主要花费在表lineorder
与其他维度表在列lo_orderdate
上的Hash Join上。
此处,Q1和Q2在Joinlineorder
和dates
后执行聚合,而Q3在Joinlineorder
、dates
、part
和supplier
后执行聚合。
因此,您可以利用StarRocks的View Delta Join改写能力来构建物化视图,对lineorder
、dates
、part
和supplier
进行Join。
CREATE MATERIALIZED VIEW lineorder_flat_mv
DISTRIBUTED BY HASH(LO_ORDERDATE, LO_ORDERKEY) BUCKETS 48
PARTITION BY LO_ORDERDATE
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
PROPERTIES (
-- 指定唯一约束。
"unique_constraints" = "
hive.ssb_1g_csv.supplier.s_suppkey;
hive.ssb_1g_csv.part.p_partkey;
hive.ssb_1g_csv.dates.d_datekey",
-- 指定外键约束。
"foreign_key_constraints" = "
hive.ssb_1g_csv.lineorder(lo_partkey) REFERENCES hive.ssb_1g_csv.part(p_partkey);
hive.ssb_1g_csv.lineorder(lo_suppkey) REFERENCES hive.ssb_1g_csv.supplier(s_suppkey);
hive.ssb_1g_csv.lineorder(lo_orderdate) REFERENCES hive.ssb_1g_csv.dates(d_datekey)",
-- 启用查询改写。
"force_external_table_query_rewrite" = "TRUE"
)
AS SELECT
l.LO_ORDERDATE AS LO_ORDERDATE,
l.LO_ORDERKEY AS LO_ORDERKEY,
l.LO_PARTKEY AS LO_PARTKEY,
l.LO_SUPPKEY AS LO_SUPPKEY,
l.LO_QUANTITY AS LO_QUANTITY,
l.LO_EXTENDEDPRICE AS LO_EXTENDEDPRICE,
l.LO_DISCOUNT AS LO_DISCOUNT,
l.LO_REVENUE AS LO_REVENUE,
s.S_REGION AS S_REGION,
p.P_BRAND AS P_BRAND,
d.D_YEAR AS D_YEAR,
d.D_YEARMONTH AS D_YEARMONTH
FROM hive.ssb_1g_csv.lineorder AS l
INNER JOIN hive.ssb_1g_csv.supplier AS s ON s.S_SUPPKEY = l.LO_SUPPKEY
INNER JOIN hive.ssb_1g_csv.part AS p ON p.P_PARTKEY = l.LO_PARTKEY
INNER JOIN hive.ssb_1g_csv.dates AS d ON l.LO_ORDERDATE = d.D_DATEKEY;
案例二:加速数据湖中的聚合和Join后聚合计算
物化视图可用于加速聚合查询,无论是在单个表上还是涉及多个表。使用OSS Foreign Table进行数据湖分析
单表聚合查询
对于典型的单表查询,如果Query Profile显示AGGREGATE节点消耗了大量时间,您可以使用常见的聚合算子构建物化视图。假设以下查询速度较慢。
--Q4 SELECT lo_orderdate, count(distinct lo_orderkey) FROM hive.ssb_1g_csv.lineorder GROUP BY lo_orderdate ORDER BY lo_orderdate limit 100;
Q4是一个计算每日去重订单数量的查询,因为count distinct的消耗较大,可以创建下列两类物化视图:
CREATE MATERIALIZED VIEW mv_2_1 DISTRIBUTED BY HASH(lo_orderdate) PARTITION BY LO_ORDERDATE REFRESH ASYNC EVERY(INTERVAL 1 DAY) AS SELECT lo_orderdate, count(distinct lo_orderkey) FROM hive.ssb_1g_csv.lineorder GROUP BY lo_orderdate; CREATE MATERIALIZED VIEW mv_2_2 DISTRIBUTED BY HASH(lo_orderdate) PARTITION BY LO_ORDERDATE REFRESH ASYNC EVERY(INTERVAL 1 DAY) AS SELECT -- lo_orderkey必须是BIGINT类型,以便可以用于查询改写。 lo_orderdate, bitmap_union(to_bitmap(lo_orderkey)) FROM hive.ssb_1g_csv.lineorder GROUP BY lo_orderdate;
说明此处不要创建带有LIMIT和ORDER BY子句的物化视图,以避免改写失败。
多表聚合查询
在涉及Join结果聚合的场景中,您可以在现有Join多表的物化视图上创建嵌套物化视图,进一步聚合连接结果。例如,根据案例一中的示例,您可以创建以下物化视图以加速Q1和Q2,因为它们的聚合模式相似。
CREATE MATERIALIZED VIEW mv_2_3 DISTRIBUTED BY HASH(lo_orderdate) PARTITION BY LO_ORDERDATE REFRESH ASYNC EVERY(INTERVAL 1 DAY) AS SELECT lo_orderdate, lo_discount, lo_quantity, d_year, d_yearmonth, SUM(lo_extendedprice * lo_discount) AS REVENUE FROM lineorder_flat_mv GROUP BY lo_orderdate, lo_discount, lo_quantity, d_year, d_yearmonth;
当然,您也可以在单个物化视图中同时执行Join和聚合计算。尽管这类的物化视图改写查询的机会更少(因为涉及的计算更加具体),但在聚合后,其占用存储空间更少。您可以基于您的真实场景选择。
CREATE MATERIALIZED VIEW mv_2_4 DISTRIBUTED BY HASH(lo_orderdate) PARTITION BY LO_ORDERDATE REFRESH ASYNC EVERY(INTERVAL 1 DAY) PROPERTIES ( "force_external_table_query_rewrite" = "TRUE" ) AS SELECT lo_orderdate, lo_discount, lo_quantity, d_year, d_yearmonth, SUM(lo_extendedprice * lo_discount) AS REVENUE FROM hive.ssb_1g_csv.lineorder, hive.ssb_1g_csv.dates WHERE lo_orderdate = d_datekey GROUP BY lo_orderdate, lo_discount, lo_quantity, d_year, d_yearmonth;
案例三:加速数据湖中的聚合后Join计算
在某些情况下,您可能需要首先对一个表执行聚合计算,然后再与其他表执行Join查询。为了充分利用StarRocks的查询改写功能,我们建议您构建嵌套的物化视图。示例如下。
--Q5
SELECT * FROM (
SELECT
l.lo_orderkey, l.lo_orderdate, c.c_custkey, c_region, sum(l.lo_revenue)
FROM
hive.ssb_1g_csv.lineorder l
INNER JOIN (
SELECT distinct c_custkey, c_region
from
hive.ssb_1g_csv.customer
WHERE
c_region IN ('ASIA', 'AMERICA')
) c ON l.lo_custkey = c.c_custkey
GROUP BY l.lo_orderkey, l.lo_orderdate, c.c_custkey, c_region
) c1
WHERE
lo_orderdate = '19970503'
Q5首先在customer
表上执行聚合,然后在lineorder
表上执行Join和聚合。类似的查询可能涉及对c_region
和lo_orderdate
的不同过滤条件。为了利用查询改写功能,您可以创建两个物化视图,一个用于聚合,另一个用于连接。
--mv_3_1
CREATE MATERIALIZED VIEW mv_3_1
DISTRIBUTED BY HASH(c_custkey)
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
PROPERTIES (
"force_external_table_query_rewrite" = "TRUE"
)
AS
SELECT distinct c_custkey, c_region from hive.ssb_1g_csv.customer;
--mv_3_2
CREATE MATERIALIZED VIEW mv_3_2
DISTRIBUTED BY HASH(lo_orderdate)
PARTITION BY LO_ORDERDATE
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
PROPERTIES (
"force_external_table_query_rewrite" = "TRUE"
)
AS
SELECT l.lo_orderdate, l.lo_orderkey, mv.c_custkey, mv.c_region, sum(l.lo_revenue)
FROM hive.ssb_1g_csv.lineorder l
INNER JOIN mv_3_1 mv
ON l.lo_custkey = mv.c_custkey
GROUP BY l.lo_orderkey, l.lo_orderdate, mv.c_custkey, mv.c_region;
案例四:对实时数据和数据湖中的历史数据进行冷热分离
例如以下情景:过去三天内的新数据直接写入StarRocks,三天前的旧数据经过校对后批量写入Hive。但是查询仍然可能有涉及过去七天数据。在这种情况下,您可以使用物化视图创建一个简单的模型来自动过期数据。
CREATE MATERIALIZED VIEW mv_4_1
DISTRIBUTED BY HASH(lo_orderdate)
PARTITION BY LO_ORDERDATE
REFRESH ASYNC EVERY(INTERVAL 1 DAY)
AS
SELECT lo_orderkey, lo_orderdate, lo_revenue
FROM hive.ssb_1g_csv.lineorder
WHERE lo_orderdate<=current_date()
AND lo_orderdate>=date_add(current_date(), INTERVAL -4 DAY);
您可以根据上层业务逻辑进一步构建视图或物化视图,以封装计算。