数据质量Spec配置说明

更新时间: 2025-08-13 10:56:55

该Spec是DataWorks数据质量产品中数据质量监控、数据质量规则资源的描述规范,使用JSON格式书写。目前可以作为OpenAPI的入参,操作DataWorks数据质量产品中的相关资源。

基本定义

JSON示例

一个简单的数据质量监控定义如下:

{
  "datasets": [
    {
      "type": "Table",
      "tables": [
        "tb_d_spec_demo"
      ],
      "dataSource": {
        "name": "odps_first", 
        "envType": "Dev"
      }
    }
  ],
  "rules": [
    {
      "assertion": "row_count > 0"
    }
  ]
}

属性说明

上述示例的Spec定义了一个最简单的数据质量监控:

  • datasets:数据质量监控对象,包含3个属性

    • type:监控对象类型,目前只支持一种枚举Table

    • dataSource:监控对象所属的数据源,设置nameenvType来标识数据源,您可以通过ListDataSources获取数据源名称。

      说明

      当前仅支持部分数据源,详情请参见支持的数据源类型列表

    • tables:如果监控对象为Table类型,需配置表名。

      说明

      如果数据源绑定到了数据库级别,同时需要监控非默认Schema的表,table需要按照schema.table的格式配置。

  • rules:期望数据满足的规则,一个数据质量监控可以包含多条规则。

    数据质量监控在执行时,会扫描监控对象中的数据,依次计算出每一个规则所指定的指标值,并与期望的阈值做比较,通过校验是否满足期望阈值来判断该条规则是否通过。

    一般情况下,对数据的期望使用一个包含指标类型(如row_count)、比较符(如>)、阈值(如0)的assertion语句来描述,校验结果通常有3种。

    • pass:通过,采集的指标值落在阈值定义的区间内。

    • fail:不通过,采集的指标值落在阈值定义的区间外。

    • error:校验过程中出现其他异常,比如语法错误等。

    说明

阈值定义方式

assertion中支持的阈值定义方式如下:

固定阈值

适用于需要将指标值与一个固定值直接比较的场景,固定阈值定义包含如下部分:

  • a metric

  • an argument (optional)

  • a comparison symbol(optional)

  • a threshold(optional)

配置示例

{
  "rules": [
    {
      // 数据行数要大于0
      "assertion": "row_count > 0"
    }, {
      // size字段最大值要小于等于500
      "assertion": "max(size) <= 500"
    }
  ]
}

对应固定值阈值表达式的四个部分如下表所示:

表达式组成部分

row_count > 0

max(size) <= 500

a metric

row_count

max

an argument (optional)

(size)

a comparison symbol(optional)

>

<=

a threshold(optional)

0

500

更多支持的比较符,请参见支持的比较符(comparison symbol)

波动阈值

适用于将当前的指标值,与相同指标的历史值做比较的场景,例如:当天的用户数与前一天用户数的差值需控制在100个以内、当天的收入与最近7天的收入均值相比波动范围需控制在10%以内等。

通常在metric前加change for描述波动值阈值,波动值阈值定义包含如下部分:

  • change(关键字)

  • an aggregate type(optional)

  • a time window(optional)

  • percent(关键字 optional)

  • for(关键字)

  • a metric

  • an argument(optional)

  • a comparison symbol(optional)

  • a threshold(optional)

组合后的表达式格式:change [aggregate_type] [time_window] [percent] for metric [argument] [comparison_symbol threshold]

配置示例

示例一:与最近一次校验做差值比较

{
  "rules": [
    {
      // 本次数据行数与上一次校验时数据行数差值要控制在10000行以内
      "assertion": "change for row_count < 10000"
    }
  ]
}

示例二:与指定时间窗口(time_window)的校验结果做差值比较

您可以在change ... for之间添加时间窗口(time_window)来显式指定本次指标值与历史上哪些指标做对比:

{
  "rules": [
    {
      // 本次数据行数与7天前校验时数据行数差值要控制在10000行以内
      "assertion": "change 7 days ago for row_count < 10000"
    }
  ]
}

示例三:将指定时间窗口的校验结果聚合后做差值比较

您可以在change时间窗口(time_window)之间增加聚合方式(aggregate type),系统会将时间窗口之内的校验记录使用对应聚合方式计算出中间结果后,再作为本次校验的参考值:

