使用 DataWorks OpenAPI 进行血缘查询

本文档旨在指导您如何使用 DataWorks OpenAPI(2024-05-18),以编程方式查询数据表和字段的血缘关系。我们将通过具体的 API 调用示例和 SDK 代码,帮助您快速上手,满足自动化、批量化的血缘分析需求。

什么是数据血缘?

想象一下,你正在看一份重要的商业报表,上面显示本季度的销售额大幅增长。作为一个严谨的数据分析师或管理者,你脑海中会立刻浮现出几个问题:

  • 这个“销售额”指标是如何计算出来的?

  • 它来源于哪些原始的业务数据?是订单表?还是支付流水表?

  • 在从原始数据到最终报表的整个过程中,它经过了哪些处理步骤?(比如清洗、转换、聚合等)

  • 如果这个指标的数据出错了,会影响到下游哪些其他报表或应用

    image

清晰的数据血缘至关重要,它主要带来以下核心价值:

  1. 数据溯源与问题排查
    当发现数据异常或错误时,可以沿着血缘关系向上追溯,快速定位到是哪个环节的计算逻辑或源头数据出了问题,极大地缩短故障排查时间。

  2. 影响分析
    当需要对某个数据表结构、字段或计算逻辑进行变更时,可以通过血缘关系向下分析,精确评估该变更会影响到哪些下游的数据和业务报表,从而避免“牵一发而动全身”的未知风险。

  3. 数据治理与可信度
    清晰的血缘关系是数据资产管理、数据标准落地和数据质量监控的基础。它让数据的来龙去脉变得透明,从而增强业务方对数据的信任度。

  4. 成本优化与资产盘点
    通过分析血缘,可以识别出那些“无人问津”(没有下游消费)的数据表或计算任务,为数据仓库的成本优化和老旧资产的下线提供决策依据。

在 DataWorks 中,系统会自动解析并记录各种计算任务(如MaxCompute SQL、EMR Spark等)产生的数据血缘关系。而通过 DataWorks OpenAPI,您可以以编程的方式访问这些血缘信息,从而将血缘分析能力集成到您自己的数据管理平台或自动化运维流程中。

准备工作:获取实体 ID

在进行任何血缘查询之前,您首先需要获取目标数据(表或字段)的唯一标识符,即实体ID (Entity ID)。实体 ID 是调用元数据和血缘相关 API 的核心凭据。

您可以通过以下两种方式获取实体 ID:

1. 通过 DataWorks 界面获取

对于少量、已知的表或字段,通过界面手动复制是最快捷的方式。

获取表的实体 ID

  1. 进入 DataWorks 的数据地图模块。

  2. 搜索并进入您要查询的表详情页。

  3. 在左侧的表基础信息面板中,找到实体ID并复制。

    image

获取字段的实体 ID

  1. 在目标表的详情页,切换到血缘信息选项卡,并选择字段血缘

  2. 在字段血缘图谱中,点击您关注的字段节点。

  3. 右侧会弹出该字段的详情面板,在面板中找到实体ID并复制。

    image

2. 通过 API 批量获取

当您需要批量获取实体 ID 时,手动操作会非常繁琐。此时,建议使用 OpenAPI 进行批量查询:

使用 ListLineages API 查询血缘

获取实体 ID 后,您就可以使用核心的 ListLineages API 来查询其上下游血缘关系了。

1. API 核心参数说明

以下是 ListLineages API 的关键请求参数。您可以在 OpenAPI 门户进行在线调试。

参数

类型

描述

SrcEntityId

String

查询下游血缘时使用。传入源头(上游)实体ID,API 将返回该实体的所有下游血缘。

DstEntityId

String

查询上游血缘时使用。传入目标(下游)实体ID,API 将返回该实体的所有上游血缘。

SrcEntityName

String

配合 DstEntityId 使用,用于模糊搜索和过滤上游实体。

DstEntityName

String

配合 SrcEntityId 使用,用于模糊搜索和过滤下游实体。

NeedAttachRelationship

Boolean

是否在响应中包含详细的血缘关系信息。建议设置为 true 以获取完整上下文。

重要
  • 如果同时提供SrcEntityIdDstEntityId,会返回给定上下游血缘实体下的血缘关系。

  • 如果SrcEntityIdDstEntityId为同一ID,会返回该实体只指向自己的血缘关系。

2. 调用示例

假设我们有一个 MaxCompute 表,其实体IDmaxcompute-table:::test_project::test_table

示例一:查询该表的下游血缘

要查询这个表的所有下游表,您需要将它作为源头 (Source)

  • SrcEntityId: maxcompute-table:::test_project::test_table

  • NeedAttachRelationship: true

