文档

使用Apache Airflow调度MaxCompute

更新时间:

MaxCompute支持您使用Apache Airflow通过Python接口实现作业调度。本文为您介绍如何使用Apache Airflow的Python Operator调度MaxCompute作业。

背景信息

Apache Airflow是Airbnb开源的、基于Python编写的调度工具,基于有向无环图(DAG),可以定义一组有依赖的作业,并按照依赖顺序依次执行作业。还支持通过Python定义子作业,并支持各种Operators操作器,灵活性大,能满足用户的各种需求。更多Apache Airflow信息,请参见Apache Airflow

前提条件

在执行操作前,请确认您已满足如下条件:

  • 已安装PyODPS。

    更多安装PyODPS操作,请参见安装PyODPS

  • 已安装并启动Apache Airflow。

    更多安装及启动Apache Airflow操作,请参见Apache Airflow快速入门

    本文中的Apache Airflow示例版本为1.10.7。

步骤一:在Apache Airflow家目录编写调度Python脚本

编写作业调度Python脚本并保存为.py文件,脚本文件中会呈现完整的调度逻辑及对应的调度作业名称。假设Python脚本名称为Airiflow_MC.py,脚本内容示例如下:

# -*- coding: UTF-8 -*-
import sys
import os
from odps import ODPS
from odps import options
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from configparser import ConfigParser
import time
reload(sys)
sys.setdefaultencoding('utf8')
#修改系统默认编码
#MaxCompute参数设置
options.sql.settings = {'options.tunnel.limit_instance_tunnel': False, 'odps.sql.allow.fullscan': True}
cfg = ConfigParser()
cfg.read("odps.ini")
print(cfg.items())
# 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
# 不建议直接使用 Access Key ID / Access Key Secret 字符串
odps = ODPS(cfg.get("odps",os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID')),cfg.get("odps",os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')),cfg.get("odps","project"),cfg.get("odps","endpoint"))
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retry_delay': timedelta(minutes=5),
    'start_date':datetime(2020,1,15)
    # 'email': ['airflow@example.com'],
    # 'email_on_failure': False,
    # 'email_on_retry': False,
    # 'retries': 1,
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
#调度流程
dag = DAG(
    'Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30))
def read_sql(sqlfile):
    with io.open(sqlfile, encoding='utf-8', mode='r') as f:
        sql=f.read()
    f.closed
    return sql
#调度作业
def get_time():
    print '当前时间是{}'.format(time.time())
    return time.time()
#调度作业
def mc_job ():

    project = odps.get_project()  # 取到默认项目。
    instance=odps.run_sql("select * from long_chinese;")
    print(instance.get_logview_address())
    instance.wait_for_success()
    with instance.open_reader() as reader:
        count = reader.count
    print("查询表数据条数:{}".format(count))
    for record in reader:
        print record
    return count
t1 = PythonOperator (
    task_id = 'get_time' ,
    provide_context = False ,
    python_callable = get_time,
    dag = dag )

t2 = PythonOperator (
    task_id = 'mc_job' ,
    provide_context = False ,
    python_callable = mc_job ,
    dag = dag )
t2.set_upstream(t1)

步骤二:提交调度脚本

  1. 在系统的命令行窗口执行如下命令提交步骤一中编写的调度作业Python脚本。

    python Airiflow_MC.py
  2. 在系统的命令行窗口执行如下命令生成调度流程并测试调度作业。

    # print the list of active DAGs
    airflow list_dags
    
    # prints the list of tasks the "tutorial" dag_id
    airflow list_tasks Airiflow_MC
    
    # prints the hierarchy of tasks in the tutorial DAG
    airflow list_tasks Airiflow_MC --tree
    #测试task
    airflow test Airiflow_MC get_time 2010-01-16
    airflow test Airiflow_MC mc_job 2010-01-16

步骤三:运行调度作业

您可以登录Apache Airflow的Web界面,在DAGs页签,查找到提交的调度流程,单击Links列的运行图标即可运行调度作业。

运行调度作业

步骤四:查看调度作业运行结果

您也可以单击调度作业名称,在Graph View页签查看到调度作业流程。单击调度流程中的某个作业,例如mc_job,您可以在mc_job对话框,单击View Log,即可查看运行结果。

调度流程