{
  "rules": [
    {
      // 本次数据行数与最近7天数据行数均值的差值要控制在10000行以内
      "assertion": "change average last 7 days for row_count < 10000"
    }
  ]
}

目前支持如下两种聚合方式:

  • avg:均值。

  • var:方差。

如果不指定聚合方式,系统会将当前指标值与时间窗口之内的所有校验历史记录依次对比,得出与每一个校验历史记录的对比结果,然后取严重等级最高的状态作为最终校验状态。

示例四:与历史校验结果做波动范围百分比比较

您可以在change ... for之间添加percent来指明计算波动百分比之后再与阈值比较:

{
  "rules": [
    {
      // 本次数据行数与上一次校验时数据行数的相差100%要控制在50%以内
      "assertion": "change percent for row_count < 50%"
    }
  ]
}
  • 阈值结尾可以添加 %,提升可读性。

  • 假设本次指标值是c、上一次是clpercent = (c-cl) / cl

    • 如果cl为0、c也为0,计算出的百分比为0。

    • 如果cl为0、c不为0,则无法计算,校验结果为error。

  • 结果可能是一个负数,定义的时候需要注意,可以用下面提到的between...and...来定义波动范围百分比的区间

区间阈值

可以使用between...and...定义区间类型的阈值。

闭区间阈值定义

直接使用between...and...定义的区间,默认是闭区间。例如,当数据行数在[10, 15]区间时,校验通过。

{
  "datasets": [
    {
      "type": "Table",
      "tables": [
        "tb_d_spec_demo"
      ],
      "filter": "dt='$[yyyymmdd]' AND hh='$[hh24-1/24]'",
      "dataSource": {
        "name": "odps_first", 
        "envType": "Dev"
      }
    }
  ],
  "rules": [
    {
      "assertion": "row_count between 10 and 15"
    }
  ]
}

开区间阈值定义

如果添加了圆括号(),代表被修饰的区间边界是开区间。例如,第一条规则定义的区间为(10, 15]、第二条规则定义的区间为(10, 15)

{
  "datasets": [
    {
      "type": "Table",
      "tables": [
        "tb_d_spec_demo"
      ],
      "filter": "dt='$[yyyymmdd]' AND hh='$[hh24-1/24]'",
      "dataSource": {
        "name": "odps_first", 
        "envType": "Dev"
      }
    }
  ],
  "rules": [
    {
      "assertion": "row_count between (10 and 15"
    }, {
      "assertion": "row_count between (10 and 15)"
    }
  ]
}

显示的闭区间阈值定义

如果区间边界被方括号[]修饰,代表被修饰的区间边界为闭区间。例如,4条规则定义的区间分别为:

  • [10, 15]

  • [10, 15)

  • (10, 15]

  • [10, 15]

{
  "datasets": [
    {
      "type": "Table",
      "tables": [
        "tb_d_spec_demo"
      ],
      "filter": "dt='$[yyyymmdd]' AND hh='$[hh24-1/24]'",
      "dataSource": {
        "name": "odps_first"
      }
    }
  ],
  "rules": [
    {
      "assertion": "row_count between 10 and 15"
    }, {
      "assertion": "row_count between [10 and 15"
    }, {
      "assertion": "row_count between 10 and 15]"
    }, {
      "assertion": "row_count between [10 and 15]"
    }
  ]
}

添加多级阈值定义

除了在规则的assertion语句中定义期望阈值,还可以省去assertion语句中的期望阈值,同时设置规则的warnfail属性,来定义更丰富等级的阈值。例如:

{
  "rules": [
    {
      "assertion": "duplicate_count(phone)",
      "warn": "when between 1 and 10",
      "fail": "when > 10"
    }
  ]
}

上述代码片段中,定义了warnfail两级阈值:

  • phone字段的重复行数为0时校验通过。

  • phone字段的重复行数大于10时校验不通过。

  • phone字段的重复行数在1到10之间时,校验结果为warn

定义多个warn和fail阈值

warnfail阈值,分别可以是一个数组,校验时命中数组中的一个条件,即被认为是命中了这个等级的阈值。