如果只想查找下游表中名称包含 "report" 的表,可以追加 DstEntityName 参数:

  • DstEntityName: report

示例二:查询该表的上游血缘

要查询是哪些表或任务生成了这个表,您需要将它作为目标 (Destination)

  • DstEntityId: maxcompute-table:::test_project::test_table

  • NeedAttachRelationship: true

同样,您也可以通过 SrcEntityName 参数来过滤上游来源。

3. 理解 API 响应

调用 ListLineages 成功后,您会得到一个包含多条血缘关系的列表。每一条血缘关系都包含源实体、目标实体以及它们之间的关联信息。

单条血缘关系响应示例 (JSON):

{
  "SrcEntity": {
    "Id": "maxcompute-table:::test_project::table_from",
    "Name": "table_from",
    "Attributes": {
      "rawEntityId": "maxcompute-table:::test_project::table_from"
    }
  },
  "DstEntity": {
    "Id": "maxcompute-table:::test_project::table_to",
    "Name": "table_to",
    "Attributes": {
      "project": "test_project",
      "region": "cn-shanghai",
      "table": "table_to"
    }
  },
  "Relationships": [
    {
      "Id": "123456789:maxcompute-table.test_project.table_from:maxcompute-table.test_project.table_to:maxcompute.SQL.76543xxx",
      "CreateTime": 1761089163548,
      "Task": {
        "Id": "76543xxx",
        "Type": "dataworks-sql",
        "Attributes": {
          "engine": "maxcompute",
          "channel": "1st",
          "taskInstanceId": "12345xxx",
          "projectId": "123456",
          "taskId": "76543xxx"
        }
      }
    }
  ]
}

如何解读响应:

  • SrcEntityDstEntity: 分别代表血缘的上游和下游实体。您可以拿到它们的 Id,并进一步调用GetTableGetColumnAPI 来获取更详细的元数据信息。

  • Relationships: 描述了 SrcEntityDstEntity 是如何关联起来的。

    • Task: 描述产生这条血缘关系的任务信息。如果任务是 DataWorks 调度任务,Task.Attributes 中会包含taskIdtaskInstanceId。您可以使用这些 ID 调用GetTaskAPI 来获取任务的详细定义和运行状态。

Java SDK 实战演练

下面以 Java SDK 为例,展示如何通过代码实现完整的血缘查询流程。

1. 环境准备

  • JDK 版本: 确保您已安装 JDK 8 或更高版本。

  • Maven 依赖: 在您的项目 pom.xml 文件中添加以下依赖。请将 ${latest.version} 替换为 SDK 的最新版本号(在Java > 快速入门 > 安装方式 > Apache Maven查看)

<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>dataworks_public20240518</artifactId>
    <version>${latest.version}</version>
</dependency>

2. 完整代码示例

以下代码演示了如何初始化客户端、查询指定表的上游和下游血缘,并打印关键信息。

import java.util.List;
import java.util.Map;

import com.aliyun.dataworks_public20240518.Client;
import com.aliyun.dataworks_public20240518.models.GetTableRequest;
import com.aliyun.dataworks_public20240518.models.GetTableResponse;
import com.aliyun.dataworks_public20240518.models.LineageEntity;
import com.aliyun.dataworks_public20240518.models.LineageRelationship;
import com.aliyun.dataworks_public20240518.models.LineageTask;
import com.aliyun.dataworks_public20240518.models.ListLineagesRequest;
import com.aliyun.dataworks_public20240518.models.ListLineagesResponse;
import com.aliyun.dataworks_public20240518.models.ListLineagesResponseBody.ListLineagesResponseBodyPagingInfo;
import com.aliyun.dataworks_public20240518.models.ListLineagesResponseBody.ListLineagesResponseBodyPagingInfoLineages;
import com.aliyun.dataworks_public20240518.models.Table;
import com.aliyun.tea.TeaException;

public class LineageQuerySample {
  /**
     * <b>description</b> :
     * <p>使用凭据初始化账号 Client</p>
     *
     * @return Client
     * @throws Exception
     */
  public static com.aliyun.dataworks_public20240518.Client createClient() throws Exception {
    com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
      // 您的 AccessKey ID
      .setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
      // 您的 AccessKey Secret
      .setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    // Endpoint 请参考 https://api.aliyun.com/product/dataworks-public
    config.endpoint = "dataworks.cn-hangzhou.aliyuncs.com";
    return new com.aliyun.dataworks_public20240518.Client(config);
  }

