基于MaxCompute实现拉链表

拉链表是数据仓库设计中用来处理数据变化的一种技术,它允许保存历史数据,记录一个事物从开始到当前状态的所有变化信息,可以反映任意时间点数据的状态。本文将为您介绍基于MaxCompute引擎在DataWorks上实现拉链表ETL的案例。

前提条件

  1. 已创建DataWorks工作空间,详情请参见创建工作空间

  2. 已创建MaxCompute数据源并绑定至工作空间,详情请参见创建MaxCompute数据源并绑定至工作空间

注意事项

本案例通过命令建表,在实际开发中,您可通过DataWorks可视化功能创建,详情请参见创建并使用MaxCompute表

说明

本案例中的数据开发部分任务,也可以通过ETL工作流模板一键导入。在导入模板后,您可以前往目标工作空间,并自行完成任务运维等后续操作。

适用场景

在设计数据仓库的数据模型时,拉链存储技术可作为一种解决方案,满足以下需求:

  • 数据量较大。

  • 表中的部分字段被更新。

    例如,用户的地址、产品的描述信息、订单的状态和手机号码等。

  • 需要查看某一个时间点或时间段的历史快照信息。

    例如,查看某一个订单在某一个历史时间点的状态,或查看某一个用户在过去某段时间内更新过几次等。

  • 变化的比例不大或频率不高。

    假设总共有1000万个会员,且每天新增和发生变化的会员只有10万左右,如果每天都在表中保留一份全量,那么每次全量中会保存很多不变的信息,极大地浪费了存储资源。

关键字段介绍

拉链表通常包含以下几个关键字段:

字段

描述

主键

唯一标识每一行数据。

业务主键

唯一标识实体的主键。例如,会员ID等。

属性字段

需要追踪变化的实体属性。例如,会员昵称、会员手机号等。

有效开始日期

数据版本的有效开始时间。

有效结束日期

数据版本的有效结束时间。通常用一个极大值“9999-12-31”表示当前数据有效。

状态标识符

标识是否为最新版本。

版本

标识相同业务主键的不同历史版本号。

说明

本文主要展示拉链表的实现案例,关于设计部分不在这里过多展开。

案例介绍

  • 数据表设计,本案例共设计2张数据表。

    • 交易下单源表:ods_order_d,用于存放业务库同步过来的每日增量数据。包含如下两个字段:

      • id:业务主键订单ID。

      • status:追踪变化的订单状态。

    • 交易下单事实表(拉链表):dwd_order,用于拉链存储全量有效和失效的数据。包含如下四个字段:

      • id:业务主键订单ID。

      • status:追踪变化的订单状态。

      • start_date:有效开始日期。

      • end_date:有效结束时间。

  • 拉链表实现逻辑。

    • 本案例将使用拉链表记录电商订单从开始到当前状态(创建/支付/完成)的所有变化信息,拉链表的加载逻辑详情请参见:拉链表数据加载逻辑

任务开发:准备工作

  1. 进入数据开发页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与治理 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

  2. 创建业务流程。

    1. 鼠标悬停至新建图标,单击新建业务流程

    2. 新建业务流程对话框中,输入业务名称描述。本案例业务流程名为体验案例_拉链表的实现

    3. 单击新建

  3. 新建节点。

    在左侧目录树中,双击步骤2中创建的业务流程名称,进入业务流程面板,并通过拖拽组件以及拉线的方式在业务流程画布中编排整个流程。

    本案例中会使用到两个类型节点:虚拟节点开发ODPS SQL任务

    • 虚拟节点将作为整个拉链表实现的起始节点,用于管理整个业务流程;

    • ODPS SQL节点用于执行SQL任务,通过使用SQL语法实现拉链表数据加载。

  4. 任务流程图预览。

    本案例预计将创建以下业务流程。其中,为便于快速辨别每一个任务加工逻辑,本案例使用节点组功能对节点进行分组,节点代码请参照后续步骤。

    image

