数据质量Spec配置说明
该Spec是DataWorks数据质量产品中数据质量监控、数据质量规则资源的描述规范,使用JSON格式书写。目前可以作为OpenAPI的入参,操作DataWorks数据质量产品中的相关资源。
基本定义
JSON示例
一个简单的数据质量监控定义如下:
{
"datasets": [
{
"type": "Table",
"tables": [
"tb_d_spec_demo"
],
"dataSource": {
"name": "odps_first",
"envType": "Dev"
}
}
],
"rules": [
{
"assertion": "row_count > 0"
}
]
}
属性说明
上述示例的Spec定义了一个最简单的数据质量监控:
datasets
:数据质量监控对象,包含3个属性type
:监控对象类型,目前只支持一种枚举Table
。dataSource
:监控对象所属的数据源,设置name
和envType
来标识数据源,您可以通过ListDataSources获取数据源名称。说明当前仅支持部分数据源,详情请参见支持的数据源类型列表。
tables
:如果监控对象为Table类型,需配置表名。说明如果数据源绑定到了数据库级别,同时需要监控非默认Schema的表,table需要按照
schema.table
的格式配置。
rules
:期望数据满足的规则,一个数据质量监控可以包含多条规则。数据质量监控在执行时,会扫描监控对象中的数据,依次计算出每一个规则所指定的指标值,并与期望的阈值做比较,通过校验是否满足期望阈值来判断该条规则是否通过。
一般情况下,对数据的期望使用一个包含指标类型(如
row_count
)、比较符(如>
)、阈值(如0
)的assertion
语句来描述,校验结果通常有3种。pass:通过,采集的指标值落在阈值定义的区间内。
fail:不通过,采集的指标值落在阈值定义的区间外。
error:校验过程中出现其他异常,比如语法错误等。
说明如果显式指定了
warn
级别的阈值,校验结果也可能会是warn
。详情请参见添加多级阈值定义。您可以查看
assertion
语句中支持的支持的系统内置指标类型。创建质量监控时,您也可以使用自定义指标规则。
阈值定义方式
assertion
中支持的阈值定义方式如下:
固定阈值
适用于需要将指标值与一个固定值直接比较的场景,固定阈值定义包含如下部分:
a metric
an argument (optional)
a comparison symbol(optional)
a threshold(optional)
配置示例
{
"rules": [
{
// 数据行数要大于0
"assertion": "row_count > 0"
}, {
// size字段最大值要小于等于500
"assertion": "max(size) <= 500"
}
]
}
对应固定值阈值表达式的四个部分如下表所示:
表达式组成部分 |
|
|
a metric |
|
|
an argument (optional) |
| |
a comparison symbol(optional) |
|
|
a threshold(optional) |
|
|
更多支持的比较符,请参见支持的比较符(comparison symbol)。
波动阈值
适用于将当前的指标值,与相同指标的历史值做比较的场景,例如:当天的用户数与前一天用户数的差值需控制在100个以内、当天的收入与最近7天的收入均值相比波动范围需控制在10%以内等。
通常在metric
前加change for
描述波动值阈值,波动值阈值定义包含如下部分:
change(关键字)
an aggregate type(optional)
a time window(optional)
percent(关键字 optional)
for(关键字)
a metric
an argument(optional)
a comparison symbol(optional)
a threshold(optional)
组合后的表达式格式:change [aggregate_type] [time_window] [percent] for metric [argument] [comparison_symbol threshold]
。
配置示例
示例一:与最近一次校验做差值比较
{
"rules": [
{
// 本次数据行数与上一次校验时数据行数差值要控制在10000行以内
"assertion": "change for row_count < 10000"
}
]
}
示例二:与指定时间窗口(time_window)的校验结果做差值比较
您可以在change ... for
之间添加时间窗口(time_window)来显式指定本次指标值与历史上哪些指标做对比:
{
"rules": [
{
// 本次数据行数与7天前校验时数据行数差值要控制在10000行以内
"assertion": "change 7 days ago for row_count < 10000"
}
]
}
示例三:将指定时间窗口的校验结果聚合后做差值比较
您可以在change
和时间窗口(time_window)之间增加聚合方式(aggregate type
),系统会将时间窗口之内的校验记录使用对应聚合方式计算出中间结果后,再作为本次校验的参考值:
{
"rules": [
{
// 本次数据行数与最近7天数据行数均值的差值要控制在10000行以内
"assertion": "change average last 7 days for row_count < 10000"
}
]
}
目前支持如下两种聚合方式:
avg:均值。
var:方差。
如果不指定聚合方式,系统会将当前指标值与时间窗口之内的所有校验历史记录依次对比,得出与每一个校验历史记录的对比结果,然后取严重等级最高的状态作为最终校验状态。
示例四:与历史校验结果做波动范围百分比比较
您可以在change ... for
之间添加percent
来指明计算波动百分比之后再与阈值比较:
{
"rules": [
{
// 本次数据行数与上一次校验时数据行数的相差100%要控制在50%以内
"assertion": "change percent for row_count < 50%"
}
]
}
阈值结尾可以添加 %,提升可读性。
假设本次指标值是
c
、上一次是cl
,percent = (c-cl) / cl
:如果cl为0、c也为0,计算出的百分比为0。
如果cl为0、c不为0,则无法计算,校验结果为error。
结果可能是一个负数,定义的时候需要注意,可以用下面提到的between...and...来定义波动范围百分比的区间
区间阈值
可以使用between...and...
定义区间类型的阈值。
闭区间阈值定义
直接使用between...and...
定义的区间,默认是闭区间。例如,当数据行数在[10, 15]
区间时,校验通过。
{
"datasets": [
{
"type": "Table",
"tables": [
"tb_d_spec_demo"
],
"filter": "dt='$[yyyymmdd]' AND hh='$[hh24-1/24]'",
"dataSource": {
"name": "odps_first",
"envType": "Dev"
}
}
],
"rules": [
{
"assertion": "row_count between 10 and 15"
}
]
}
开区间阈值定义
如果添加了圆括号(
或)
,代表被修饰的区间边界是开区间。例如,第一条规则定义的区间为(10, 15]
、第二条规则定义的区间为(10, 15)
。
{
"datasets": [
{
"type": "Table",
"tables": [
"tb_d_spec_demo"
],
"filter": "dt='$[yyyymmdd]' AND hh='$[hh24-1/24]'",
"dataSource": {
"name": "odps_first",
"envType": "Dev"
}
}
],
"rules": [
{
"assertion": "row_count between (10 and 15"
}, {
"assertion": "row_count between (10 and 15)"
}
]
}
显示的闭区间阈值定义
如果区间边界被方括号[
或]
修饰,代表被修饰的区间边界为闭区间。例如,4条规则定义的区间分别为:
[10, 15]
[10, 15)
(10, 15]
[10, 15]
{
"datasets": [
{
"type": "Table",
"tables": [
"tb_d_spec_demo"
],
"filter": "dt='$[yyyymmdd]' AND hh='$[hh24-1/24]'",
"dataSource": {
"name": "odps_first"
}
}
],
"rules": [
{
"assertion": "row_count between 10 and 15"
}, {
"assertion": "row_count between [10 and 15"
}, {
"assertion": "row_count between 10 and 15]"
}, {
"assertion": "row_count between [10 and 15]"
}
]
}
添加多级阈值定义
除了在规则的assertion
语句中定义期望阈值,还可以省去assertion
语句中的期望阈值,同时设置规则的warn
或fail
属性,来定义更丰富等级的阈值。例如:
{
"rules": [
{
"assertion": "duplicate_count(phone)",
"warn": "when between 1 and 10",
"fail": "when > 10"
}
]
}
上述代码片段中,定义了warn
和fail
两级阈值:
phone
字段的重复行数为0
时校验通过。phone
字段的重复行数大于10时校验不通过。phone
字段的重复行数在1到10之间时,校验结果为warn
。
定义多个warn和fail阈值
warn
和fail
阈值,分别可以是一个数组,校验时命中数组中的一个条件,即被认为是命中了这个等级的阈值。
{
"rules": [
{
"assertion": "duplicate_count(phone)",
"warn": [
"when > 2",
"when < 0"
]
}
]
}
上述代码片段中,当phone
字段的重复值行数大于2或者小于0时,均会被认为命中了warn
阈值。
warn和fail有重叠时的处理策略
warn
和fail
的阈值区间可以重叠,如果指标值落到了这段重叠的区间,系统会以等级更严重的状态为准,结果为fail
。
使用not between...and...定义区间补集
您可以在between...and...
前使用not
取反,获取区间补集,例如:
{
"rules": [
{
"assertion": "duplicate_count(phone)",
"warn": "when not between -10 and 10",
"fail": "when not between -20 and 20"
}
]
}
上述代码片段定义的区间如下图所示:
设置规则唯一标识(identify)
您可以为规则指定一个identify
作为规则的全局唯一标识。
如果创建规则时,没有指定
identify
,系统会为该规则自动分配一个id
。您需要确保
identify
的全局唯一性,否则在更新时可能会失败,或者误更新到其他的规则,推荐使用uuid
。对于波动类阈值的规则(
change...for...
)和异常检测(anomaly detection for ...
)的规则,在查询校验历史时,会根据规则的identify
查询。系统在接收到更新质量监控请求时,处理逻辑如下:
先遍历更新质量监控请求中传入的每一个规则,使用规则的
id
匹配质量监控中已经存在的规则并进行更新。将质量监控中已经存在且未匹配到
id
的规则删除。在质量监控中将更新请求中剩余规则创建为新规则。
指定identify
的示例如下:
{
"rules": [
{
"assertion": "row_count > 0",
"name": "数据行数大于0",
// 指定全局唯一标识
"identify": "3de219e2-cb3c-4e3c-bff0-15f607c641f2"
}
]
}
定义规则业务严重等级(severity)
您可以设置规则的severity
,标记规则对自身业务影响的严重程度,方便后续管理。例如:
{
"rules": [
{
"assertion": "row_count > 0",
"severity": "High"
}
]
}
severity
支持2个等级:
High
Normal(默认值)
设置规则启用状态(enabled)
您可以指定规则的enabled
标识,用于管理规则的开启状态,对于临时不使用但又不想删除的规则,可以将enabled
置为false
,表示暂时关闭。在监控执行时,将不会真正触发该规则。
{
"rules": [
{
"assertion": "row_count > 0",
// 规则的开启状态,默认为true
"enabled": false
}
]
}
设置前置语句(settingConfig)
部分业务场景下,在执行SQL计算指标之前,需要执行一些SET语句进行参数调整,以确保指标计算的SQL能够正常执行或者保证性能。您可以在规则中添加settingConfig
设置,例如:
{
"rules": [
{
"assertion": "row_count > 0",
// 设置前置执行的set语句
"settingConfig": "SET odps.sql.udf.timeout=600s; SET odps.sql.python.version=cp27;"
}
]
}
设置保留问题数据开关(collectFailedRows)
您可以在规则级别开启问题数据保留的开关,当规则校验未通过(warn
、fail
状态)时,自动过滤导致规则未通过的数据并保存在与监控对象表相同数据库的另外一张表中。开启问题数据保留开关的方式如下:
{
"rules": [
{
"assertion": "duplicate_count(phone) = 0",
"collectFailedRows": true
}
]
}
您可以将collectFailedRows
设置成true
,开启问题数据的保留。对于自定义SQL的规则,需要额外指定failedRowsQuery
显式的配置问题数据保留的过滤语句:
{
"rules": [
{
"assertion": "id_null_count = 0",
"id_null_count": {
"expression": "id IS NULL"
},
"collectFailedRows": true,
"failedRowsQuery": "SELECT * FROM tb_d_spec_demo WHERE dt = '$[yyyymmdd-1]' AND id IS NULL"
}
]
}
除了自定义SQL的规则,仅以下指标支持保留问题数据,其他指标暂不支持开启collectFailedRows
:
missing_count
missing_percent
duplicate_count
duplicate_percent
distinct_count
distinct_percent
设置规则名称和描述
您可以为规则设置名字、描述,方便管理。
例如,您可以未监控整体设置一个名称、为每一个规则设置各自的名称,这个名称在后续校验结果、页面显示中都可以被使用,方便管理。
{
"datasets": [
{
"type": "Table",
"tables": [
"tb_d_spec_demo"
],
"filter": "dt='$[yyyymmdd]' AND hh='$[hh24-1/24]'",
"dataSource": {
"name": "odps_first",
"envType": "Dev"
}
}
],
"rules": [
{
"assertion": "row_count > 0",
// 规则名称和描述
"name": "数据行数大于0",
"description": "产出数据不能为空"
}
]
}
设置数据过滤(filter)
为规则设置数据过滤
如果在做规则的校验时,只需要使用一部分数据做指标统计,例如只统计id
字段不为NULL
的数据,您可以为规则添加filter
设置。例如:
{
"rules": [
{
"assertion": "row_count > 0",
"filter": "id IS NOT NULL"
}
]
}
为质量监控设置数据过滤
您也可以在Scan.Dataset
中添加filter
设置,此时这个filter
会在质量监控的所有规则校验过程中生效。
{
"datasets": [
{
"type": "Table",
"tables": [
"tb_d_spec_demo"
],
"filter": "dt='$[yyyymmdd]' AND hh='$[hh24-1/24]'",
"dataSource": {
"name": "odps_first"
}
}
]
"rules": [
{
"assertion": "row_count > 0"
}
]
}
上述示例代码片段中,在dataset
中添加filter
设置,每个规则在做校验时,都会优先使用filter
做数据过滤,然后再执行后续校验。
在示例filter
的定义中,使用了时间表达式$[yyyymmdd-1]
,系统会使用触发监控时的triggerTime
参数,做时间偏移后替换到filter
中。具体系统支持的参数引用方式请参见数据过滤配置。
附录
支持的比较符(comparison symbol)
>
>=
<
<=
=
!=
between ... and ...
支持的数据源类型列表
目前支持的数据源类型包括:
maxcompute
hologres
emr
mysql
analyticdb_for_mysql
analyticdb_for_postgresql
cdh
starrocks
支持的系统内置指标类型
avg
row_count
sum
min
max
distinct_count
distinct_percent
table_size
missing_count
missing_percent
duplicate_percent
duplicate_count
group_by
invalid_count
invalid_distinct_count
时间窗口(time_window)定义方式
DataWorks数据质量支持多种时间窗口的定义方法,基本的格式如下:
n个时间单位前:
n (minute[s]|hour[s]|day[s]|week[s]|month[s]) ago
,例如n月前
、n天前
、n小时前
。1天前:1 day ago
7天前:7 days ago
1月前:1 month ago
8小时前:8 hours ago
15分钟前:15 minutes ago
最近一段时间:
last n (minute[s]|hour[s]|day[s]|week[s]|month[s]) [with interval m (minute[s]|hour[s]|day[s]|week[s]|month[s])]
,例如最近15分钟
、最近7天
、最近1个月
。最近15分钟:last 15 minutes
最近24小时,每隔1个小时取一个时间点:last 24 hours with interval 1 hour
说明从当前时间的24小时之前开始到当前时间截止,每隔一个小时采集一个时间点。
最近7天:last 7 days
最近1个月:last 1 month
某个月的日期、某个周的周几的同一时间:
1/2/3/.../-3/-2/-1 of (current|last|n) (months|weeks) (ago)
当月1日同一时间:1 of current month
上月最后一天同一时间:-1 of last month
3周前的周二同一时间:2 of 3 weeks ago
n个时间窗口连接:
time window and time window [and time window]
1天、7天、1个月前和当月1号:1 day ago and 7 days ago and 1 month ago and 1 of current month