文档

任务类型

更新时间:

1. 背景

​ 一个作业(Job)由一组任务(Task)及其依赖关系组成,每个任务可以有一个或多个执行实例(Instance)。具体详情看名词解释。目前的任务类型分为两种:并发任务和 DAG(Directed Acyclic Graph) 任务。

2. 任务概述

2.1 并发任务

作业中的一个任务可以指定在多个实例上运行程序,这些实例运行的任务程序都是一样的,但是可以处理不同的数据。

2.2 DAG任务

作业中的多个任务之间可以有 DAG 依赖关系。即前面的任务运行完成后, 后面的任务才开始运行。

3. 任务实现

这两种任务是在提交的 Job 中指定相关字段实现的,下面以 Python SDK 为例给出实现方式,代码的完整程序见快速开始。

3.1 并发任务实现

在提交的 Job 中,填写 InstanceCount 字段。指明任务需要的实例数。该字段就是实现任务的并发功能。

from batchcompute.resources import (
    JobDescription, TaskDescription, DAG
)
# create my_task
 my_task = TaskDescription()
 my_task.InstanceCount = 3 #指定需要实例数:3台VM

如果并发任务需要处理不同片段的数据,这个时候在需要运行的任务程序中使用环境变量:BATCH_COMPUTE_DAG_INSTANCE_ID(实例 ID)来区分,就可以处理不同片段的数据。下面的示例程序是快速开始的count代码,假设输入数据已经放在OSS中。您需要下载OSS的sdk

import oss2 #oss sdk
from conf import conf
import os 
import json

endpoint = os.environ.get('BATCH_COMPUTE_OSS_HOST') #OSS Host
auth = oss2.Auth(conf['access_key_id'], conf['access_key_secret'])

def download_file(oss_path, filename):
    (bucket, key) = parse_oss_path(oss_path)
    bucket_tool = oss2.Bucket(auth, endpoint, bucket)
    bucket_tool.get_object_to_file(key, filename)

def upload_file(filename, oss_path):
    (bucket, key) = parse_oss_path(oss_path)
    bucket_tool = oss2.Bucket(auth, endpoint, bucket)
    bucket_tool.put_object_from_file(key,filename)

def put_data(data, oss_path):
    (bucket, key) = parse_oss_path(oss_path)
    bucket_tool = oss2.Bucket(auth, endpoint, bucket)
    bucket_tool.put_object(key, data)

def parse_oss_path(oss_path):
    s = oss_path[len('oss://'):]
    [bucket, key] = s.split('/',1)
    return (bucket,key)

def main():
    # instance_id: should be start from 0
    instance_id = os.environ['BATCH_COMPUTE_DAG_INSTANCE_ID']
    data_path = conf['data_path']
    split_results = 'split_results'
    filename = 'part_%s.txt' %  instance_id
    pre = data_path[0: data_path.rfind('/')]
    print('download form: %s/%s/' % (pre, split_results))

    # 1. download a part
    download_file('%s/%s/%s.txt' % (pre, split_results, instance_id ), filename)
    # 2. parse, calculate
    with open(filename) as f:
        txt = f.read()
    m = {
        'INFO': 0,
        'WARN': 0,
        'ERROR': 0,
        'DEBUG': 0
    }
    for k in m:
       m[k] = len(re.findall(k, txt))
    print(m)
    # 3. upload result to oss
    upload_to = '%s/count_results/%s.json' % (pre, instance_id )
    print('upload to %s' % upload_to)
    put_data(json.dumps(m), upload_to)

3.2 DAG任务实现

在提交的job中,填写 Dependencies 字段。指明任务之间的依赖关系。下面的图中,首先理清各个任务之间的依赖关系,count1 和 count2 是并行的任务,它们依赖 split 任务,merge任务依赖 count1 和 count2。

img

依据上面的依赖关系,在Job中可以这样描述:

from batchcompute.resources import (
    JobDescription, TaskDescription, DAG, AutoCluster
)

job_desc = JobDescription()
#以下省略task的描述内容
split = TaskDescription()
count1 = TaskDescription() 
count2 = TaskDescription()
merge = TaskDescription()

task_dag = DAG()
task_dag.add_task(task_name="split", task=split)
task_dag.add_task(task_name="count1", task=count1)
task_dag.add_task(task_name="count2", task=count2)
task_dag.add_task(task_name="merge", task=merge)
task_dag.Dependencies = {
  'split': ['count1', 'count2'],
  'count1': ['merge'],
  'count2': ['merge']
}

job_desc.DAG = task_dag

整个作业的任务执行顺序是:

  • split 运行完成后,count1 和 count2 同时开始运行,count1 和 count2 都完成后,merge 才开始运行。

  • merge 运行完成,整个作业结束。