任务开发:增量表数据准备

  • 在数据准备环节,需要创建两个类型节点:

    • 创建虚拟节点:拉链表实现说明,用于流程管理和介绍的作用。

    • 创建ODPS SQL节点:ods_order_di,用于产出交易下单源表数据。

  • 节点间的依赖关系,如下图所示:

    image

ods_order_di

  • 表数据:交易下单源表,包含业务主键订单ID和需要追踪变化的订单状态status字段,用于存放每日新增交易下单数据。

  • ods_order_di节点编辑页面,需要将以下ods_order.csv测试数据,存储在ods_order_di交易下单源表中。

    • ods_order.csv 测试数据

      id,gmt_create,gmt_modified,status,pt
      210001,2023-10-04,2023-10-04,创建,2023-10-04
      210002,2023-10-04,2023-10-04,创建,2023-10-04
      210001,2023-10-04,2023-10-05,支付,2023-10-05
      210003,2023-10-05,2023-10-05,创建,2023-10-05
      210004,2023-10-05,2023-10-05,创建,2023-10-05
      210001,2023-10-04,2023-10-06,完成,2023-10-06
      210002,2023-10-04,2023-10-06,支付,2023-10-06
      210004,2023-10-05,2023-10-06,支付,2023-10-06
      210005,2023-10-06,2023-10-06,创建,2023-10-06
    • 在插入测试数据的时候,需要按照对应的分区(pt)进行补充,操作如下:

      -- 补充说明:由于案例测试需要,我们将3天的每日新增数据一次性分别插入分区'2023-10-04、2023-10-05、2023-10-06',
      --         在实际业务场景中,一般每日更新当前业务日期分区,仅需将分区日期设置成调度参数变量即可。
      INSERT OVERWRITE TABLE ods_order_di PARTITION (pt = '2023-10-04') VALUES
              (210001,DATE'2023-10-04',DATE'2023-10-04','创建')
              ,(210002,DATE'2023-10-04',DATE'2023-10-04','创建')
      ;
      
      INSERT OVERWRITE TABLE ods_order_di PARTITION (pt = '2023-10-05') VALUES
              (210001,DATE'2023-10-04',DATE'2023-10-05','支付')
              ,(210003,DATE'2023-10-05',DATE'2023-10-05','创建')
              ,(210004,DATE'2023-10-05',DATE'2023-10-05','创建')
      ;
      
      INSERT OVERWRITE TABLE ods_order_di PARTITION (pt = '2023-10-06') VALUES
              (210001,DATE'2023-10-04',DATE'2023-10-06','完成')
              ,(210002,DATE'2023-10-04',DATE'2023-10-06','支付')
              ,(210004,DATE'2023-10-05',DATE'2023-10-06','支付')
              ,(210005,DATE'2023-10-06',DATE'2023-10-06','创建')
    • 完整代码,如下示例:

      -- 1. 创建ods_order_di订单源表,存放每日新增交易下单数据。
      -- 补充说明:为了便于理解,我们约定案例中的“订单创建时间、修改时间字段”粒度到天级别,且默认一个订单一天仅执行一次操作;
      --         实际业务中一般时间字段粒度更细,并且一天可能有多次操作。
      CREATE TABLE IF NOT EXISTS ods_order_di
      (
          id            BIGINT COMMENT '订单ID'
          ,gmt_create   DATE COMMENT '创建时间,格式为yyyy-mm-dd'
          ,gmt_modified DATE COMMENT '更新时间,格式为yyyy-mm-dd'
          ,`status`     STRING COMMENT '状态'
      )
      COMMENT '交易下单源表'
      PARTITIONED BY 
      (
          pt            STRING COMMENT '日期,格式为yyyy-mm-dd'
      )
      LIFECYCLE 7
      ;
      
      -- 2. 初始化历史交易下单新增数据。
      -- 补充说明:由于案例测试需要,我们将3天的每日新增数据一次性分别插入分区'2023-10-04、2023-10-05、2023-10-06',
      --         在实际业务场景中,一般每日更新当前业务日期分区,仅需将分区日期设置成调度参数变量即可。
      INSERT OVERWRITE TABLE ods_order_di PARTITION (pt = '2023-10-04') VALUES
              (210001,DATE'2023-10-04',DATE'2023-10-04','创建')
              ,(210002,DATE'2023-10-04',DATE'2023-10-04','创建')
      ;
      
      INSERT OVERWRITE TABLE ods_order_di PARTITION (pt = '2023-10-05') VALUES
              (210001,DATE'2023-10-04',DATE'2023-10-05','支付')
              ,(210003,DATE'2023-10-05',DATE'2023-10-05','创建')
              ,(210004,DATE'2023-10-05',DATE'2023-10-05','创建')
      ;
      
      INSERT OVERWRITE TABLE ods_order_di PARTITION (pt = '2023-10-06') VALUES
              (210001,DATE'2023-10-04',DATE'2023-10-06','完成')
              ,(210002,DATE'2023-10-04',DATE'2023-10-06','支付')
              ,(210004,DATE'2023-10-05',DATE'2023-10-06','支付')
              ,(210005,DATE'2023-10-06',DATE'2023-10-06','创建')

