本文介绍如何使用 DataWorks OpenAPI 将企业自有系统中的元数据对象和血缘关系批量注册到 DataWorks 数据地图,实现统一检索、统一展示、统一血缘分析和影响分析。
概述
在企业数据治理过程中,很多关键数据对象并不完全产生于 DataWorks 内部,例如:
BI 报表、看板、指标、数据产品。
内部服务 API、数据服务接口、应用系统对象。
第三方数据库、外部表、外部字段。
企业自研平台中的任务、流程、模型或主题域对象。
如果这些对象没有进入数据地图,您在做数据资产检索、上下游分析、故障排查和变更影响评估时,只能看到 DataWorks 已采集的表和字段,无法形成完整的数据链路。通过自定义实体和血缘注册相关 OpenAPI,您可以:
将外部业务对象注册到数据地图,作为可查询、可展示、可治理的元数据实体。
将 DataWorks 已采集表、字段、数据集与外部对象建立血缘关系。
注册表与表、字段与字段、数据集与表之间的血缘关系。
为自定义实体、表、字段补充业务属性,例如业务域、来源系统、负责人、数据分层。
将外部系统的资产关系沉淀到数据地图,支撑数据溯源、影响分析和统一资产盘点。
批量接入时,可以把 DataWorks 数据地图视为企业自有元数据和血缘关系的统一消费入口。典型链路如下:
企业数据源:业务系统、第三方数据库、BI 报表、API 服务等。
批量接入工程:从源系统抽取实体清单、血缘关系清单、属性定义清单,统一作为 OpenAPI 入参。
DataWorks OpenAPI:调用
CreateMetaEntityDef、CreateCustomAttribute、BatchCreateMetaEntities、CreateLineageRelationship等接口写入实体定义、实体对象、自定义属性和血缘关系。DataWorks 数据地图:统一展示已采集对象与自定义对象,提供检索、详情、血缘图、影响分析等能力。
下游消费:调用
ListLineages等查询接口,支撑数据溯源、影响分析、断链检测、血缘健康巡检等运维场景。
其中,实体清单描述"有哪些对象要纳管",血缘关系清单描述"对象之间如何流转",属性定义清单描述"哪些业务字段需要统一展示、筛选和治理"。
对象类型选择
接入前建议先判断对象是否已经被 DataWorks 纳管,再决定是否需要注册实体。
典型接入场景
场景 | 示例 | 推荐方式 |
将业务对象纳入数据地图 | 报表、API、指标、数据产品 | 注册为自定义实体对象 |
将外部库表纳入数据地图 | 第三方数据库、外部表、外部字段 | 注册为扩展表实体 |
给实体补充业务信息 | 业务域、负责人、来源系统、安全等级 | 创建自定义属性并写入实体 |
注册外部血缘 | 表到报表、表到 API、表到外部表 | 调用 |
注册纳管实体之间血缘 | 表与表、字段与字段、表与字段、数据集与表 | 使用 DataWorks 已采集实体 ID 注册血缘 |
CreateLineageRelationship 支持在 DataWorks 已纳管实体之间注册血缘关系,不要求血缘两端必须包含自定义对象。
非纳管自定义对象(custom-{Type}:{Identifier})主要用于兼容历史用法。新接入场景建议优先注册为 DataWorks 自定义实体对象,再建立血缘关系。
按对象是否纳管选择注册方式
对象类型 | 是否需要注册实体 | 推荐方式 | 后续 ID 获取方式 |
DataWorks 已采集表、字段、数据集、数据集版本 | 否 | 直接使用查询 API 返回的实体 ID 注册血缘 |
|
报表、API、指标、数据产品等业务对象 | 是 | 使用 |
|
DataWorks 未自动采集的外部库表字段 | 是 | 使用扩展表实体建模,先写入 Database,再写入 Table 和内联 Column |
|
非纳管自定义对象 | 否 | 仅用于兼容历史血缘注册场景,新接入不推荐 | 调用方自行维护 |
前置要求
使用本文涉及的 OpenAPI 前,请先确认账号和 DataWorks 版本满足以下条件。
DataWorks 版本
本文示例会创建实体定义、创建实体对象、创建自定义属性并注册血缘关系,因此需要准备 DataWorks 专业版及以上版本。其中:
CreateMetaEntityDef用于创建自定义实体定义或扩展表实体定义,接口要求专业版及以上版本。BatchCreateMetaEntities用于批量创建自定义实体对象、扩展 Database 和扩展 Table,接口要求专业版及以上版本。CreateLineageRelationship用于注册血缘关系,接口要求专业版及以上版本。
如果只调用查询类 API,请以对应 API 详情页中的版本要求为准。
账号和角色权限
示例代码使用 AccessKey 调用 OpenAPI。AccessKey 对应的主账号、RAM 用户或 STS 身份,需要属于目标 DataWorks 租户,并具备调用 DataWorks OpenAPI 的权限。
创建自定义实体定义、自定义属性、纯自定义实体和扩展表实体时,建议使用具备以下任一权限的账号:
租户 Owner。
租户管理员。
租户数据治理管理员。
具备 DataWorks FullAccess 权限的账号。
如果后续要更新 DataWorks 已采集 MaxCompute 表或字段上的业务属性,调用方还需要是 MaxCompute 表 Owner,或对应 DataWorks 项目的项目 Owner、项目管理员。本文 Demo 只读取已采集表和字段的 ID 并注册血缘,不会修改这些已采集表和字段。
运行本文 Demo 前需要准备的数据
如果需要运行本文 Java Demo,类中的 UPSTREAM_TABLE_ID 需要替换成您账号下真实存在的 DataWorks 已采集表 ID。建议通过以下方式获取:
表 ID:调用
ListTables或GetTable,使用返回结果中的Id。字段 ID:调用
ListColumns或GetColumn,使用返回结果中的Id。
对于 DataWorks 已采集到的表、字段、数据集、数据集版本,不建议手动拼接实体 ID,应优先使用对应查询 API 返回的 ID,避免因为 ID 拼接有误或对象尚未采集导致血缘注册失败。通过 BatchCreateMetaEntities 注册的扩展表和扩展字段,也可以在写入成功后通过 ListTables、GetTable、ListColumns、GetColumn 查询并获取返回结果中的 Id。如果后续要把扩展表或扩展字段作为血缘端点,也建议优先使用查询 API 返回的 ID。
如果只按照"批量注册工程方案"接入自己的对象清单和血缘清单,不一定需要使用本文 Demo 中的上游表示例;可以直接使用您清单中真实存在的上游和下游实体 ID。
数据准备模板
批量接入前,推荐至少准备三类结构化清单。可以使用 CSV、Excel 或数据库表保存,字段名称可按企业内部规范调整,但语义建议保持一致。
实体定义清单示例
字段 | 说明 | 示例 |
| 实体定义名称,用于创建实体类型 |
|
| 页面展示名称 | BI 报表 |
| 对象建模方式:自定义实体或扩展表实体 |
|
| 是否扩展 DataWorks 表模型 |
|
| 普通属性定义,使用 JSON 数组保存 |
|
| 业务说明 | 企业 BI 报表资产 |
实体对象清单示例
字段 | 说明 | 示例 |
| 实体类型 |
|
| 实体名称 |
|
| 实体说明 | 每日销售报表 |
| 普通属性值,使用 JSON 对象保存 |
|
| 自定义属性值,使用 JSON 对象保存 |
|
| 扩展 database/table 层级关系 |
|
| 扩展表字段列表,随表写入 |
|
血缘关系清单示例
字段 | 说明 | 示例 |
| 上游实体 ID |
|
| 上游实体名称 |
|
| 下游实体 ID |
|
| 下游实体名称 |
|
| 产生血缘的任务或过程 ID |
|
| 任务类型 |
|
| 任务属性,使用 JSON 对象保存 |
|
接入流程
推荐按照以下流程接入。具体 API 参数、字段约束、错误码和更多 SDK 代码示例,请参见文末"参考文档"中的 API 详情页。
步骤一:梳理需要纳管的对象和关系
先整理企业系统中的对象清单和关系清单。对象清单通常包括:
对象名称,例如订单查询 API、销售报表、外部订单表。
对象类型,例如 API、报表、指标、数据库表。
业务属性,例如负责人、业务域、来源系统。
如果是表或字段,整理库名、表名、字段名和字段类型。
关系清单通常包括:
上游对象。
下游对象。
关系类型,例如表产出报表、字段映射字段、API 写入表。
产生关系的任务或过程,例如同步任务、加工任务、接口调用链路。
如果需要批量接入,建议先按照"数据准备模板"整理实体清单和血缘关系清单,再进入后续 API 调用流程。
步骤二:判断对象是否已经被 DataWorks 纳管
如果对象已经由 DataWorks 采集或平台管理,不需要重复注册实体,直接使用对应查询 API 返回的实体 ID。常见获取方式:
表:调用
ListTables或GetTable。字段:调用
ListColumns或GetColumn。数据集:调用
ListDatasets或GetDataset。数据集版本:调用
ListDatasetVersions或GetDatasetVersion。
如果对象还没有被 DataWorks 纳管,再根据对象类型选择注册方式:
报表、API、指标、系统等业务对象:注册为自定义实体。
具备数据库、表、字段结构的外部数据资产:注册为扩展表实体。
步骤三:创建实体定义
使用 CreateMetaEntityDef 创建实体类型定义。
自定义实体适合描述业务概念,例如 API、报表、指标。示例:
{
"Name": "customer_api",
"DisplayName": "业务API",
"Description": "企业系统中的 API 对象",
"AttributeDefs": [
{
"Name": "apiCode",
"DisplayName": "API编码",
"Type": "STRING",
"IsOptional": false
},
{
"Name": "serviceName",
"DisplayName": "服务名称",
"Type": "STRING",
"IsOptional": true
}
]
}扩展表实体适合描述 DataWorks 未自动采集的外部库表字段。建议在创建扩展表时同步创建对应的扩展 Database 对象,并通过 parentMetaEntityId 建立 Database 到 Table 的层级关系,便于后续在数据地图中按完整库表层级查看。示例:
{
"Name": "bi-table",
"DisplayName": "BI报表",
"Description": "外部 BI 系统中的库表字段",
"Extend": "TABLE"
}具体字段含义和命名约束请参见 CreateMetaEntityDef API 详情页。
步骤四:创建可复用的自定义属性
如果希望在数据地图中对实体展示或筛选业务属性,可以使用 CreateCustomAttribute 创建自定义属性定义。例如:
来源系统。
业务域。
负责人。
数据分层。
安全等级。
示例:
{
"Id": "custom-attribute:source_system",
"DisplayName": "来源系统",
"Type": "ENUM",
"ValueEnums": ["CRM", "ERP", "BI", "CUSTOM_APP"],
"EntityTypes": ["custom_entity-customer_api"],
"SearchFilterEnabled": true,
"DisplayEnabled": true
}自定义属性创建后,可在写入自定义实体、扩展表实体,或更新已采集表字段业务信息时使用。EntityTypes 需要填写可挂载该属性的具体实体类型;如果需要挂载到扩展表或扩展字段,请填写对应扩展实体定义生成的具体类型。
步骤五:创建实体对象
使用 BatchCreateMetaEntities 写入具体实体对象。
自定义实体对象示例:
{
"Entities": [
{
"EntityType": "custom_entity-customer_api",
"Name": "api_001",
"Comment": "业务订单查询 API",
"Attributes": {
"apiCode": "api_001",
"serviceName": "order-service"
},
"CustomAttributes": {
"source_system": ["CRM"]
}
}
]
}BatchCreateMetaEntities 是批量接口,需要同时关注两层成功状态:返回体最外层的 Success 表示本次批量请求是否被服务端成功处理;Results 中每条记录的 Success 表示对应实体对象是否创建成功。实际接入时,不能只判断最外层 Success,还需要逐条检查 Results[].Success,并记录失败项的 ErrorMessage。
扩展表实体对象示例
扩展表写入前,建议先写入同一扩展类型下的数据库实体,再写入表实体并将表的 parentMetaEntityId 指向该数据库实体。这样注册后可以形成完整的 Database -> Table -> Column 层级关系,便于在数据地图中查看和管理。由于 BatchCreateMetaEntities 同一批次的实体必须属于同一种 EntityType,database 和 table 建议拆成两次请求。
先写入外部 database:
{
"Entities": [
{
"EntityType": "custom_bi-database",
"Name": "report_db",
"Comment": "外部 BI 报表库",
"Attributes": {
"parentMetaEntityId": "custom_bi:custom_bi_src",
"technicalMetadata.location": "external://bi/report_db"
}
}
]
}再写入外部 table,并通过 columns 一起注册字段:
{
"Entities": [
{
"EntityType": "custom_bi-table",
"Name": "sales_report",
"Comment": "销售报表结果表",
"Attributes": {
"parentMetaEntityId": "custom_bi-database:custom_bi_src::report_db",
"columns": "[{\"name\":\"order_id\",\"type\":\"STRING\"},{\"name\":\"amount\",\"type\":\"DECIMAL\"}]"
}
}
]
}字段不需要单独调用 BatchCreateMetaEntities 创建。上例写入成功后,外部字段可使用扩展字段 ID 参与字段血缘。扩展表和字段的 ID 可以通过 ListTables、GetTable、ListColumns、GetColumn 查询获得;如果已经明确扩展实体 ID 规则,也可以按规则生成,例如 custom_bi-column:custom_bi_src::report_db::sales_report:order_id。
写入成功后,响应中会返回实体 ID。后续注册血缘、查询详情、更新实体时都使用该 ID。
步骤六:注册血缘关系
使用 CreateLineageRelationship 注册实体之间的上下游关系。血缘方向始终是:上游 SrcEntity -> 下游 DstEntity。
示例一:DataWorks 已采集表到自定义 API。
{
"SrcEntity": {
"Id": "mysql-table:rm-xxx::sales_db::orders",
"Name": "orders"
},
"DstEntity": {
"Id": "custom_entity-customer_api:api_001",
"Name": "api_001"
},
"Task": {
"Id": "table_to_api_001",
"Type": "custom-lineage-task",
"Attributes": {
"scene": "table_to_api"
}
}
}示例二:DataWorks 已采集表到外部扩展表。
{
"SrcEntity": {
"Id": "maxcompute-table:::prod_project:default:dwd_order",
"Name": "dwd_order"
},
"DstEntity": {
"Id": "custom_bi-table:custom_bi_src::report_db::sales_report",
"Name": "sales_report"
},
"Task": {
"Id": "dwd_order_to_sales_report",
"Type": "custom-table-lineage",
"Attributes": {
"scene": "external_bi_table"
}
}
}示例三:DataWorks 已采集字段到外部扩展字段。
{
"SrcEntity": {
"Id": "maxcompute-column:::prod_project:default:dwd_order:order_id",
"Name": "order_id"
},
"DstEntity": {
"Id": "custom_bi-column:custom_bi_src::report_db::sales_report:order_id",
"Name": "order_id"
},
"Task": {
"Id": "dwd_order_id_to_report_order_id",
"Type": "custom-column-lineage",
"Attributes": {
"scene": "external_bi_column"
}
}
}对于字段级血缘,SrcEntity 和 DstEntity 使用字段实体 ID。字段 ID 可以来自 ListColumns 或 GetColumn,包括 DataWorks 已采集字段和已注册扩展表字段。具体请求参数和返回字段请参见 CreateLineageRelationship API 详情页。
步骤七:验证注册结果并消费血缘
注册完成后,建议做两类验证。
验证实体是否写入成功:
自定义实体:调用
GetMetaEntity。扩展表:调用
GetTable。扩展字段:调用
GetColumn。DataWorks 已采集对象:调用对应的 Get API。
实体创建成功后,查询侧可能存在约 1 秒的短暂可见性延迟。如果刚写入后没有立即查询到,可短暂等待后重试。
验证血缘是否写入成功:
查询下游:调用
ListLineages并传入SrcEntityId。查询上游:调用
ListLineages并传入DstEntityId。需要关系详情时,设置
NeedAttachRelationship=true。
血缘注册后可能存在短暂索引延迟。如果刚写入后没有立即查询到,可等待数秒后重试。
完成基础验证后,可以继续把血缘数据用于日常分析:
上游溯源:从目标实体出发,调用
ListLineages查询上游,并按上游节点继续递归查询,用于回答"这个报表或表的数据来自哪里"。下游影响分析:从变更实体出发,调用
ListLineages查询下游,并统计下游实体类型和数量,用于回答"这张表或这个字段变更会影响哪些报表、API 或外部表"。断链检测:将批量注册的实体清单与血缘端点清单做比对,找出没有任何上游或下游的孤立实体,作为待补录或待清理对象。
血缘健康巡检:定期检查实体是否存在、血缘关系是否重复、血缘端点是否可查询、批量写入是否有失败项,形成巡检报表。
多层级影响分析的实现思路如下:
queue = [changedEntityId]
visited = set()
while queue is not empty:
current = queue.pop()
downstream = ListLineages(SrcEntityId=current)
for entity in downstream:
if entity.id not in visited:
visited.add(entity.id)
queue.push(entity.id)
/* 输出 visited 中的实体类型、实体名称和影响路径。 */建议在批量接入工程中保存每次注册成功的实体 ID 和血缘关系 ID。后续做影响分析、断链检测或增量更新时,可以直接基于这份结果表和查询 API 做比对,不需要重新解析原始业务系统数据。
批量注册工程方案
当接入规模从几个对象扩大到几百个报表、上千张外部表或上万条血缘关系时,建议把注册过程设计成可重复运行、可断点续跑、可审计的批处理任务。
推荐写入顺序
批量接入时,建议按照以下顺序执行:
创建或确认实体定义:调用
CreateMetaEntityDef。创建或确认自定义属性:调用
CreateCustomAttribute。写入扩展 Database:调用
BatchCreateMetaEntities。写入扩展 Table,并随 Table 写入 Column。
写入纯自定义实体,例如报表、API、指标。
注册血缘关系:调用
CreateLineageRelationship。查询验证并生成接入结果报告。
批次、重试和断点续跑
BatchCreateMetaEntities 是实体批量写入接口,单次请求最多写入 5 个实体,并且同一批次必须是同一种 EntityType。实际工程中建议先按 EntityType 分组,再按 5 条一批切分。
CreateLineageRelationship 按单条血缘关系写入。血缘量较大时,可以控制并发度分批调用,并对限流或临时服务异常做退避重试。
建议采用以下错误处理模式:
for batch in splitByEntityType(rows, size=5):
resp = BatchCreateMetaEntities(batch)
if resp.Success is not true:
write_batch_failed(batch, resp.RequestId, resp.Message)
continue
for result in resp.Results:
if result.Success is true:
mark_done(result.Id)
else:
write_row_failed(result.Id, result.ErrorMessage, resp.RequestId)这里需要同时判断两层成功状态:最外层 Success 表示请求是否被服务端成功处理,Results[].Success 表示每个实体对象是否写入成功。批量工程不要因为最外层 Success=true 就认为所有实体都已成功。
断点续跑建议:
为实体和血缘使用稳定的业务 ID,避免每次运行生成不同标识。
每批写入后记录成功项、失败项、
RequestId、失败原因和原始行号。重跑时跳过已成功项,只重试失败项或变更项。
运行前可通过
GetMetaEntity、GetCustomAttribute、ListLineageRelationships等查询接口确认目标对象是否已经存在。4xx 类数据错误通常需要修正清单后再重试;限流、网络抖动、临时服务异常可以做有限次数退避重试。
对于客户侧每日同步任务,推荐保留"期望状态表"和"上次成功写入结果表"。增量同步时只处理新增和变更对象;定期全量比对时,用期望状态表与 DataWorks 查询结果做差异检查,发现缺失、重复或废弃关系后再补录或清理。
页面效果
Demo 注册完成后,您可以在 DataWorks 数据地图中看到以下效果。实际页面展示会受账号权限、地域、数据地图页面版本和注册数据内容影响。
自定义实体可被搜索和筛选
注册自定义实体定义和实体对象后,可以在数据地图中搜索到对应实体。若实体定义中的属性开启了搜索筛选能力,页面左侧会展示相应筛选项(例如按业务域、来源系统、负责人过滤报表实体)。
自定义实体详情展示属性信息
进入自定义实体详情页后,可以查看实体基础信息、普通属性和自定义属性。例如 BI 报表实体详情页会展示报表编码、业务域、负责人以及来源系统等业务字段。
自定义属性可统一管理
通过 CreateCustomAttribute 创建的自定义属性会进入数据地图的自定义属性管理页,可查看属性名称、类型、适用实体类型等信息,便于统一维护和复用。
扩展表实体进入表资产搜索
注册扩展 Database、Table 和 Column 后,外部库表可以进入数据地图的表资产视图,并按数据源类型、数据库等条件检索,与 DataWorks 已采集表统一呈现。
表级和字段级血缘在血缘图中展示
注册"表到自定义实体"、"表到表"、"字段到字段"血缘后,可以在数据地图的血缘页面查看上下游关系。例如同一张销售汇总表可以同时关联到自定义报表实体和扩展表实体,并向下展示字段级血缘。
Java SDK 样例
下面提供一个可直接编译运行的 Java Demo,同时覆盖:
注册 BI 报表自定义实体,并建立 DataWorks 已采集表到报表的血缘。
注册外部扩展表,并建立表到表、字段到字段血缘。
Demo 中的业务标识集中放在类顶部常量里。实际接入时,请替换为您账号下真实的上游表 ID、字段 ID、报表名称、外部库表名称等。首次运行前,也请确认示例中的实体定义、自定义属性和实体对象在账号下尚未存在,或改成企业内部唯一的业务标识。
建议先通过 ListTables 或 GetTable 获取真实上游表 ID,通过 ListColumns 或 GetColumn 获取真实上游字段 ID,再运行 Demo。
1. 准备工程
工程目录:
custom-metadata-lineage-demo
├── pom.xml
└── src
└── main
└── java
└── CustomMetadataLineageDemo.javapom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>example</groupId>
<artifactId>custom-metadata-lineage-demo</artifactId>
<version>1.0.0</version>
<properties>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>dataworks_public20240518</artifactId>
<version>${dataworks.sdk.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<mainClass>CustomMetadataLineageDemo</mainClass>
</configuration>
</plugin>
</plugins>
</build>
</project>2. 编写 Demo
将以下代码保存为 src/main/java/CustomMetadataLineageDemo.java。
import com.aliyun.dataworks_public20240518.Client;
import com.aliyun.dataworks_public20240518.models.BatchCreateMetaEntitiesRequest;
import com.aliyun.dataworks_public20240518.models.BatchCreateMetaEntitiesResponse;
import com.aliyun.dataworks_public20240518.models.CreateCustomAttributeRequest;
import com.aliyun.dataworks_public20240518.models.CreateLineageRelationshipRequest;
import com.aliyun.dataworks_public20240518.models.CreateMetaEntityDefRequest;
import com.aliyun.dataworks_public20240518.models.GetCustomAttributeRequest;
import com.aliyun.dataworks_public20240518.models.GetCustomAttributeResponse;
import com.aliyun.dataworks_public20240518.models.GetMetaEntityDefRequest;
import com.aliyun.dataworks_public20240518.models.GetMetaEntityDefResponse;
import com.aliyun.dataworks_public20240518.models.LineageEntity;
import com.aliyun.dataworks_public20240518.models.LineageTask;
import com.aliyun.dataworks_public20240518.models.MetaEntityAttributeDef;
import com.aliyun.dataworks_public20240518.models.MetaEntityWriteResult;
import com.aliyun.teaopenapi.models.Config;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class CustomMetadataLineageDemo {
private static final String REGION_ID = "cn-shanghai";
// DataWorks 已采集表 ID。请通过 ListTables/GetTable 查询后替换。
private static final String UPSTREAM_TABLE_ID = "maxcompute-table:::prod_project:default:dwd_sales_order";
private static final String UPSTREAM_TABLE_NAME = "dwd_sales_order";
// BI 报表示例。
private static final String REPORT_ENTITY_TYPE = "custom_entity-bi_report";
private static final String REPORT_ENTITY_NAME = "daily_sales_report";
// 外部扩展表示例。
private static final String EXT_PREFIX = "warehouse";
private static final String EXT_SOURCE = "custom_external_bi_src";
private static final String EXT_DATABASE = "analytics_db";
private static final String EXT_TABLE = "ads_sales_summary";
private static final String EXT_TABLE_ID =
"custom_" + EXT_PREFIX + "-table:" + EXT_SOURCE + "::" + EXT_DATABASE + "::" + EXT_TABLE;
private static final String EXT_ORDER_ID_COLUMN_ID =
"custom_" + EXT_PREFIX + "-column:" + EXT_SOURCE + "::" + EXT_DATABASE + "::" + EXT_TABLE + ":order_id";
private static final String EXT_AMOUNT_COLUMN_ID =
"custom_" + EXT_PREFIX + "-column:" + EXT_SOURCE + "::" + EXT_DATABASE + "::" + EXT_TABLE + ":amount";
public static void main(String[] args) throws Exception {
Client client = new Client(new Config()
.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
.setRegionId(REGION_ID));
if (System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID") == null
|| System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET") == null) {
throw new IllegalArgumentException(
"请先设置 ALIBABA_CLOUD_ACCESS_KEY_ID 和 ALIBABA_CLOUD_ACCESS_KEY_SECRET");
}
String reportId = registerBiReport(client);
registerExternalTable(client);
createLineage(client, UPSTREAM_TABLE_ID, UPSTREAM_TABLE_NAME,
reportId, REPORT_ENTITY_NAME, "dw_table_to_bi_report", "custom-table_to_report");
createLineage(client, UPSTREAM_TABLE_ID, UPSTREAM_TABLE_NAME,
EXT_TABLE_ID, EXT_TABLE, "dw_table_to_external_table", "custom-table_to_table");
createLineage(client, EXT_ORDER_ID_COLUMN_ID, "order_id",
EXT_AMOUNT_COLUMN_ID, "amount", "ext_col_to_ext_col", "custom-column_to_column");
System.out.println("自定义实体、扩展表和血缘注册完成。");
}
private static String registerBiReport(Client client) throws Exception {
ensureMetaEntityDef(client, REPORT_ENTITY_TYPE, new CreateMetaEntityDefRequest()
.setName("bi_report")
.setDisplayName("BI 报表")
.setDescription("企业 BI 报表资产")
.setExtend("NONE")
.setAttributeDefs(Arrays.asList(
new MetaEntityAttributeDef()
.setName("reportCode").setDisplayName("报表编码").setType("STRING").setIsOptional(false),
new MetaEntityAttributeDef()
.setName("reportUrl").setDisplayName("报表地址").setType("STRING").setIsOptional(true)
)));
ensureCustomAttribute(client, "custom-attribute:sourceSystem", new CreateCustomAttributeRequest()
.setId("custom-attribute:sourceSystem")
.setDisplayName("来源系统")
.setType("ENUM")
.setValueEnums(Arrays.asList("BI", "CRM", "ERP"))
.setEntityTypes(Collections.singletonList(REPORT_ENTITY_TYPE))
.setDisplayEnabled(true)
.setSearchFilterEnabled(true));
Map<String, String> attrs = new HashMap<String, String>();
attrs.put("reportCode", REPORT_ENTITY_NAME);
attrs.put("reportUrl", "https://bi.example.com/reports/daily_sales");
Map<String, List<String>> customAttrs = new HashMap<String, List<String>>();
customAttrs.put("sourceSystem", Collections.singletonList("BI"));
String reportId = createOneEntity(client,
new BatchCreateMetaEntitiesRequest.BatchCreateMetaEntitiesRequestEntities()
.setEntityType(REPORT_ENTITY_TYPE)
.setName(REPORT_ENTITY_NAME)
.setComment("每日销售报表")
.setAttributes(attrs)
.setCustomAttributes(customAttrs));
Thread.sleep(1000L); // 实体创建成功后,查询侧可能存在约 1 秒可见性延迟。
return reportId;
}
private static void registerExternalTable(Client client) throws Exception {
ensureMetaEntityDef(client, "custom_" + EXT_PREFIX + "-table", new CreateMetaEntityDefRequest()
.setName(EXT_PREFIX + "-table")
.setDisplayName("外部数仓表")
.setDescription("DataWorks 未自动采集的外部库表")
.setExtend("TABLE"));
Map<String, String> dbAttrs = new HashMap<String, String>();
dbAttrs.put("parentMetaEntityId", "custom_" + EXT_PREFIX + ":" + EXT_SOURCE);
dbAttrs.put("technicalMetadata.location", "external://" + EXT_DATABASE);
createOneEntity(client, new BatchCreateMetaEntitiesRequest.BatchCreateMetaEntitiesRequestEntities()
.setEntityType("custom_" + EXT_PREFIX + "-database")
.setName(EXT_DATABASE)
.setComment("外部分析库")
.setAttributes(dbAttrs));
Map<String, String> tableAttrs = new HashMap<String, String>();
tableAttrs.put("parentMetaEntityId", "custom_" + EXT_PREFIX + "-database:" + EXT_SOURCE + "::" + EXT_DATABASE);
tableAttrs.put("tableType", "TABLE");
tableAttrs.put("columns", "["
+ "{\"name\":\"order_id\",\"type\":\"STRING\",\"comment\":\"订单 ID\",\"position\":1},"
+ "{\"name\":\"amount\",\"type\":\"DECIMAL\",\"comment\":\"销售金额\",\"position\":2}"
+ "]");
createOneEntity(client, new BatchCreateMetaEntitiesRequest.BatchCreateMetaEntitiesRequestEntities()
.setEntityType("custom_" + EXT_PREFIX + "-table")
.setName(EXT_TABLE)
.setComment("外部销售汇总表")
.setAttributes(tableAttrs));
Thread.sleep(1000L);
}
private static String createOneEntity(
Client client,
BatchCreateMetaEntitiesRequest.BatchCreateMetaEntitiesRequestEntities entity) throws Exception {
BatchCreateMetaEntitiesResponse response = client.batchCreateMetaEntities(
new BatchCreateMetaEntitiesRequest().setEntities(Collections.singletonList(entity)));
if (response.getBody() == null || !Boolean.TRUE.equals(response.getBody().getSuccess())) {
throw new RuntimeException("BatchCreateMetaEntities 请求失败");
}
MetaEntityWriteResult result = response.getBody().getResults().get(0);
if (!Boolean.TRUE.equals(result.getSuccess())) {
if (isAlreadyExists(result.getErrorMessage())) {
String existingId = buildExpectedEntityId(entity);
System.out.println("实体已存在,复用: " + existingId);
return existingId;
}
throw new RuntimeException("实体创建失败: " + result.getErrorMessage());
}
System.out.println("实体创建成功: " + result.getId());
return result.getId();
}
private static String buildExpectedEntityId(
BatchCreateMetaEntitiesRequest.BatchCreateMetaEntitiesRequestEntities entity) {
String entityType = entity.getEntityType();
String name = entity.getName();
if (entityType.startsWith("custom_entity-")) {
return entityType + ":" + name;
}
Map<String, String> attributes = entity.getAttributes();
String parentMetaEntityId = attributes == null ? null : attributes.get("parentMetaEntityId");
if (entityType.endsWith("-database") && parentMetaEntityId != null) {
String metaSource = parentMetaEntityId.substring(parentMetaEntityId.indexOf(':') + 1);
return entityType + ":" + metaSource + "::" + name;
}
if (entityType.endsWith("-table") && parentMetaEntityId != null) {
String parent = parentMetaEntityId.substring(parentMetaEntityId.indexOf(':') + 1);
return entityType + ":" + parent + "::" + name;
}
throw new IllegalArgumentException("无法根据实体类型和属性推导已存在实体 ID: " + entityType + "/" + name);
}
private static void createLineage(
Client client,
String srcId,
String srcName,
String dstId,
String dstName,
String taskId,
String taskType) throws Exception {
client.createLineageRelationship(new CreateLineageRelationshipRequest()
.setSrcEntity(new LineageEntity().setId(srcId).setName(srcName))
.setDstEntity(new LineageEntity().setId(dstId).setName(dstName))
.setTask(new LineageTask().setId(taskId).setType(taskType)));
System.out.println("血缘创建成功: " + srcId + " -> " + dstId);
}
private static void ensureMetaEntityDef(
Client client,
String entityType,
CreateMetaEntityDefRequest request) throws Exception {
if (metaEntityDefExists(client, entityType)) {
System.out.println("实体定义已存在,跳过创建: " + entityType);
return;
}
client.createMetaEntityDef(request);
}
private static boolean metaEntityDefExists(Client client, String entityType) throws Exception {
try {
GetMetaEntityDefResponse response = client.getMetaEntityDef(
new GetMetaEntityDefRequest().setEntityType(entityType));
return response.getBody() != null
&& Boolean.TRUE.equals(response.getBody().getSuccess())
&& response.getBody().getMetaEntityDef() != null;
} catch (Exception e) {
if (isNotFound(e)) {
return false;
}
throw e;
}
}
private static void ensureCustomAttribute(
Client client,
String id,
CreateCustomAttributeRequest request) throws Exception {
if (customAttributeExists(client, id)) {
System.out.println("自定义属性已存在,跳过创建: " + id);
return;
}
client.createCustomAttribute(request);
}
private static boolean customAttributeExists(Client client, String id) throws Exception {
try {
GetCustomAttributeResponse response = client.getCustomAttribute(
new GetCustomAttributeRequest().setId(id));
return response.getBody() != null
&& Boolean.TRUE.equals(response.getBody().getSuccess())
&& response.getBody().getCustomAttribute() != null;
} catch (Exception e) {
if (isNotFound(e)) {
return false;
}
throw e;
}
}
private static boolean isNotFound(Exception e) {
String message = e.getMessage();
if (message == null) {
return false;
}
String lowerMessage = message.toLowerCase();
return lowerMessage.contains("not found")
|| lowerMessage.contains("not exist")
|| lowerMessage.contains("not_exists")
|| message.contains("不存在");
}
private static boolean isAlreadyExists(String message) {
if (message == null) {
return false;
}
String lowerMessage = message.toLowerCase();
return lowerMessage.contains("already exists") || lowerMessage.contains("already exist");
}
}3. 运行 Demo
运行前配置访问凭证。示例代码从环境变量读取 AccessKey,避免在代码中明文写入密钥。
export ALIBABA_CLOUD_ACCESS_KEY_ID="your-access-key-id"
export ALIBABA_CLOUD_ACCESS_KEY_SECRET="your-access-key-secret"执行:
export DATAWORKS_SDK_VERSION="latest-version"
export JAVA_VERSION="8"
mvn -Ddataworks.sdk.version="${DATAWORKS_SDK_VERSION}" \
-Djava.version="${JAVA_VERSION}" \
compile exec:java说明:
Demo 按创建流程编写。运行前,请将类顶部的实体类型、实体名称、外部库表名称等业务标识改成您账号下尚未使用的唯一值。
BatchCreateMetaEntities需要同时检查最外层Success和每条Results[].Success,示例中的createOneEntity已包含这部分检查。实体创建成功后,查询侧可能有约 1 秒可见性延迟。示例在创建后加入了短暂等待,实际生产接入可改成有限重试。
4. 批量接入核心代码片段
上面的 Demo 用于快速跑通端到端流程。生产环境批量接入时,建议参考"数据准备模板"准备实体对象清单和血缘关系清单,再分别转换成 List<EntityRow> 和 List<LineageRow> 作为批量方法入参。
其中,EntityRow 对应实体对象清单,LineageRow 对应血缘关系清单。如果清单来自 CSV、Excel 或数据库,可以先把 attributes_json、custom_attributes_json、task_attributes_json 解析成 Map 后赋值给对应 Row 对象。下面示例是可嵌入前面 Demo 工程的核心代码片段,不是完整 CSV/Excel 读取器;文件读取和 JSON 解析可以按企业内部工程规范实现。
如果将以下代码放入上面的 CustomMetadataLineageDemo 类中,需要额外引入:
import java.util.ArrayList;
import java.util.LinkedHashMap;批量创建实体:
private static BatchWriteResult batchCreateEntities(Client client, List<EntityRow> entityRows) throws Exception {
BatchWriteResult report = new BatchWriteResult();
// BatchCreateMetaEntities 要求同一批次为同一种 EntityType,因此先按 EntityType 分组。
Map<String, List<EntityRow>> grouped = new LinkedHashMap<String, List<EntityRow>>();
for (EntityRow row : entityRows) {
if (!grouped.containsKey(row.entityType)) {
grouped.put(row.entityType, new ArrayList<EntityRow>());
}
grouped.get(row.entityType).add(row);
}
for (Map.Entry<String, List<EntityRow>> entry : grouped.entrySet()) {
List<EntityRow> sameTypeRows = entry.getValue();
for (int from = 0; from < sameTypeRows.size(); from += 5) {
int to = Math.min(from + 5, sameTypeRows.size());
List<EntityRow> rowBatch = sameTypeRows.subList(from, to);
List<BatchCreateMetaEntitiesRequest.BatchCreateMetaEntitiesRequestEntities> requestEntities =
new ArrayList<BatchCreateMetaEntitiesRequest.BatchCreateMetaEntitiesRequestEntities>();
for (EntityRow row : rowBatch) {
requestEntities.add(row.toRequestEntity());
}
BatchCreateMetaEntitiesResponse response = client.batchCreateMetaEntities(
new BatchCreateMetaEntitiesRequest().setEntities(requestEntities));
if (response.getBody() == null || !Boolean.TRUE.equals(response.getBody().getSuccess())) {
for (EntityRow row : rowBatch) {
report.failed.add("entity=" + row.name + ", reason=batch request failed");
}
continue;
}
List<MetaEntityWriteResult> results = response.getBody().getResults();
if (results == null || results.size() != rowBatch.size()) {
for (EntityRow row : rowBatch) {
report.failed.add("entity=" + row.name + ", reason=unexpected batch result");
}
continue;
}
for (int i = 0; i < results.size(); i++) {
MetaEntityWriteResult result = results.get(i);
if (Boolean.TRUE.equals(result.getSuccess())) {
report.successIds.add(result.getId());
} else {
EntityRow row = rowBatch.get(i);
report.failed.add("entity=" + row.name + ", reason=" + result.getErrorMessage());
}
}
}
}
return report;
}批量注册血缘关系:
private static BatchWriteResult batchCreateLineages(Client client, List<LineageRow> lineages) {
BatchWriteResult report = new BatchWriteResult();
for (LineageRow row : lineages) {
boolean success = false;
String lastError = null;
for (int attempt = 1; attempt <= 3; attempt++) {
try {
LineageTask task = new LineageTask().setId(row.taskId).setType(row.taskType);
if (row.taskAttributes != null && !row.taskAttributes.isEmpty()) {
task.setAttributes(row.taskAttributes);
}
client.createLineageRelationship(new CreateLineageRelationshipRequest()
.setSrcEntity(new LineageEntity().setId(row.srcId).setName(row.srcName))
.setDstEntity(new LineageEntity().setId(row.dstId).setName(row.dstName))
.setTask(task));
report.successIds.add(row.srcId + " -> " + row.dstId);
success = true;
break;
} catch (Exception e) {
lastError = e.getMessage();
try {
Thread.sleep(1000L * attempt);
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
lastError = interrupted.getMessage();
break;
}
}
}
if (!success) {
report.failed.add("lineage=" + row.srcId + " -> " + row.dstId + ", reason=" + lastError);
}
}
return report;
}辅助结构:
private static class EntityRow {
// 对应实体对象清单中的 entity_type、name、comment。
private String entityType;
private String name;
private String comment;
// 对应 attributes_json、custom_attributes_json。
private Map<String, String> attributes = new HashMap<String, String>();
private Map<String, List<String>> customAttributes = new HashMap<String, List<String>>();
// 对应 parent_entity_id、columns_json,主要用于扩展 Database/Table。
private String parentEntityId;
private String columnsJson;
private BatchCreateMetaEntitiesRequest.BatchCreateMetaEntitiesRequestEntities toRequestEntity() {
Map<String, String> mergedAttributes = new HashMap<String, String>();
if (attributes != null) {
mergedAttributes.putAll(attributes);
}
if (parentEntityId != null && parentEntityId.length() > 0) {
mergedAttributes.put("parentMetaEntityId", parentEntityId);
}
if (columnsJson != null && columnsJson.length() > 0) {
mergedAttributes.put("columns", columnsJson);
}
return new BatchCreateMetaEntitiesRequest.BatchCreateMetaEntitiesRequestEntities()
.setEntityType(entityType)
.setName(name)
.setComment(comment)
.setAttributes(mergedAttributes)
.setCustomAttributes(customAttributes);
}
}
private static class BatchWriteResult {
private final List<String> successIds = new ArrayList<String>();
private final List<String> failed = new ArrayList<String>();
}
private static class LineageRow {
private String srcId;
private String srcName;
private String dstId;
private String dstName;
private String taskId;
private String taskType;
private Map<String, String> taskAttributes = new HashMap<String, String>();
}实际工程中,可以把 BatchWriteResult.failed 写入文件、数据库或消息队列,作为断点续跑和人工修正的数据来源。对于失败项,不建议无差别无限重试;参数错误、实体类型不存在、血缘端点不存在等问题需要先修正数据清单,再重新执行。
推荐实践
优先注册为 DataWorks 自定义实体对象
非纳管 custom-{Type}:{Identifier} 对象可以直接参与血缘注册,但这种方式不利于后续管理。新接入场景建议:
使用
CreateMetaEntityDef创建实体定义。使用
BatchCreateMetaEntities创建实体对象。使用返回的实体 ID 注册血缘。
这样可以在数据地图中查看实体详情、维护属性、复用自定义属性,并支持后续治理。
区分自定义实体和扩展表实体
如果对象是报表、API、指标、系统、数据产品等业务概念,建议注册为自定义实体。如果对象具备明确的数据库、表、字段结构,且 DataWorks 未自动采集,建议注册为扩展表实体。
注册扩展表实体时,建议同时创建对应的扩展 Database 对象,再创建 Table 和内联 Column。这样可以保留外部数据源的库表字段层级,后续在数据地图中查看资产时更接近真实数据结构。
请勿通过 BatchCreateMetaEntities 创建 MaxCompute、DLF、EMR 等 DataWorks 采集器管理的预置实体。
对于已支持的类型:
若未采集,请通过采集器将其纳入 DataWorks 管理。
若已采集,请通过查询 API 获取其 ID 直接使用。
对于不支持的类型:
请使用扩展表实体进行创建。
通过 API 详情页确认参数
本文示例用于说明 API 组合方式和接入流程。字段必填性、命名规则、枚举值、错误码、SDK 代码等详细信息,请进入对应 API 详情页查看。
运维与更新
自定义实体和血缘关系接入后,建议把它作为一条持续同步链路维护,而不是一次性导入任务。常见运维动作如下:
运维动作 | 推荐 API | 说明 |
更新实体定义展示名、说明或属性定义 |
| 适合实体模型演进;修改前建议评估已有实体数据兼容性 |
更新自定义属性定义 |
| 适合调整展示、筛选、枚举值等属性配置 |
更新自定义实体或扩展表实体对象 |
| 适合更新实体说明、普通属性、自定义属性等信息 |
更新表或字段业务属性 |
| 适合维护表、字段上的 README 和自定义属性 |
删除废弃血缘 |
| 可先通过 |
删除废弃实体定义 |
| 删除前需要确认没有仍在使用的实体和血缘关系,谨慎使用强制删除能力 |
血缘关系变化时,推荐先从客户系统生成一份"期望血缘清单",再用 ListLineageRelationships 查询 DataWorks 当前已有关系,对比后执行:
期望清单有、DataWorks 没有:调用
CreateLineageRelationship补充。期望清单没有、DataWorks 仍存在:调用
DeleteLineageRelationship清理。两边都存在但任务信息变化:先删除旧关系,再创建新关系。
如果客户系统每天都有新增对象,建议使用增量同步;如果客户系统存在大量历史修订或人工维护数据,建议定期执行全量比对,避免 DataWorks 中出现漂移数据。
常见问题
现象 | 可能原因 | 处理建议 |
实体类型不存在 | 还没有创建实体定义,或创建后查询侧尚未可见 | 先调用 |
实体写入请求成功但部分实体失败 | 只判断了最外层 | 逐条检查 |
血缘端点不存在 | 上游或下游实体 ID 不正确,或刚创建的实体尚未可查 | 使用 |
字段血缘注册失败 | 字段 ID 手动拼接错误,或扩展表字段未随表注册 | 已采集字段优先使用 |
批量调用被限流 | 并发过高或短时间请求过多 | 降低并发度,对限流错误做指数退避重试 |
重跑后出现重复或旧血缘 | 没有用稳定 ID 和期望状态做比对 | 保存成功结果表;重跑前查询已有关系,跳过已存在关系或先删除旧关系 |
自定义属性在页面不可筛选 | 属性定义未开启搜索筛选,或适用实体类型不包含目标类型 | 检查 |
需要排查批量写入失败 | 失败信息缺少可定位上下文 | 记录失败请求的 |
页面或查询接口短时间查不到刚写入对象 | 查询索引存在短暂延迟 | 等待 1 秒到数秒后重试;批量工程中使用有限重试而不是立即判定失败 |
常见接入方案
方案一:把 BI 报表接入数据地图
适合希望在数据地图中查看报表依赖哪些表、某张表变更会影响哪些报表的场景。推荐流程:
使用
CreateMetaEntityDef创建报表实体定义。使用
CreateCustomAttribute创建业务域、负责人、来源系统等属性。使用
BatchCreateMetaEntities批量写入报表实体。使用
ListTables、ListColumns获取 DataWorks 已采集表字段 ID。使用
CreateLineageRelationship注册表到报表、字段到字段血缘。使用
ListLineages验证上下游链路。
方案二:把外部数据库表接入数据地图
适合企业已有第三方数据库或外部系统表,DataWorks 未自动采集,但希望纳入统一资产视图的场景。推荐流程:
使用
CreateMetaEntityDef创建扩展表定义。使用
BatchCreateMetaEntities先写入外部数据库,再写入外部表,字段随表内联写入。使用
CreateCustomAttribute补充来源系统、数据域等业务属性。使用
CreateLineageRelationship建立 DataWorks 表与外部表的血缘关系。使用
GetTable、GetColumn、ListLineages验证结果。
方案三:补充 DataWorks 已采集资产之间的血缘
适合企业存在外部加工链路或历史链路,希望补充到数据地图中的场景。推荐流程:
使用
ListTables、ListColumns、ListDatasets等 API 获取实体 ID。根据实际上下游方向调用
CreateLineageRelationship。使用
ListLineages查询验证。
这种场景不需要创建自定义实体定义,也不需要写入实体对象。