使用REST API和CLI自动化管理工作流

REST APICLIAirflow中两种常用的自动化管理工具,其中REST API允许通过HTTP请求自动化管理Airflow的工作流(DAG)和任务,CLI可以直接通过命令来触发、管理DAG。

REST APICLI的更多信息,请参见Airflow官网文档

使用REST API

  1. 获取登录Airflow实例所需的凭证(Token)。

    调用CreateAirflowLoginToken接口获取Token。

    说明

    登录Token的有效时间为2小时。超过2小时还未登录Airflow实例,则需要重新获取。

  2. 登录Airflow实例。

    通过cURL命令登录实例的示例:

    curl -i "https://data-cn-beijing-dms.aliyuncs.com/airflow/33***/8691522017****/af-b3a7f110a6vmvn797****/login?token=xxxxx"
    说明

    请替换请求示例中的以下内容:data-cn-beijing-dms.aliyuncs.com(Airflow实例所在地域)、airflow(实例名称)、33***(DMS租户ID)、8691522017****(工作空间ID)、af-b3a7797****(Airflow实例ID)和Token。

    返回结果示例:

    HTTP/2 302
    date: Thu, 05 Jun 2025 09:07:18 GMT
    content-type: text/html; charset=utf-8
    content-length: 309
    vary: Origin
    vary: Access-Control-Request-Method
    vary: Access-Control-Request-Headers
    server: envoy
    location: /airflow/1/865084104****/af-ehrmszbxk735bl3x5****/home
    cache-control: no-store
    x-robots-tag: noindex, nofollow
    x-envoy-upstream-service-time: 248
    set-cookie: session=xxxxxxx; Expires=Sat, 05 Jul 2025 09:07:18 GMT; Secure; HttpOnly; Path=/; SameSite=None; Partitioned
    x-content-type-options: nosniff
    x-xss-protection: 0
    referrer-policy: no-referrer
    eagleeye-traceid: 0a032a1517491144384047056ec81f
    strict-transport-security: max-age=31536000
    timing-allow-origin: *
    
    <!doctype html>
    <html lang=en>
    <title>Redirecting...</title>
    <h1>Redirecting...</h1>
    <p>You should be redirected automatically to the target URL: <a href="/airflow/1/8650841047****/af-ehrmszbxk735bl3x5v****/home">/airflow/1/8650841047****/af-ehrmszbxk735bl3x5v****/home</a>. If not, click the link.

    登录实例后,DMS自动会为您返回Session信息。此Session信息用于请求REST API。

  3. 调用REST API。

    您可参考社区版Airflow REST API文档调用API。

    请求示例:

    curl 'https://data-cn-beijing-dms.aliyuncs.com/airflow/33***/8691522017****/af-b3a7f110a6vmvn797y****/api/v1/health' -b 'session=xxx' 

    返回结果示例:

    {
      "dag_processor": {
        "latest_dag_processor_heartbeat": null,
        "status": null
      },
      "metadatabase": {
        "status": "healthy"
      },
      "scheduler": {
        "latest_scheduler_heartbeat": "2025-06-05T09:13:03.075907+00:00",
        "status": "healthy"
      },
      "triggerer": {
        "latest_triggerer_heartbeat": null,
        "status": null
      }
    }

使用CLI命令行

  1. 获取登录Airflow实例所需的凭证(Token)。

    调用CreateAirflowLoginToken接口获取Token。

    说明

    登录Token的有效时间为2小时。超过2小时还未登录Airflow实例,则需要重新获取。

  2. 登录Airflow实例。

    登录实例后,DMS自动会为您返回Session信息。此Session信息用于使用CLI执行Airflow命令。

    如下示例为使用cURL命令发送登录实例的请求:

    curl -i "https://data-cn-beijing-dms.aliyuncs.com/airflow/33***/8691522017****/af-b3a7f110a6vmvn797****/login?token=xxxxx"
    说明

    请替换请求示例中的以下内容:data-cn-beijing-dms.aliyuncs.com(Airflow实例所在地域)、airflow(实例名称)、33***(DMS租户ID)、8691522017****(工作空间ID)、af-b3a7797****(Airflow实例ID)和Token。

    返回结果示例:

    HTTP/2 302
    date: Thu, 05 Jun 2025 09:07:18 GMT
    content-type: text/html; charset=utf-8
    content-length: 309
    vary: Origin
    vary: Access-Control-Request-Method
    vary: Access-Control-Request-Headers
    server: envoy
    location: /airflow/1/865084104****/af-ehrmszbxk735bl3x5****/home
    cache-control: no-store
    x-robots-tag: noindex, nofollow
    x-envoy-upstream-service-time: 248
    set-cookie: session=xxxxxxx; Expires=Sat, 05 Jul 2025 09:07:18 GMT; Secure; HttpOnly; Path=/; SameSite=None; Partitioned
    x-content-type-options: nosniff
    x-xss-protection: 0
    referrer-policy: no-referrer
    eagleeye-traceid: 0a032a1517491144384047056ec81f
    strict-transport-security: max-age=31536000
    timing-allow-origin: *
    
    <!doctype html>
    <html lang=en>
    <title>Redirecting...</title>
    <h1>Redirecting...</h1>
    <p>You should be redirected automatically to the target URL: <a href="/airflow/1/8650841047****/af-ehrmszbxk735bl3x5v****/home">/airflow/1/8650841047****/af-ehrmszbxk735bl3x5v****/home</a>. If not, click the link.
  3. 调用CLI接口执行Airflow命令。

    请求示例:

    curl  -X 'POST' 'https://data-cn-beijing-dms.aliyuncs.com/airflow/33***/86915220175****/af-b3a7f110a6vmvn797y****/api/v1/command?command=version' -b 'session=faf9009b-exxx' 

    返回结果示例:

    {
      "stderr": "",
      "stdout": "2.10.4\n"
    }