任务开发:拉链表的实现

  • 创建ODPS SQL节点:dwd_order,用于产出交易下单事实明细表数据。

  • 节点间的依赖关系,如下图所示:

    image

dwd_order

  • 表数据:交易下单事实明细表(拉链表),用于存放全量有效数据和全量失效数据。包含有效开始日期start_date、有效结束时间end_date、业务主键ID和追踪订单状态变化的status字段。

  • dwd_order节点编辑页面,输入如下示例代码:

    -- 1. 创建交易下单事实明细表,存放全量数据(拉链存储)。
    -- 补充说明:拉链表增加记录生命周期的字段start_date(生效日期)、end_date(失效日期),当end_date为极大值'9999-12-31'
    --         时表示数据有效。
    CREATE TABLE IF NOT EXISTS dwd_order
    (
        ID            BIGINT COMMENT '订单ID'
        ,gmt_create   DATE COMMENT '创建时间,格式为yyyy-mm-dd'
        ,gmt_modified DATE COMMENT '更新时间,格式为yyyy-mm-dd'
        ,`status`     STRING COMMENT '状态'
        ,start_date   DATE COMMENT '生效日期,格式为yyyy-mm-dd'
        ,end_date     DATE COMMENT '失效日期,格式为yyyy-mm-dd,9999-12-31表示有效'
    )
    COMMENT '交易下单事实明细表(拉链表)'
    ;
    
    -- 2. 拉链存储的实现逻辑:业务日期当日ods_order_di新增或更新的所有数据插入dwd_order状态为有效数据、dwd_order中
    -- 有更新过的有效数据修改成失效数据。
    INSERT OVERWRITE TABLE dwd_order
    SELECT  id
            ,gmt_create
            ,gmt_modified
            ,`status`
            ,start_date
            ,end_date
    FROM    (
                SELECT  id
                        ,gmt_create
                        ,gmt_modified
                        ,`status`
                        ,start_date
                        ,end_date
                        -- 2.3 支持重跑:使用开窗函数,按id,gmt_create,gmt_modified,`status`分发,按end_date reducer内降序排序,仅取第一条数据。
                        ,ROW_NUMBER() OVER (DISTRIBUTE BY id,gmt_create,gmt_modified,`status` SORT BY end_date DESC ) AS row_num
                FROM    (
                            -- 2.1 当日失效数据:dwd_order中有被更新过的有效数据修改成失效数据。
                            SELECT  a.id AS id
                                    ,a.gmt_create AS gmt_create
                                    ,a.gmt_modified AS gmt_modified
                                    ,a.`status` AS `status`
                                    ,a.start_date AS start_date
                                    ,CASE   WHEN b.id IS NOT NULL
                                                -- AND CAST(a.end_date AS STRING) > '${biz_date}' THEN DATE'${biz_date}' 这里再考虑下用哪个合适
                                                AND CAST(a.end_date AS STRING) == '9999-12-31' THEN DATE'${biz_date}' 
                                            ELSE a.end_date
                                    END AS end_date
                            -- 2.4 调度依赖补充说明:由于当日dwd_order表数据计算依赖于自身昨日数据产出,所以我们在节点「调度配置」-「跨周期依赖」中配置了依赖「本节点」。 
                            FROM    dwd_order AS a
                            LEFT JOIN   (
                                            SELECT  id
                                                    ,gmt_create
                                                    ,gmt_modified
                                                    ,`status`
                                            FROM    ods_order_di
                                            WHERE   pt == '${biz_date}'
                                        ) b
                            ON      a.id == b.id
                            UNION ALL 
                            -- 2.2 当日新增或更新的有效数据:业务日期当日ods_order_di新增或更新的所有数据插入dwd_order状态为有效数据。
                            SELECT  id
                                    ,gmt_create
                                    ,gmt_modified
                                    ,`status`
                                    ,DATE'${biz_date}' AS start_date
                                    ,DATE'9999-12-31' AS end_date
                            FROM    ods_order_di
                            WHERE   pt == '${biz_date}'
                        ) 
            ) with_row_num_tb
    -- 支持重跑
    WHERE   row_num == 1
    ;
  • 调度配置,核心配置要点如下,其他参数保持默认即可。

    • 调度参数:在调度配置 > 调度参数中,点击加载代码中的参数自动生成的参数配置结果,参数值中输入$[yyyy-mm-dd-1],更多关于调度参数的配置说明,请参见调度参数支持的格式

      说明

      您也可以单击用表达式定义,使用手动输入的方式添加调度参数biz_date=$[yyyy-mm-dd-1]

    • 配置跨周期依赖。由于dwd_order表的当天数据处理依赖于自身前一日的数据产出,为了确保数据处理的正确性和连续性。

      调度配置 > 调度依赖 > 跨周期依赖(原上一周期)中勾选本节点的设置自依赖操作。通过配置自依赖,可以保障只有在上一周期的数据成功处理后,才会进行当前周期的数据加载,从而保障了数据处理流程的顺利进行。