  public static void main(String[] args_) throws Exception {
    Client client = LineageQuerySample.createClient();
    // 待查询的表实体ID,此处需要替换为待查询的mc表实体ID
    String tableId = "maxcompute-table:::test_project::test_table";
    try {
      // 1. 查询上游血缘
      ListLineagesRequest listLineagesRequest = new ListLineagesRequest()
        .setDstEntityId(tableId)
        .setNeedAttachRelationship(true)
        .setPageNumber(1)
        // 默认查询10条记录,最大为100
        .setPageSize(10);
      // 支持对血缘上游表名称进行关键词匹配过滤
      listLineagesRequest.setSrcEntityName("demo");
      ListLineagesResponse listLineagesResponse = client.listLineages(listLineagesRequest);
      String requestId = listLineagesResponse.getBody().getRequestId();
      System.out.println("\n查询上游血缘");
      // 打印requestId,辅助问题排查
      System.out.println(requestId);
      ListLineagesResponseBodyPagingInfo pagingInfo = listLineagesResponse.getBody().getPagingInfo();
      if (pagingInfo.getTotalCount() > 0 && pagingInfo.getLineages() != null) {
        for (ListLineagesResponseBodyPagingInfoLineages lineage : pagingInfo.getLineages()) {
          // 获取单条血缘,查询对应血缘上游表
          LineageEntity srcEntity = lineage.getSrcEntity();
          System.out.println("============================================");
          System.out.println("ID: " + srcEntity.getId());
          System.out.println("Name: " + srcEntity.getName());
          // 获取上游表信息
          Table table = getTable(client, srcEntity.getId());
          if (table != null) {
            System.out.println("Comment: " + table.getComment());
            System.out.println("Create Time: " + table.getCreateTime());
            System.out.println("Modify Time: " + table.getModifyTime());
          }
        }
      }
            // 2. 查询下游血缘
            listLineagesRequest = new ListLineagesRequest()
                    .setSrcEntityId(tableId)
                    .setNeedAttachRelationship(true)
                    .setPageNumber(1)
                    // 默认查询10条记录,最大为100
                    .setPageSize(10);
            listLineagesResponse = client.listLineages(listLineagesRequest);
            requestId = listLineagesResponse.getBody().getRequestId();
            System.out.println("\n查询下游血缘");
            // 打印requestId,辅助问题排查
            System.out.println(requestId);
            pagingInfo = listLineagesResponse.getBody().getPagingInfo();
            if (pagingInfo.getTotalCount() > 0 && pagingInfo.getLineages() != null) {
                for (ListLineagesResponseBodyPagingInfoLineages lineage : pagingInfo.getLineages()) {
                    // 获取单条血缘,查询对应血缘下游表
                    LineageEntity dstEntity = lineage.getDstEntity();
                    System.out.println("============================================");
                    System.out.println("ID: " + dstEntity.getId());
                    System.out.println("Name: " + dstEntity.getName());
                    // 获取下游表信息
                    Table table = getTable(client, dstEntity.getId());
                    if (table != null) {
                        System.out.println("Comment: " + table.getComment());
                        System.out.println("Create Time: " + table.getCreateTime());
                        System.out.println("Modify Time: " + table.getModifyTime());
                    }
                    // 解析血缘关系
                    List<LineageRelationship> relationships = lineage.getRelationships();
                    if (relationships != null) {
                        for (LineageRelationship relationship : relationships) {
                            System.out.println("\n\tRelationshipId: " + relationship.getId());
                            System.out.println("\tRelationshipCreateTime: " + relationship.getCreateTime());
                            // 解析任务详情
                            LineageTask task = relationship.getTask();
                            Map<String, String> attributes = task.getAttributes();
                            // 对于DataWorks调度任务,可以通过attributes获取任务ID和任务实例ID
                            if (attributes != null && attributes.containsKey("taskId") && attributes.containsKey("taskInstanceId")) {
                                System.out.println("\tTaskId: " + attributes.get("taskId"));
                                System.out.println("\tTaskInstanceId: " + attributes.get("taskInstanceId"));
                            }
                        }
                    }
                }
            }
        } catch (TeaException error) {
            // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            // 错误 message
            System.out.println(error.getMessage());
            // 诊断地址
            System.out.println(error.getData().get("Recommend"));
            com.aliyun.teautil.Common.assertAsString(error.message);
        } catch (Exception _error) {
            TeaException error = new TeaException(_error.getMessage(), _error);
            // 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
            // 错误 message
            System.out.println(error.getMessage());
            // 诊断地址
            System.out.println(error.getData().get("Recommend"));
            com.aliyun.teautil.Common.assertAsString(error.message);
        }
    }

    public static Table getTable(Client client, String tableId) {
        // 根据ID查询表信息
        GetTableRequest getTableRequest = new GetTableRequest()
                .setId(tableId)
                .setIncludeBusinessMetadata(true);
        try {
            GetTableResponse getTableResponse = client.getTable(getTableRequest);
            return getTableResponse.getBody().getTable();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        return null;
    }
}