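The REST API and the CLI are two common tools for automating Airflow management. The REST API lets you manage Airflow workflows (DAGs) and tasks through HTTP requests, and the CLI lets you trigger and manage DAGs directly with commands.
For more information about the REST API and the CLI, see the official Airflow documentation.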
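Use the REST API
Obtain the credential (token) required to log in to the Airflow instance.
Call the CreateAirflowLoginToken operation to obtain the token.
Note: The login token is valid for 2 hours. If you have not logged in to the Airflow instance within 2 hours, you must obtain a new token.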
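The 2-hour validity window can be checked on the client before a login attempt. The following is a minimal sketch, assuming your own code records the time at which the token was issued; `TOKEN_TTL` and `token_expired` are hypothetical names and not part of the DMS SDK, and treating the exact 2-hour mark as expired is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Validity window stated in the note above.
TOKEN_TTL = timedelta(hours=2)


def token_expired(issued_at: datetime, now: Optional[datetime] = None) -> bool:
    """Return True if a token issued at `issued_at` is past its 2-hour window."""
    now = now or datetime.now(timezone.utc)
    return now - issued_at >= TOKEN_TTL


# Hypothetical issue time, recorded by the caller when the token was obtained.
issued = datetime(2025, 6, 5, 9, 0, tzinfo=timezone.utc)
print(token_expired(issued, datetime(2025, 6, 5, 10, 59, tzinfo=timezone.utc)))  # False
print(token_expired(issued, datetime(2025, 6, 5, 11, 0, tzinfo=timezone.utc)))   # True
```

If the check returns True, call CreateAirflowLoginToken again before you log in.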
Log in to the Airflow instance.
The following example logs in to the instance with a cURL command:
curl -i "https://data-cn-beijing-dms.aliyuncs.com/airflow/33***/8691522017****/af-b3a7f110a6vmvn797****/login?token=xxxxx"
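Note: Replace the following parts of the sample request with your actual values: data-cn-beijing-dms.aliyuncs.com (determined by the region where the Airflow instance resides), airflow (the instance name), 33*** (the DMS tenant ID), 8691522017**** (the workspace ID), af-b3a7f110a6vmvn797**** (the Airflow instance ID), and the token.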
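The URL layout described in the note can be assembled programmatically. The following is a minimal sketch, assuming the path segments appear in the order shown in the sample request; `build_login_url` is a hypothetical helper, and all argument values below are made-up placeholders.

```python
from urllib.parse import quote, urlencode


def build_login_url(host, instance_name, tenant_id, workspace_id, airflow_id, token):
    """Assemble the login URL from the parts listed in the note above."""
    segments = (instance_name, tenant_id, workspace_id, airflow_id)
    # Percent-encode each path segment so unexpected characters cannot break the URL.
    path = "/".join(quote(str(segment), safe="") for segment in segments)
    return f"https://{host}/{path}/login?{urlencode({'token': token})}"


# Placeholder values for illustration only.
url = build_login_url(
    "data-cn-beijing-dms.aliyuncs.com", "airflow", "12345", "67890", "af-example", "my-token"
)
print(url)
```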
Sample response:
HTTP/2 302
date: Thu, 05 Jun 2025 09:07:18 GMT
content-type: text/html; charset=utf-8
content-length: 309
vary: Origin
vary: Access-Control-Request-Method
vary: Access-Control-Request-Headers
server: envoy
location: /airflow/1/865084104****/af-ehrmszbxk735bl3x5****/home
cache-control: no-store
x-robots-tag: noindex, nofollow
x-envoy-upstream-service-time: 248
set-cookie: session=xxxxxxx; Expires=Sat, 05 Jul 2025 09:07:18 GMT; Secure; HttpOnly; Path=/; SameSite=None; Partitioned
x-content-type-options: nosniff
x-xss-protection: 0
referrer-policy: no-referrer
eagleeye-traceid: 0a032a1517491144384047056ec81f
strict-transport-security: max-age=31536000
timing-allow-origin: *

<!doctype html>
<html lang=en>
<title>Redirecting...</title>
<h1>Redirecting...</h1>
<p>You should be redirected automatically to the target URL: <a href="/airflow/1/8650841047****/af-ehrmszbxk735bl3x5v****/home">/airflow/1/8650841047****/af-ehrmszbxk735bl3x5v****/home</a>. If not, click the link.
After you log in, DMS automatically returns the session information in the set-cookie response header. Use this session value in subsequent REST API requests.
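The session value can be pulled out of the set-cookie header with plain string handling. The following is a minimal sketch; `extract_session` is a hypothetical helper. It splits the header manually instead of using `http.cookies.SimpleCookie`, because older Python versions can reject the whole cookie string when they encounter a flag they do not know, such as `Partitioned`.

```python
from typing import Optional


def extract_session(set_cookie_header: str) -> Optional[str]:
    """Return the value of the `session` cookie, or None if the header sets another cookie."""
    # The first `name=value` pair is the cookie itself; the rest are attributes.
    first_pair = set_cookie_header.split(";", 1)[0].strip()
    name, separator, value = first_pair.partition("=")
    if separator and name.strip() == "session":
        return value.strip()
    return None


header = (
    "session=xxxxxxx; Expires=Sat, 05 Jul 2025 09:07:18 GMT; "
    "Secure; HttpOnly; Path=/; SameSite=None; Partitioned"
)
print(extract_session(header))  # xxxxxxx
```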
Call the REST API.
To call the APIs, refer to the REST API documentation of the community edition of Airflow.
Sample request:
curl 'https://data-cn-beijing-dms.aliyuncs.com/airflow/33***/8691522017****/af-b3a7f110a6vmvn797y****/api/v1/health' -b 'session=xxx'
Sample response:
{
  "dag_processor": {
    "latest_dag_processor_heartbeat": null,
    "status": null
  },
  "metadatabase": {
    "status": "healthy"
  },
  "scheduler": {
    "latest_scheduler_heartbeat": "2025-06-05T09:13:03.075907+00:00",
    "status": "healthy"
  },
  "triggerer": {
    "latest_triggerer_heartbeat": null,
    "status": null
  }
}
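A health response of this shape can be summarized per component. The following is a minimal sketch; `group_by_status` is a hypothetical helper, and labeling a `null` status as "not reported" is an assumption about how to read the response, not something the response itself states.

```python
import json

# The sample health response shown above.
SAMPLE = """
{
  "dag_processor": {"latest_dag_processor_heartbeat": null, "status": null},
  "metadatabase": {"status": "healthy"},
  "scheduler": {"latest_scheduler_heartbeat": "2025-06-05T09:13:03.075907+00:00", "status": "healthy"},
  "triggerer": {"latest_triggerer_heartbeat": null, "status": null}
}
"""


def group_by_status(health: dict) -> dict:
    """Group component names by their reported status."""
    groups = {}
    for name, info in health.items():
        # Assumption: a null status means the component did not report one.
        status = info.get("status") or "not reported"
        groups.setdefault(status, []).append(name)
    return {status: sorted(names) for status, names in groups.items()}


summary = group_by_status(json.loads(SAMPLE))
print(summary)
```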
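Use the CLI
Obtain the credential (token) required to log in to the Airflow instance.
Call the CreateAirflowLoginToken operation to obtain the token.
Note: The login token is valid for 2 hours. If you have not logged in to the Airflow instance within 2 hours, you must obtain a new token.
Log in to the Airflow instance.
After you log in, DMS automatically returns session information. This session is used to run Airflow commands through the CLI.
The following example sends the login request with a cURL command: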
curl -i "https://data-cn-beijing-dms.aliyuncs.com/airflow/33***/8691522017****/af-b3a7f110a6vmvn797****/login?token=xxxxx"
Note: Replace the following parts of the sample request with your actual values: data-cn-beijing-dms.aliyuncs.com (determined by the region where the Airflow instance resides), airflow (the instance name), 33*** (the DMS tenant ID), 8691522017**** (the workspace ID), af-b3a7f110a6vmvn797**** (the Airflow instance ID), and the token.
Sample response:
HTTP/2 302
date: Thu, 05 Jun 2025 09:07:18 GMT
content-type: text/html; charset=utf-8
content-length: 309
vary: Origin
vary: Access-Control-Request-Method
vary: Access-Control-Request-Headers
server: envoy
location: /airflow/1/865084104****/af-ehrmszbxk735bl3x5****/home
cache-control: no-store
x-robots-tag: noindex, nofollow
x-envoy-upstream-service-time: 248
set-cookie: session=xxxxxxx; Expires=Sat, 05 Jul 2025 09:07:18 GMT; Secure; HttpOnly; Path=/; SameSite=None; Partitioned
x-content-type-options: nosniff
x-xss-protection: 0
referrer-policy: no-referrer
eagleeye-traceid: 0a032a1517491144384047056ec81f
strict-transport-security: max-age=31536000
timing-allow-origin: *

<!doctype html>
<html lang=en>
<title>Redirecting...</title>
<h1>Redirecting...</h1>
<p>You should be redirected automatically to the target URL: <a href="/airflow/1/8650841047****/af-ehrmszbxk735bl3x5v****/home">/airflow/1/8650841047****/af-ehrmszbxk735bl3x5v****/home</a>. If not, click the link.
Call the CLI endpoint to run an Airflow command.
Sample request:
curl -X 'POST' 'https://data-cn-beijing-dms.aliyuncs.com/airflow/33***/86915220175****/af-b3a7f110a6vmvn797y****/api/v1/command?command=version' -b 'session=faf9009b-exxx'
Sample response:
{
  "stderr": "",
  "stdout": "2.10.4\n"
}
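The request URL and the response handling for the CLI endpoint can be wrapped in two small helpers. The following is a minimal sketch; `build_command_url` and `parse_command_result` are hypothetical names. The sample request only shows the single-word command `version`, so passing a multi-word command as one percent-encoded `command` value is an assumption that you should verify against your instance.

```python
import json
from urllib.parse import quote, urlencode


def build_command_url(instance_url: str, command: str) -> str:
    """Build the CLI endpoint URL, percent-encoding the command string."""
    # Assumption: the whole command line travels in the `command` query parameter.
    query = urlencode({"command": command}, quote_via=quote)
    return f"{instance_url.rstrip('/')}/api/v1/command?{query}"


def parse_command_result(body: str):
    """Return (stdout, stderr) from the JSON response, with surrounding whitespace removed."""
    result = json.loads(body)
    return result.get("stdout", "").strip(), result.get("stderr", "").strip()


# Placeholder instance URL for illustration only.
base = "https://example.invalid/airflow/1/2/af-demo"
print(build_command_url(base, "version"))
print(parse_command_result(r'{ "stderr": "", "stdout": "2.10.4\n" }'))
```

Send the resulting URL as a POST request with the session cookie attached, as in the cURL sample above.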
Example: Call the REST API from Python
Run the following command to install the DMS SDK.
pip install alibabacloud_dms20250414
Call the REST API.
Create the following sample Python script and name it dms_rest_api.py.
# -*- coding: utf-8 -*-
import sys

import requests
from alibabacloud_credentials.client import Client as CredentialClient
from alibabacloud_dms20250414 import models as dms_20250414_models
from alibabacloud_dms20250414.client import Client as Dms20250414Client
from alibabacloud_dms20250414.models import CreateAirflowLoginTokenResponseBodyData
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models

# DMS OpenAPI endpoints, keyed by region ID.
endpoints = {
    "cn-beijing": "dms.cn-beijing.aliyuncs.com",
    "cn-hangzhou": "dms.cn-hangzhou.aliyuncs.com",
    "cn-shanghai": "dms.cn-shanghai.aliyuncs.com",
    "cn-shenzhen": "dms.cn-shenzhen.aliyuncs.com",
    "ap-southeast-1": "dms.ap-southeast-1.aliyuncs.com"
}


class DmsAirflowRestApi:
    def __init__(self, endpoint: str):
        """Initialize the DMS client with credentials from the default credential chain."""
        # For production code, use a more secure method that does not rely on an AccessKey pair.
        # For how to configure credentials, see https://help.aliyun.com/document_detail/378659.html.
        credential = CredentialClient()
        config = open_api_models.Config(
            credential=credential
        )
        # For endpoints, see https://api.aliyun.com/product/Dms
        config.endpoint = endpoint
        self.client = Dms20250414Client(config)

    def get_login_token(self, airflow_id: str) -> CreateAirflowLoginTokenResponseBodyData:
        """Call CreateAirflowLoginToken and return the response data (host and token)."""
        request = dms_20250414_models.CreateAirflowLoginTokenRequest(
            airflow_id=airflow_id
        )
        runtime = util_models.RuntimeOptions()
        try:
            response = self.client.create_airflow_login_token_with_options(request, runtime)
            return response.body.data
        except Exception as error:
            # Printed for demonstration only. Handle exceptions carefully and never silently ignore them in production code.
            # Error message
            print(getattr(error, "message", error))
            # Diagnostic URL, if the SDK error carries one
            data = getattr(error, "data", None)
            if isinstance(data, dict):
                print(data.get("Recommend"))
            # Re-raise so that the caller does not continue without a token.
            raise

    def get_session_cookie(self, login_token: CreateAirflowLoginTokenResponseBodyData):
        """Log in with the token and return the session cookie value, or None on failure."""
        login_url = f'{login_token.host}/login?token={login_token.token}'
        try:
            # The login endpoint answers with a 302 redirect that carries the session cookie,
            # so the redirect is not followed and the cookie is read from this response.
            response = requests.get(login_url, allow_redirects=False)
            response.raise_for_status()
            print(response.headers)
            # Read the session cookie.
            return response.cookies.get('session')
        except Exception as e:
            print(f"Error: {e}")
            return None

    def list_dags(self, login_token: CreateAirflowLoginTokenResponseBodyData, session_cookie: str):
        """List the DAGs of the instance through the Airflow REST API."""
        dags_url = f'{login_token.host}/api/v1/dags'
        try:
            cookies = {'session': session_cookie}
            response = requests.get(dags_url, cookies=cookies)
            response.raise_for_status()
            print(response.json())
        except Exception as e:
            print(f"Error: {e}")


if __name__ == '__main__':
    if len(sys.argv) != 3:
        sys.exit(f"Usage: python3 {sys.argv[0]} <region_id> <airflow_id>")
    region, airflow_id = sys.argv[1], sys.argv[2]
    endpoint = endpoints.get(region)
    if endpoint is None:
        sys.exit(f"Unsupported region: {region}")
    rest_api = DmsAirflowRestApi(endpoint)
    login_token = rest_api.get_login_token(airflow_id)
    session_cookie = rest_api.get_session_cookie(login_token)
    if session_cookie is None:
        sys.exit("Login failed: no session cookie was returned.")
    rest_api.list_dags(login_token, session_cookie)
Note: You can call the CreateAirflowLoginToken operation to obtain the Airflow ID.
Configure the AccessKey ID and AccessKey secret.
export ALIBABA_CLOUD_ACCESS_KEY_ID=xxx
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=xxx
Run the Python script.
python3 dms_rest_api.py ${region_id} ${airflow_id}
Example:
python3 dms_rest_api.py cn-hangzhou af-sxsssxx
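Beyond listing DAGs, the same session cookie can be used for other endpoints of the community Airflow REST API. The following is a minimal sketch that prepares, but does not send, a request to trigger a DAG run through `POST /api/v1/dags/{dag_id}/dagRuns`. `build_trigger_request` is a hypothetical helper, the instance URL and DAG ID are placeholders, and whether your DMS instance exposes this endpoint should be verified before use.

```python
import json
from urllib.parse import quote
from urllib.request import Request


def build_trigger_request(instance_url, session_cookie, dag_id, conf=None):
    """Prepare a POST request that asks Airflow to start a new run of `dag_id`."""
    url = f"{instance_url.rstrip('/')}/api/v1/dags/{quote(dag_id, safe='')}/dagRuns"
    body = json.dumps({"conf": conf or {}}).encode("utf-8")
    headers = {
        "Content-Type": "application/json",
        # The session value returned by the login step.
        "Cookie": f"session={session_cookie}",
    }
    return Request(url, data=body, headers=headers, method="POST")


# Placeholder values for illustration only.
req = build_trigger_request("https://example.invalid/airflow/1/2/af-demo", "xxx", "example_dag")
print(req.get_method(), req.full_url)
```

To send the request, pass it to `urllib.request.urlopen`, or issue the equivalent call with `requests.post`.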
Supported CLI commands
For how to use the CLI commands, see the official Airflow CLI documentation.
cheat-sheet
connections add
connections delete
dags backfill
dags delete
dags list
dags list-jobs
dags list-import-errors
dags list-runs
dags next-execution
dags pause
dags report
dags reserialize
dags show
dags state
dags test
dags trigger
dags unpause
db clean
providers behaviours
providers get
providers hooks
providers links
providers list
providers notifications
providers secrets
providers triggers
providers widgets
tasks clear
tasks failed-deps
tasks list
tasks render
tasks state
tasks states-for-dag-run
tasks test
variables delete
variables get
variables set
variables list
version
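A client can check a command against the supported list before sending it. The following is a minimal sketch; `SUPPORTED` and `is_supported` are hypothetical names, and the set below is deliberately abbreviated, so extend it with the remaining entries from the list above before relying on it.

```python
import shlex

# Abbreviated subset of the supported commands listed above.
SUPPORTED = {
    "version",
    "cheat-sheet",
    "dags list",
    "dags trigger",
    "dags pause",
    "dags unpause",
    "tasks list",
    "variables get",
    "variables set",
}


def is_supported(command: str) -> bool:
    """Return True if the command starts with a supported one- or two-word command."""
    tokens = shlex.split(command)
    if not tokens:
        return False
    return tokens[0] in SUPPORTED or " ".join(tokens[:2]) in SUPPORTED


print(is_supported("dags list --output json"))
print(is_supported("db migrate"))
```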