拉链表数据加载逻辑

  1. 业务日期当日dwd_order表中,有被更新过的有效数据“修改”成失效数据;

  2. 业务日期当日ods_order_di表中,新增的所有数据插入dwd_order表中,状态为有效数据。

以业务日期10.5日拉链表数据加载为例,如下图所示,具体步骤如下:

  1. 原始数据:在10.4日的拉链表记录中,存在一条订单ID为“210001”,其状态为“创建”。

  2. 状态变更:到10.5日该订单状态被更新为“支付”。

  3. 数据对比逻辑:

    1. 将10.4日拉链表历史全量有效数据和增量表2023-10-05分区新增的数据关联查询。

    2. 更新状态判断:如果ID为“210001”的订单在10.4日拉链表中存在,且end_date为“9999-12-31”,说明该记录有效,同时该订单出现在增量表“2023-10-05”的分区中,则说明状态已更新。

  4. 更新处理:

    1. 需要将原拉链表中记录end_date:“9999-12-31”“修改”为当前业务日期“2023-10-05”。

    2. 如果该订单ID未出现在增量表“2023-10-05”的分区中,则说明无更新,维持原记录不变。

  5. 数据覆盖:

    完成上述步骤后,将拉链表中处理好的历史数据与增量表中新增的所有数据,一同覆盖写回拉链表中,最终产出10.5日拉链表数据。

yuque_diagram.jpg

