业务数据随着时间在不断变化,如果您要对数据进行分析,则需要考虑如何存储和管理数据。其中数据中随着时间变化的维度被称为Slowly Changing Dimension(SCD)。E-MapReduce根据实际的数仓场景定义了基于固定粒度的缓慢变化维(G-SCD)。本文为您介绍G-SCD的具体解决方案及如何通过G-SCD处理维度的数据。

背景信息

SCD简介

Slowly Changing Dimension(SCD)即缓慢变化维,是随着时间变化的维度。在数据仓库中存储和管理当前和历史的数据,就需要考虑如何处理缓慢变化维,因此SCD被认为是跟踪维度变化的关键ETL任务之一。

根据处理维度新值的方式,SCD被分为以下三种类型。
类型描述
直接覆盖(Type 1)直接覆盖原值,不保留历史记录。该方式无法分析历史变化的信息。
添加维度行(Type 2)保留所有历史值。当属性值有变化时,都会新增一条记录,并且需要标记当前记录有效,同时修改前一个有效记录的有效性字段。通常可以通过起始时间、截止时间标识记录的有效性。
添加属性列(Type 3)通过额外的字段仅保留前一个版本的值。针对需要分析历史信息的属性添加一列,记录该属性变化前的值,而本属性字段则记录最新的值。

G-SCD概念和解决方案

SCD处理维度新值的三种方式不能覆盖业务的实际场景,所以E-MapReduce根据业务实际数仓场景提出了G-SCD(Based-Granularity Slowly Changing Dimension),即基于固定粒度(或者业务快照)的缓慢变化维。G-SCD按照固定的时间粒度生成一份业务快照数据,其中时间粒度可以是天、小时或者分钟等,同时支持按照时间粒度查询对应时间段的数据。

在传统的数仓体系下,基于Hive表的实现有以下两个解决方案可以考虑,但各有弊端。
解决方案存在的问题
流式构建T+1时刻的增量数据表,和离线表的T时刻分区数据做合并,生成离线表T+1分区。存储资源浪费。
保存离线的基础表,每个业务时刻的增量数据独立保存,在查询数据时合并基础表和增量表。查询性能差。
其中按T保留全量数据的解决方案如下图所示。按T保留全量数据
为了解决上述两个解决方案存在的问题,阿里云E-MapReduce团队基于Delta Lake提供了G-SCD的解决方案,即G-SCD on Delta Lake。G-SCD on Delta Lake方案与SCD的Type 2方案类似,两者之间的相同点和不同点如下表所示。
方案相同点不同点
SCD的Type 2方案保留历史所有信息。每次属性值有变化,都会新增一条记录。
G-SCD on Delta Lake方案GSCD on Delta Lake在具体实现上不是通过新增记录的形式保留信息,而是借助Delta Lake本身的Versioning特性,通过Time-Travel的能力追溯具体的快照数据。
G-SCD on Delta Lake方案如下图所示。G-SCD
G-SCD解决方案的优势如下:
  • 流批一体:不需要增量表和基础表两张表。
  • 存储资源节省:不需要按时间粒度保留历史全量数据。
  • 查询性能高:借助Delta Lake的Optimize、Zorder和DataSkipping的能力,提升查询性能。
  • SQL使用兼容性高:保留原来实现的SQL语句,和利用分区实现快照的方式一样,可以使用类似的分区字段查询对应时间粒度内的快照数据。

前提条件

已创建集群,详情请参见创建集群

使用限制

  • 需要保证Kafka内同一个Partition内的数据严格有序。
  • 数据按Key分区,保证同一Key必须落到同一个Kafka的Partition。

操作流程

  1. 步骤一:创建G-SCD表
    创建G-SCD表,按照要求配置需要的参数。
  2. 步骤二:处理数据
    您可以根据业务数据的情况,选择使用流式写入或者批量写入的方式进行数据的处理。示例中通过两次批量写入代替流式写入的方式模拟G-SCD on Delta Lake的数据处理。
  3. 步骤三:验证数据写入结果
    通过查询语句,验证数据是否写入成功。

步骤一:创建G-SCD表

创建G-SCD表的示例如下,该表会在步骤二:处理数据使用。
CREATE TABLE target (id Int, body String, dt string)
USING delta
TBLPROPERTIES (
  "delta.gscdTypeTable" = "true", 
  "delta.gscdGranularity" = "1 day",
  "delta.gscdColumnFormat" = "yyyy-MM-dd",
  "delta.gscdColumn" = "dt"
);
参数说明如下表所示。
参数说明
delta.gscdTypeTable定义当前表是否为G-SCD Delta Lake表,本文示例需要设置为true。当该值设置为false时则表示该表为普通表,无法使用G-SCD的相关功能。
delta.gscdGranularity业务快照粒度,例如:1 day、1 hour、30 minutes等。
delta.gscdColumnFormat业务快照粒度的格式,支持格式如下:
  • yyyyMM
  • yyyyMMdd
  • yyyyMMddHH
  • yyyyMMddHHmm
  • yyyy-MM
  • yyyy-MM-dd
  • yyyy-MM-dd HH
  • yyyy-MM-dd HH:mm