示例:在Python环境下调用REST API

  1. 执行如下命令安装DMS SDK。

    pip install alibabacloud_dms20250414
  2. 调用REST API。

    1. 创建如下Python脚本示例,文件名称为dms_rest_api.py。

      # -*- coding: utf-8 -*-
      # This file is auto-generated, don't edit it. Thanks.
      import os
      import sys
      
      from typing import List
      
      from alibabacloud_dms20250414.client import Client as Dms20250414Client
      from alibabacloud_credentials.client import Client as CredentialClient
      from alibabacloud_dms20250414.models import CreateAirflowLoginTokenResponseBodyData
      from alibabacloud_tea_openapi import models as open_api_models
      from alibabacloud_dms20250414 import models as dms_20250414_models
      from alibabacloud_tea_util import models as util_models
      from alibabacloud_tea_util.client import Client as UtilClient
      import requests
      
      endpoints = {
          "cn-beijing": "dms.cn-beijing.aliyuncs.com",
          "cn-hangzhou": "dms.cn-hangzhou.aliyuncs.com",
          "cn-shanghai": "dms.cn-shanghai.aliyuncs.com",
          "cn-shenzhen": "dms.cn-shenzhen.aliyuncs.com",
          "ap-southeast-1": "dms.ap-southeast-1.aliyuncs.com"
      }
      
      
      class DmsAirflowRestApi:
      
          def __init__(self, endpoint: str):
              """
              # 使用凭据初始化账号Client
              @return: Client
              @throws Exception
              """
              # 工程代码建议使用更安全的无AK方式,凭据配置方式请参见:https://help.aliyun.com/document_detail/378659.html。
              credential = CredentialClient()
              config = open_api_models.Config(
                  credential=credential
              )
              # Endpoint 请参考 https://api.aliyun.com/product/Dms
              config.endpoint = endpoint
              self.client = Dms20250414Client(config)
      
          def get_login_token(self, airflowId: str) -> CreateAirflowLoginTokenResponseBodyData:
              create_airflow_login_token_request = dms_20250414_models.CreateAirflowLoginTokenRequest(
                  airflow_id=airflowId
              )
              runtime = util_models.RuntimeOptions()
              try:
                  # 复制代码运行请自行打印 API 的返回值
                  response = self.client.create_airflow_login_token_with_options(create_airflow_login_token_request, runtime)
                  data = response.body.data
                  return data
              except Exception as error:
                  # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。
                  # 错误 message
                  print(error.message)
                  # 诊断地址
                  print(error.data.get("Recommend"))
                  UtilClient.assert_as_string(error.message)
      
          def get_session_cookie(self, login_token: CreateAirflowLoginTokenResponseBodyData):
              login_url = f'{login_token.host}/login?token={login_token.token}'
              try:
                  # 发送请求
                  response = requests.get(login_url)
                  response.raise_for_status()
      
                  print(response.headers)
                  # 获取 session cookie
                  if 'session' in response.cookies:
                      return response.cookies['session']
      
                  return None
      
              except Exception as e:
                  print(f"Error: {e}")
                  return None
      
          def list_dags(self, login_token: CreateAirflowLoginTokenResponseBodyData, session_cookie: str):
              login_url = f'{login_token.host}/api/v1/dags'
              try:
                  cookies = {'session': session_cookie}
                  response = requests.get(login_url, cookies=cookies)
                  response.raise_for_status()
      
                  print(response.json())
      
              except Exception as e:
                  print(f"Error: {e}")
      
      
      if __name__ == '__main__':
          region = sys.argv[1]
          airflowId = sys.argv[2]
      
          endpoint = endpoints.get(region)
          restApi = DmsAirflowRestApi(endpoint)
      
          login_token = restApi.get_login_token(airflowId)
          session_cookie = restApi.get_session_cookie(login_token)
          restApi.list_dags(login_token, session_cookie)
      
      说明

      您可调用CreateAirflowLoginToken接口获取AirflowID。

    2. 配置AccessKey IDAccessKey Secret

      export ALIBABA_CLOUD_ACCESS_KEY_ID=xxx
      export ALIBABA_CLOUD_ACCESS_KEY_SECRET=xxx
    3. 执行Python脚本。

      python3 dms_rest_api.py ${region_id} ${airflow_id}

      示例:python3 dms_rest_api.py cn-hangzhou af-sxsssxx

支持的CLI命令行

CLI命令行的使用方法,请参见官方CLI使用文档

cheat-sheet connections add
connections delete
dags backfill
dags delete
dags list
dags list-jobs
dags list-import-errors
dags list-runs
dags next-execution
dags pause
dags report
dags reserialize
dags show
dags state
dags test
dags trigger
dags unpause
db clean
providers behaviours
providers get
providers hooks
providers links
providers list
providers notifications
providers secrets
providers triggerer
providers widgets
tasks clear
tasks failed-deps
tasks list
tasks render
tasks state
tasks states-for-dag-run
tasks test
variables delete
variables get
variables set
variables list
version