{
  "rules": [
    {
      "assertion": "duplicate_count(phone)",
      "warn": [
        "when > 2",
        "when < 0"
      ]
    }
  ]
}

上述代码片段中,当phone字段的重复值行数大于2或者小于0时,均会被认为命中了warn阈值。

warn和fail有重叠时的处理策略

warnfail的阈值区间可以重叠,如果指标值落到了这段重叠的区间,系统会以等级更严重的状态为准,结果为fail

使用not between...and...定义区间补集

您可以在between...and...前使用not取反,获取区间补集,例如:

{
  "rules": [
    {
      "assertion": "duplicate_count(phone)",
      "warn": "when not between -10 and 10",
      "fail": "when not between -20 and 20"
    }
  ]
}

上述代码片段定义的区间如下图所示:

image

设置规则唯一标识(identify)

您可以为规则指定一个identify作为规则的全局唯一标识。

  • 如果创建规则时,没有指定identify,系统会为该规则自动分配一个id

  • 您需要确保identify的全局唯一性,否则在更新时可能会失败,或者误更新到其他的规则,推荐使用uuid

  • 对于波动类阈值的规则(change...for...)和异常检测(anomaly detection for ...)的规则,在查询校验历史时,会根据规则的identify查询。

  • 系统在接收到更新质量监控请求时,处理逻辑如下:

    • 先遍历更新质量监控请求中传入的每一个规则,使用规则的id匹配质量监控中已经存在的规则并进行更新。

    • 将质量监控中已经存在且未匹配到id的规则删除。

    • 在质量监控中将更新请求中剩余规则创建为新规则。

指定identify的示例如下:

{
  "rules": [
    {
      "assertion": "row_count > 0",
      "name": "数据行数大于0",
      // 指定全局唯一标识
      "identify": "3de219e2-cb3c-4e3c-bff0-15f607c641f2"
    }
  ]
}