delta.gscdColumn定义查询时,表示业务快照版本的字段。当前字段也需要在Schema内定义,并且必须为String类型。

步骤二:处理数据

您可以根据业务数据的情况,选择使用流式写入或者批量写入的方式进行数据的处理。

  • 流式写入
    CREATE TABLE IF NOT EXISTS gscd_kafka_table
    USING kafka
    OPTIONS(
      kafka.bootstrap.servers = 'localhost:9092',
      subscribe = 'xxxxxx'
    );
    
    CREATE SCAN gscd_stream ON gscd_kafka_table USING STREAM
    OPTIONS (
      `watermark.time` = 'floor(ts/1000)'  --- 定义源头watermark时间表达式,单位为秒。
    );
    
    CREATE STREAM delta_job
    OPTIONS (
      triggerType = 'ProcessingTime',
      checkpointLocation = '/path/to/checkpoint'
    )
    MERGE INTO gscd_target_table AS target
    USING (
      SELECT *, from_unixtime(ts/1000, 'yyyy-MM-dd') AS dt FROM gscd_stream
    ) AS source
    ON source.id = target.id AND target.dt = source.dt
    WHEN MATCHED THEN update set *
    WHEN NOT MATCHED THEN insert *;
    重要 在上述SQL语句中,watermark的使用原理及注意事项如下:
    • 为了在流作业中自动触发Savepoint,需要在CREATE SCAN语句中指定watermark时间表达式。
    • watermark表示流作业源头的时间值,单位为秒。
    • watermark时间会生成作为delta.gscdColumn字段的值,当watermark时间达到delta.gscdGranularity边界时(示例中定义的为1 day),会自动触发Savepoint。
    • watermark时间要求在同一个Partition内递增有序。
  • 批量写入

    一般场景下,通过流式写入已经可以满足。但当数据异常时,G-SCD on Delta Lake的方案同时提供了回滚Rollback的能力,并可以使用批量离线写入修复数据。修复完成后,执行Savepoint,永久保留当前Version。

    MERGE INTO GSCD("2021-01-01") gscd_target_table
    USING gscd_source_table
    ON source.id = target.id  
    WHEN MATCHED THEN UPDATE SET body = source.body
    WHEN NOT MATCHED THEN INSERT(id, body) VALUES(source.id, source.body);

    在上述SQL语句中,GSCD ("2021-01-01")的语法表示要写入的数据所属的业务粒度值。

    重要 批量写入不支持同一个作业写入多个业务粒度数据。如果存在这种情况,需要提前进行拆分。

为了帮助您快速使用G-SCD处理维度数据,本文给出详细的示例,具体操作步骤如下。

  1. 模拟源数据。
    CREATE TABLE s1 (id Int, body String) USING delta;
    CREATE TABLE s2 (id Int, body String) USING delta;
    INSERT INTO s1 VALUES (1, "addr_1_v1"), (2, "addr_2_v1"), (3, "addr_3_v1");
    INSERT INTO s2 VALUES (2, "addr_2_v2"), (4, "addr_1_v1");
  2. 通过使用两次批量写入,代替流式写入的方式模拟G-SCD on Delta Lake的数据处理。
    1. 第一次批量写入,然后创建对应时间粒度的Savepoint。
      -- 第一次批量写入。
      MERGE INTO GSCD ("2021-01-01") target as target
      USING s1 as source
      ON source.id = target.id  
      WHEN MATCHED THEN UPDATE SET body = source.body
      WHEN NOT MATCHED THEN INSERT(id, body) VALUES(source.id, source.body);
      
      -- 创建2021-01-01的Savepoint。
      CREATE SAVEPOINT target GSCD('2021-01-01');
    2. 第二次批量写入,然后创建对应时间粒度的Savepoint。
      -- 第二次批量写入。
      MERGE INTO GSCD ("2021-01-02") target as target
      USING s2 as source
      ON source.id = target.id  
      WHEN MATCHED THEN UPDATE SET body = source.body
      WHEN NOT MATCHED THEN INSERT(id, body) VALUES(source.id, source.body);
      
      -- 创建2021-01-02的Savepoint。
      CREATE SAVEPOINT target GSCD('2021-01-02');

步骤三:验证数据写入结果

通过查询语句,验证数据是否写入成功。查询在步骤二:处理数据示例中两次批量写入的数据,具体操作如下。

  1. 执行以下命令,查询第一次批量写入的数据。
    select id, body from target where dt = '2021-01-01';
    重要
    • 查询数据时,可以使用正常的SQL语法。
    • 查询数据时,必须指定gscdColumn字段作为查询条件,并且必须为=表达式,例如dt = '2021-01-01'
  2. 执行以下命令,查询第二次批量写入的数据。
    select id, body from target where dt = '2021-01-02';
    如果能够查询到写入的数据,则表明数据写入成功。执行上述查询命令后,返回结果如下图所示。返回结果