Task是ODPS的基本计算单元,例如SQLTask。

基本操作

Task在执行时会被实例化, 以ODPS实例的形式存在。您可以调用list_instances来获取项目空间下的所有的Instance。exist_instance用于判断是否存在某Instance;get_instance用于获取实例。

for instance in o.list_instances():
    print(instance.id)
    o.exist_instance('my_instance_id')

您可以在ODPS入口使用stop_instance,或通过Instance对象调用stop方法来停止一个Instance。

获取Logview地址

对于SQL等任务,通过调用get_logview_address方法即可获取Logview地址。

# 从已有的instance对象获取Logview地址。
instance = o.run_sql('desc pyodps_iris')
print(instance.get_logview_address())
# 从instance id获取Logview地址。
instance = o.get_instance('2016042605520945g9k5pvyi2')
print(instance.get_logview_address())

对于XFlow任务,需要先枚举其子任务,再获取子任务的Logiew。

instance = o.run_xflow('AppendID', 'algo_public', {'inputTableName': 'input_table', 'outputTableName': 'output_table'})
for sub_inst_name, sub_inst in o.get_xflow_sub_instances(instance).items():
    print('%s: %s' % (sub_inst_name, sub_inst.get_logview_address()))

任务实例状态

一个Instance的状态可以是RunningSuspended或者 Terminated,用户可以通过status属性来获取状态。is_terminated方法返回当前Instance是否已经执行完毕,is_successful方法返回当前Instance是否正确地执行完毕, 任务处于运行中或者执行失败都会返回False。

instance = o.get_instance('2016042605520945g9k5pvyi2')
instance.status
<Status.TERMINATED: 'Terminated'>
    from odps.models import Instance
    instance.status == Instance.Status.TERMINATED
    True
    instance.status.value
    'Terminated'

调用wait_for_completion方法会阻塞直到Instance执行完成。wait_for_success方法同样会阻塞,但如果最终任务执行失败,会抛出相关异常。

子任务操作

一个Instance在运行时,可能包含一个或者多个子任务,这些子任务称为Task(注意:这里的Task不同于ODPS的计算单元)。

您可以通过get_task_names方法获取所有的Task任务。该方法返回一个所有子任务的名称列表。
instance.get_task_names()
['SQLDropTableTask']
得到Task的名称后,您可以通过get_task_result方法获取这个Task的执行结果。get_task_results方法是以字典的形式返回每个Task的执行结果。
instance = o.execute_sql('select * from pyodps_iris limit 1')
instance.get_task_names()
['AnonymousSQLTask']
instance.get_task_result('AnonymousSQLTask')
'"sepallength","sepalwidth","petallength","petalwidth","name"\n5.1,3.5,1.4,0.2,"Iris-setosa"\n'
instance.get_task_results()
OrderedDict([('AnonymousSQLTask', "sepallength","sepalwidth","petallength","petalwidth","name"\n5.1,3.5,1.4,0.2,"Iris-setosa"\n')])

您可以通过get_task_progress方法获取在任务实例运行时Task当前的运行进度。

while not instance.is_terminated():
    for task_name in instance.get_task_names():
        print(instance.id, instance.get_task_progress(task_name).get_stage_progress_formatted_string())
        time.sleep(10)
20160519101349613gzbzufck2 2016-05-19 18:14:03 M1_Stg1_job0:0/1/1[100%]