定义规则业务严重等级(severity

您可以设置规则的severity,标记规则对自身业务影响的严重程度,方便后续管理。例如:

{
  "rules": [
    {
      "assertion": "row_count > 0",
      "severity": "High"
    }
  ]
}

severity支持2个等级:

  • High

  • Normal(默认值)

设置规则启用状态(enabled)

您可以指定规则的enabled标识,用于管理规则的开启状态,对于临时不使用但又不想删除的规则,可以将enabled置为false,表示暂时关闭。在监控执行时,将不会真正触发该规则。

{
  "rules": [
    {
      "assertion": "row_count > 0",
      // 规则的开启状态,默认为true
      "enabled": false
    }
  ]
}

设置前置语句(settingConfig)

部分业务场景下,在执行SQL计算指标之前,需要执行一些SET语句进行参数调整,以确保指标计算的SQL能够正常执行或者保证性能。您可以在规则中添加settingConfig设置,例如:

{
  "rules": [
    {
      "assertion": "row_count > 0",
      // 设置前置执行的set语句
      "settingConfig": "SET odps.sql.udf.timeout=600s; SET odps.sql.python.version=cp27;"
    }
  ]
}

设置保留问题数据开关(collectFailedRows

您可以在规则级别开启问题数据保留的开关,当规则校验未通过(warnfail状态)时,自动过滤导致规则未通过的数据并保存在与监控对象表相同数据库的另外一张表中。开启问题数据保留开关的方式如下:

{
  "rules": [
    {
      "assertion": "duplicate_count(phone) = 0",
      "collectFailedRows": true
    }
  ]
}

您可以将collectFailedRows设置成true,开启问题数据的保留。对于自定义SQL的规则,需要额外指定failedRowsQuery显式的配置问题数据保留的过滤语句:

{
  "rules": [
    {
      "assertion": "id_null_count = 0",
      "id_null_count": {
        "expression": "id IS NULL"
      },
      "collectFailedRows": true,
      "failedRowsQuery": "SELECT * FROM tb_d_spec_demo WHERE dt = '$[yyyymmdd-1]' AND id IS NULL"
    }
  ]
}

除了自定义SQL的规则,仅以下指标支持保留问题数据,其他指标暂不支持开启collectFailedRows

  • missing_count

  • missing_percent

  • duplicate_count

  • duplicate_percent

  • distinct_count

  • distinct_percent

设置规则名称和描述

您可以为规则设置名字、描述,方便管理。

例如,您可以未监控整体设置一个名称、为每一个规则设置各自的名称,这个名称在后续校验结果、页面显示中都可以被使用,方便管理。

{
  "datasets": [
    {
      "type": "Table",
      "tables": [
        "tb_d_spec_demo"
      ],
      "filter": "dt='$[yyyymmdd]' AND hh='$[hh24-1/24]'",
      "dataSource": {
        "name": "odps_first", 
        "envType": "Dev"
      }
    }
  ],
  "rules": [
    {
      "assertion": "row_count > 0",
      // 规则名称和描述
      "name": "数据行数大于0",
      "description": "产出数据不能为空"
    }
  ]
}

设置数据过滤(filter

为规则设置数据过滤

如果在做规则的校验时,只需要使用一部分数据做指标统计,例如只统计id字段不为NULL的数据,您可以为规则添加filter设置。例如:

{
  "rules": [
    {
      "assertion": "row_count > 0",
      "filter": "id IS NOT NULL"
    }
  ]
}

为质量监控设置数据过滤

您也可以在Scan.Dataset中添加filter设置,此时这个filter会在质量监控的所有规则校验过程中生效。

{
  "datasets": [
    {
      "type": "Table",
      "tables": [
        "tb_d_spec_demo"
      ],
      "filter": "dt='$[yyyymmdd]' AND hh='$[hh24-1/24]'",
      "dataSource": {
        "name": "odps_first"
      }
    }
  ]
  "rules": [
    {
      "assertion": "row_count > 0"
    }
  ]
}

上述示例代码片段中,在dataset中添加filter设置,每个规则在做校验时,都会优先使用filter做数据过滤,然后再执行后续校验。

说明

在示例filter的定义中,使用了时间表达式$[yyyymmdd-1],系统会使用触发监控时的triggerTime参数,做时间偏移后替换到filter中。具体系统支持的参数引用方式请参见数据过滤配置

附录

支持的比较符(comparison symbol

  • >

  • >=

  • <

  • <=

  • =

  • !=

  • between ... and ...

支持的数据源类型列表

目前支持的数据源类型包括:

  • maxcompute

  • hologres

  • emr

  • mysql

  • analyticdb_for_mysql

  • analyticdb_for_postgresql

  • cdh

  • starrocks

支持的系统内置指标类型

  • avg

  • row_count

  • sum

  • min

  • max

  • distinct_count

  • distinct_percent

  • table_size

  • missing_count

  • missing_percent

  • duplicate_percent

  • duplicate_count

  • group_by

  • invalid_count

  • invalid_distinct_count

时间窗口(time_window)定义方式

DataWorks数据质量支持多种时间窗口的定义方法,基本的格式如下:

  • n个时间单位前:n (minute[s]|hour[s]|day[s]|week[s]|month[s]) ago,例如n月前n天前n小时前

    • 1天前:1 day ago

    • 7天前:7 days ago

    • 1月前:1 month ago

    • 8小时前:8 hours ago

    • 15分钟前:15 minutes ago

  • 最近一段时间:last n (minute[s]|hour[s]|day[s]|week[s]|month[s]) [with interval m (minute[s]|hour[s]|day[s]|week[s]|month[s])],例如最近15分钟最近7天最近1个月

    • 最近15分钟:last 15 minutes

    • 最近24小时,每隔1个小时取一个时间点:last 24 hours with interval 1 hour

      说明

      从当前时间的24小时之前开始到当前时间截止,每隔一个小时采集一个时间点。

    • 最近7天:last 7 days

    • 最近1个月:last 1 month

  • 某个月的日期、某个周的周几的同一时间:1/2/3/.../-3/-2/-1 of (current|last|n) (months|weeks) (ago)

    • 当月1日同一时间:1 of current month

    • 上月最后一天同一时间:-1 of last month

    • 3周前的周二同一时间:2 of 3 weeks ago

  • n个时间窗口连接:time window and time window [and time window]

    • 1天、7天、1个月前和当月1号:1 day ago and 7 days ago and 1 month ago and 1 of current month

上一篇: 元数据实体相关概念说明 下一篇: 相关限制
阿里云首页 大数据开发治理平台 DataWorks 相关技术圈