业务数据随着时间在不断变化,如果您要对数据进行分析,则需要考虑如何存储和管理数据。其中数据中随着时间变化的维度被称为Slowly Changing Dimension(SCD)。E-MapReduce根据实际的数仓场景定义了基于固定粒度的缓慢变化维(G-SCD)。本文为您介绍G-SCD的具体解决方案及如何通过G-SCD处理维度的数据。
背景信息
SCD简介
Slowly Changing Dimension(SCD)即缓慢变化维,是随着时间变化的维度。在数据仓库中存储和管理当前和历史的数据,就需要考虑如何处理缓慢变化维,因此SCD被认为是跟踪维度变化的关键ETL任务之一。
类型 | 描述 |
---|---|
直接覆盖(Type 1) | 直接覆盖原值,不保留历史记录。该方式无法分析历史变化的信息。 |
添加维度行(Type 2) | 保留所有历史值。当属性值有变化时,都会新增一条记录,并且需要标记当前记录有效,同时修改前一个有效记录的有效性字段。通常可以通过起始时间、截止时间标识记录的有效性。 |
添加属性列(Type 3) | 通过额外的字段仅保留前一个版本的值。针对需要分析历史信息的属性添加一列,记录该属性变化前的值,而本属性字段则记录最新的值。 |
G-SCD概念和解决方案
SCD处理维度新值的三种方式不能覆盖业务的实际场景,所以E-MapReduce根据业务实际数仓场景提出了G-SCD(Based-Granularity Slowly Changing Dimension),即基于固定粒度(或者业务快照)的缓慢变化维。G-SCD按照固定的时间粒度生成一份业务快照数据,其中时间粒度可以是天、小时或者分钟等,同时支持按照时间粒度查询对应时间段的数据。
解决方案 | 存在的问题 |
---|---|
流式构建T+1时刻的增量数据表,和离线表的T时刻分区数据做合并,生成离线表T+1分区。 | 存储资源浪费。 |
保存离线的基础表,每个业务时刻的增量数据独立保存,在查询数据时合并基础表和增量表。 | 查询性能差。 |
方案 | 相同点 | 不同点 |
---|---|---|
SCD的Type 2方案 | 保留历史所有信息。 | 每次属性值有变化,都会新增一条记录。 |
G-SCD on Delta Lake方案 | GSCD on Delta Lake在具体实现上不是通过新增记录的形式保留信息,而是借助Delta Lake本身的Versioning特性,通过Time-Travel的能力追溯具体的快照数据。 |
- 流批一体:不需要增量表和基础表两张表。
- 存储资源节省:不需要按时间粒度保留历史全量数据。
- 查询性能高:借助Delta Lake的Optimize、Zorder和DataSkipping的能力,提升查询性能。
- SQL使用兼容性高:保留原来实现的SQL语句,和利用分区实现快照的方式一样,可以使用类似的分区字段查询对应时间粒度内的快照数据。
前提条件
已创建集群,详情请参见创建集群。
使用限制
- 需要保证Kafka内同一个Partition内的数据严格有序。
- 数据按Key分区,保证同一Key必须落到同一个Kafka的Partition。
操作流程
- 步骤一:创建G-SCD表创建G-SCD表,按照要求配置需要的参数。
- 步骤二:处理数据您可以根据业务数据的情况,选择使用流式写入或者批量写入的方式进行数据的处理。示例中通过两次批量写入代替流式写入的方式模拟G-SCD on Delta Lake的数据处理。
- 步骤三:验证数据写入结果通过查询语句,验证数据是否写入成功。
步骤一:创建G-SCD表
CREATE TABLE target (id Int, body String, dt string)
USING delta
TBLPROPERTIES (
"delta.gscdTypeTable" = "true",
"delta.gscdGranularity" = "1 day",
"delta.gscdColumnFormat" = "yyyy-MM-dd",
"delta.gscdColumn" = "dt"
);
参数 | 说明 |
---|---|
delta.gscdTypeTable | 定义当前表是否为G-SCD Delta Lake表,本文示例需要设置为true。当该值设置为false时则表示该表为普通表,无法使用G-SCD的相关功能。 |
delta.gscdGranularity | 业务快照粒度,例如:1 day、1 hour、30 minutes等。 |
delta.gscdColumnFormat | 业务快照粒度的格式,支持格式如下:
|
delta.gscdColumn | 定义查询时,表示业务快照版本的字段。当前字段也需要在Schema内定义,并且必须为String类型。 |
步骤二:处理数据
您可以根据业务数据的情况,选择使用流式写入或者批量写入的方式进行数据的处理。
- 流式写入
CREATE TABLE IF NOT EXISTS gscd_kafka_table USING kafka OPTIONS( kafka.bootstrap.servers = 'localhost:9092', subscribe = 'xxxxxx' ); CREATE SCAN gscd_stream ON gscd_kafka_table USING STREAM OPTIONS ( `watermark.time` = 'floor(ts/1000)' --- 定义源头watermark时间表达式,单位为秒。 ); CREATE STREAM delta_job OPTIONS ( triggerType = 'ProcessingTime', checkpointLocation = '/path/to/checkpoint' ) MERGE INTO gscd_target_table AS target USING ( SELECT *, from_unixtime(ts/1000, 'yyyy-MM-dd') AS dt FROM gscd_stream ) AS source ON source.id = target.id AND target.dt = source.dt WHEN MATCHED THEN update set * WHEN NOT MATCHED THEN insert *;
重要 在上述SQL语句中,watermark的使用原理及注意事项如下:- 为了在流作业中自动触发Savepoint,需要在
CREATE SCAN
语句中指定watermark时间表达式。 - watermark表示流作业源头的时间值,单位为秒。
- watermark时间会生成作为delta.gscdColumn字段的值,当watermark时间达到delta.gscdGranularity边界时(示例中定义的为1 day),会自动触发Savepoint。
- watermark时间要求在同一个Partition内递增有序。
- 为了在流作业中自动触发Savepoint,需要在
- 批量写入
一般场景下,通过流式写入已经可以满足。但当数据异常时,G-SCD on Delta Lake的方案同时提供了回滚Rollback的能力,并可以使用批量离线写入修复数据。修复完成后,执行Savepoint,永久保留当前Version。
MERGE INTO GSCD("2021-01-01") gscd_target_table USING gscd_source_table ON source.id = target.id WHEN MATCHED THEN UPDATE SET body = source.body WHEN NOT MATCHED THEN INSERT(id, body) VALUES(source.id, source.body);
在上述SQL语句中,
GSCD ("2021-01-01")
的语法表示要写入的数据所属的业务粒度值。重要 批量写入不支持同一个作业写入多个业务粒度数据。如果存在这种情况,需要提前进行拆分。
为了帮助您快速使用G-SCD处理维度数据,本文给出详细的示例,具体操作步骤如下。
- 模拟源数据。
CREATE TABLE s1 (id Int, body String) USING delta; CREATE TABLE s2 (id Int, body String) USING delta; INSERT INTO s1 VALUES (1, "addr_1_v1"), (2, "addr_2_v1"), (3, "addr_3_v1"); INSERT INTO s2 VALUES (2, "addr_2_v2"), (4, "addr_1_v1");
- 通过使用两次批量写入,代替流式写入的方式模拟G-SCD on Delta Lake的数据处理。
步骤三:验证数据写入结果
通过查询语句,验证数据是否写入成功。查询在步骤二:处理数据示例中两次批量写入的数据,具体操作如下。
- 执行以下命令,查询第一次批量写入的数据。
select id, body from target where dt = '2021-01-01';
重要- 查询数据时,可以使用正常的SQL语法。
- 查询数据时,必须指定gscdColumn字段作为查询条件,并且必须为
=
表达式,例如dt = '2021-01-01'
。
- 执行以下命令,查询第二次批量写入的数据。
select id, body from target where dt = '2021-01-02';
如果能够查询到写入的数据,则表明数据写入成功。执行上述查询命令后,返回结果如下图所示。