任务开发:拉链表的使用

  • 本阶段需要创建一个ODPS SQL节点,依赖上述所有任务:

    • 拉链表查询节点。

  • 节点间的依赖关系,如图所示:

    image

  • 表定义:通过上述步骤已经完成了拉链表数据加载,通过创建查询节点,编写SQL查询语句,实现拉链存储结果的查询。

  • 拉链表查询节点编辑页面,输入如下示例代码:

    -- 1. 拉链表使用
    -- 1.1 我们仅需查询dwd_order表中end_date=9999-12-31的记录就可以得到全量最新状态的有效数据。
    SELECT  id
            ,gmt_create
            ,gmt_modified
            ,`status`
            ,start_date
            ,end_date
    FROM    dwd_order
    WHERE   end_date == DATE'9999-12-31'
    ORDER BY id DESC
    LIMIT   100
    ;
    
    -- 1.2 我们仅需查询dwd_order表中指定的订单id并按end_date排序的记录就可以得到该订单所有的历史快照。
    SELECT  id
            ,gmt_create
            ,gmt_modified
            ,`status`
            ,start_date
            ,end_date
    FROM    dwd_order
    WHERE   id == 210001
    ORDER BY end_date DESC
    LIMIT   100
    ;
    
    -- 1.3 查询dwd_order表中全量数据,并按订单id,end_date降序排序。
    SELECT  id
            ,gmt_create
            ,gmt_modified
            ,`status`
            ,start_date
            ,end_date
    FROM    dwd_order
    ORDER BY id,end_date DESC
    LIMIT   100
    ;

运行业务流程

由于上述的案例测试中,我们将3天的每日新增数据一次性分别插入指定分区"2023-10-04、2023-10-05、2023-10-06",所以需要在生产环境执行补数据操作,实现业务数据回刷操作。运维中心补数据操作如下。

  1. 提交任务。

    1. 单击工具栏中的image图表。

    2. 提交对话框中,选中需要提交的节点,输入变更描述

    3. 单击确认,在对话框中出现操作完成,说明任务提交成功。

      如果您使用的是标准模式的工作空间,提交成功后,请单击右上方的发布。具体操作请参见发布任务

  2. 进入运维中心。

    任务发布成功后,单击右上角的运维中心

    您也可以进入业务流程(体验案例_拉链表的实现)的编辑页面,单击工具栏中的前往运维,进入运维中心页面。

  3. 周期任务执行补数据操作。

    1. 在左侧导航栏,单击周期任务运维 > 周期任务,进入周期任务页面,单击业务流程(体验案例_拉链表的实现)的起始根节点拉链表实现说明

    2. 右键单击拉链表实现说明节点,选择补数据 > 当前节点及下游节点

      image

    3. 补数据对话框中,选择业务日期2023-10-04~2023-10-06,在选择需要补数据的节点中勾选所有节点,单击确定,自动跳转至补数据实例页面。

    4. 单击刷新,直至SQL任务全部运行成功即可。

说明

案例完成后,为了避免后续持续产生费用,您可以选择设置节点的调度有效期或者冻结业务流程根节点(虚拟节点拉链表实现说明)。

结果查询

运维中心运行完所有SQL任务后,您可以在日志中查看运行结果:

  1. 补数据示例页面的实例列表中,单击拉链表查询节点操作列的DAG图,点击查看日志

  2. 进入拉链表查询的运行日志详情页面,在运行详情 > 任务执行 > 日志中,可以查看到运行的结果。

    image

说明

若在运维中心运行完所有SQL任务,您也可以回到数据开发(DataStudio)通过单独运行拉链表查询节点,运行查看拉链存储结果。

image

附录:使用ETL工作流模板

DataWorks ETL工作流模板已内置该案例,您直接导入本案例相关代码,具体操作如下:

  1. 登录DataWorks控制台,点击左侧导航栏的大数据体验 > ETL工作流模板,进入ETL工作流模板页面。

  2. ETL工作流模板页面选择拉链表实现业务流程,单击查看详情进入模板页面,点击载入模板

  3. 载入模板对话框中,选择对应的工作空间,并在MaxCompute配置中选择在数据源名称下拉列表中选择数据源,点击确认载入模板。

资源释放

若您在案例测试完成之后,想要清理与释放当前案例生成的资源,您可参考以下文档处理:

相关文档