AttachCluster Best Practices
0 Background
AttachCluster is the newest job type offered by Batch Compute. It combines the advantages of fixed-cluster jobs and AutoCluster jobs: it automatically manages the cluster lifecycle and elastically scales resources, while also using the distributed cache to save resources. This article describes how to run AttachCluster jobs on Alibaba Cloud Batch Compute.
1 Limitations
- Custom system-disk and data-disk sizes are supported only when creating the cluster.
- Customizing the system disk in a job is not supported. Once the default cluster defines the instance system-disk size (SystemDiskSize), every AttachCluster job submitted to that cluster uses that SystemDiskSize. When submitting a job, set this field to 0 or leave it empty.
- Customizing data disks in a job is not supported. Once the default cluster defines the instance data-disk size (DataDiskSize), every AttachCluster job submitted to that cluster uses that DataDiskSize. When submitting a job, set this field to 0 or leave it empty.
- The App job type is not supported; only the DAG job type is supported.
- The image specified in a job must be an image whose ID starts with m-xxx, not one starting with img-. (Double-check this when submitting a job!)
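The image-ID rule above is easy to get wrong, so it can be worth a pre-flight check before submitting a job. The helper below is a hypothetical illustration, not part of the Batch Compute SDK:

```python
def is_attachcluster_image(image_id: str) -> bool:
    # AttachCluster jobs require registered images whose IDs begin
    # with "m-"; built-in IDs such as "img-ubuntu" must be rejected.
    return image_id.startswith("m-")

# Catch the mistake before the job is submitted.
assert is_attachcluster_image("m-8vbd8lo9xxxx")
assert not is_attachcluster_image("img-ubuntu")
```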
2 Preparation
2.1 Activate Batch Compute
To use Batch Compute, activate the service and the services it depends on (such as OSS) by following the official documentation.
2.2 Upgrade the Python SDK
If you have not installed the Batch Compute Python SDK, install it by following the installation guide. If it is already installed, follow the Python SDK upgrade guide to upgrade it to the latest version.
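Whether an upgrade is actually needed can be decided by comparing the installed version string against the latest release. The helper below is a generic sketch using only the standard library; the version numbers shown are placeholders, not real SDK releases:

```python
def needs_upgrade(installed: str, latest: str) -> bool:
    # Compare dotted version strings numerically so that,
    # e.g., "2.0.9" correctly sorts below "2.0.10".
    def as_tuple(v):
        return tuple(int(part) for part in v.split("."))
    return as_tuple(installed) < as_tuple(latest)

print(needs_upgrade("2.0.9", "2.0.10"))   # → True (numeric compare: 9 < 10)
print(needs_upgrade("2.1.0", "2.0.10"))   # → False
```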
3 Create a Cluster
Before using AttachCluster jobs for the first time, create a cluster; see the official documentation for the procedure. The cluster has no special configuration requirements, and its instance count can be set to 0. The following Python code creates a cluster.
import time
import random
import string

import batchcompute
from batchcompute import CN_SHENZHEN as REGION
from batchcompute import Client, ClientError
from batchcompute.resources import (
    JobDescription, TaskDescription, DAG,
    GroupDescription, ClusterDescription,
    Configs, Networks, VPC, Classic, Mounts, Notification, Topic
)

ACCESS_KEY_ID = 'Your Access Key Id'
ACCESS_KEY_SECRET = 'Your Access Key Secret'
IMAGE_ID = 'img-ubuntu'
INSTANCE_TYPE = 'ecs.sn2ne.large'

client = Client(REGION, ACCESS_KEY_ID, ACCESS_KEY_SECRET)

def create_cluster(idempotent_token=''):
    try:
        # Cluster description.
        cluster_desc = ClusterDescription()
        cluster_desc.Name = "test-cluster"
        cluster_desc.Description = "demo"
        cluster_desc.ImageId = IMAGE_ID
        cluster_desc.InstanceType = INSTANCE_TYPE

        # Group description.
        group_desc1 = GroupDescription()
        group_desc1.DesiredVMCount = 4
        group_desc1.InstanceType = 'ecs.sn1ne.large'  # group-specific instance type
        group_desc1.ResourceType = 'OnDemand'
        cluster_desc.add_group('group1', group_desc1)
        # cluster_desc.add_group('group2', group_desc2)

        # Configs.
        configs = Configs()
        # Configs.Disks
        configs.add_system_disk(50, 'cloud_efficiency')
        configs.add_data_disk(500, 'cloud_efficiency', '/home/my-data-disk')
        # Configs.Networks
        networks = Networks()
        vpc = VPC()
        vpc.CidrBlock = '192.168.0.0/16'
        # vpc.VpcId = 'vpc-xxxxx'
        networks.VPC = vpc
        configs.Networks = networks
        cluster_desc.Configs = configs

        print(cluster_desc)
        rsp = client.create_cluster(cluster_desc, idempotent_token)
        # Get the cluster id needed by AttachCluster jobs.
        return rsp.Id
    except ClientError as e:
        print(e.get_status_code(), e.get_code(), e.get_requestid(), e.get_msg())
        return ""

if __name__ == '__main__':
    # Without an idempotent token.
    cluster_id = create_cluster()
    print(cluster_id)
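The `idempotent_token` parameter above is intended to make retries safe: resubmitting `create_cluster` with the same token should not create a second cluster. A minimal sketch for generating such a token follows; the 16-character alphanumeric format is an illustration, any unique string of your choosing works:

```python
import random
import string

def make_idempotent_token(length: int = 16) -> str:
    # A random alphanumeric string; reuse the same token when retrying
    # a create_cluster call so that only one cluster is created.
    alphabet = string.ascii_letters + string.digits
    return ''.join(random.choice(alphabet) for _ in range(length))

token = make_idempotent_token()
print(token)
```

Store the token alongside your retry logic, and generate a fresh one only when you intend to create a new cluster.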
4 Create a Job
Creating a job requires the cluster Id from step 3; fill it into the ClusterId field of the task's AutoCluster description. The following Python code creates a job.
from batchcompute import Client, ClientError
from batchcompute import CN_SHENZHEN as REGION
from batchcompute.resources import (
    ClusterDescription, GroupDescription, Configs, Networks, VPC,
    JobDescription, TaskDescription, DAG,
    Mounts, AutoCluster, Disks, Notification,
)

access_key_id = ""      # your access key id
access_key_secret = ""  # your access key secret
# The image must already be registered with Batch Compute, and its ID
# must start with m-, not img-.
image_id = "m-8vbd8lo9xxxx"
instance_type = "ecs.sn1.medium"         # instance type
inputOssPath = "oss://xxx/input/"        # your input oss path
outputOssPath = "oss://xxx/output/"      # your output oss path
stdoutOssPath = "oss://xxx/log/stdout/"  # your stdout oss path
stderrOssPath = "oss://xxx/log/stderr/"  # your stderr oss path

def getAutoClusterDesc():
    auto_desc = AutoCluster()
    # For AttachCluster jobs, fill in the cluster Id created in the previous step.
    auto_desc.ClusterId = "cls-xxxxx"
    auto_desc.ImageId = image_id
    auto_desc.ReserveOnFail = False
    # Instance type.
    auto_desc.InstanceType = instance_type

    # Case 1: spot instances with a price ceiling.
    # auto_desc.ResourceType = "Spot"
    # auto_desc.SpotStrategy = "SpotWithPriceLimit"
    # auto_desc.SpotPriceLimit = 0.5
    # Case 2: spot instances, the system bids automatically up to the pay-as-you-go price.
    # auto_desc.ResourceType = "Spot"
    # auto_desc.SpotStrategy = "SpotAsPriceGo"
    # Case 3: pay-as-you-go.
    auto_desc.ResourceType = "OnDemand"

    # Configs.
    configs = Configs()
    # Configs.Networks
    networks = Networks()
    vpc = VPC()
    # Case 1: pass only the CidrBlock.
    vpc.CidrBlock = '192.168.0.0/16'
    # Case 2: pass both CidrBlock and VpcId; the VPC's actual CIDR block
    # must match the CidrBlock you pass in.
    # vpc.CidrBlock = '172.26.0.0/16'
    # vpc.VpcId = "vpc-8vbfxdyhxxxx"
    networks.VPC = vpc
    configs.Networks = networks

    # Setting the system disk is not supported.
    # configs.add_system_disk(size=0, type_='cloud_efficiency')
    # Setting data disks is not supported.
    # Case 1: Linux
    # configs.add_data_disk(size=0, type_='cloud_efficiency', mount_point='/path/to/mount/')
    # Case 2: Windows
    # configs.add_data_disk(size=0, type_='cloud_efficiency', mount_point='E:')

    # Number of instances.
    configs.InstanceCount = 1
    auto_desc.Configs = configs
    return auto_desc

def getDagJobDesc(clusterId=None):
    job_desc = JobDescription()
    dag_desc = DAG()
    mounts_desc = Mounts()

    job_desc.Name = "testBatchSdkJob"
    job_desc.Description = "test job"
    job_desc.Priority = 1

    # Subscribe to job-finished and job-failed events.
    noti_desc = Notification()
    noti_desc.Topic['Name'] = "test-topic"
    noti_desc.Topic['Endpoint'] = "http://[UserId].mns.[Region].aliyuncs.com/"
    noti_desc.Topic['Events'] = ["OnJobFinished", "OnJobFailed"]
    # job_desc.Notification = noti_desc

    job_desc.JobFailOnInstanceFail = False
    # When True, the job is released automatically as soon as it succeeds.
    job_desc.AutoRelease = False
    job_desc.Type = "DAG"

    echo_task = TaskDescription()
    # echo_task.InputMapping = {"oss://xxx/input/": "/home/test/input/",
    #                           "oss://xxx/test/file": "/home/test/test/file"}
    echo_task.InputMapping = {inputOssPath: "/home/test/input/"}
    echo_task.OutputMapping = {"/home/test/output/": outputOssPath}

    # Command line that launches the program.
    # Case 1: run a Linux command line.
    echo_task.Parameters.Command.CommandLine = "/bin/bash -c 'echo BatchcomputeService'"
    # Case 2: run Windows CMD.exe.
    # echo_task.Parameters.Command.CommandLine = "cmd /c 'echo BatchcomputeService'"
    # Case 3: run an executable file.
    # PackagePath holds the executable or binary package referenced by CommandLine.
    # echo_task.Parameters.Command.PackagePath = "oss://xxx/package/test.sh"
    # echo_task.Parameters.Command.CommandLine = "sh test.sh"

    # Environment variables made available to the program.
    echo_task.Parameters.Command.EnvVars["key1"] = "value1"
    echo_task.Parameters.Command.EnvVars["key2"] = "value2"

    # Stdout destination: print output is uploaded to this OSS address in real time.
    echo_task.Parameters.StdoutRedirectPath = stdoutOssPath
    # Stderr destination: exceptions and errors are uploaded to this OSS address in real time.
    echo_task.Parameters.StderrRedirectPath = stderrOssPath

    # Task timeout.
    echo_task.Timeout = 600
    # Number of instances for the task.
    # The environment variable BATCH_COMPUTE_INSTANCE_ID ranges from 0 to InstanceCount-1;
    # read it inside your program to shard the data and run a single task concurrently.
    echo_task.InstanceCount = 1
    # Number of retries after the task fails.
    echo_task.MaxRetryCount = 0

    # NAS mounts.
    # When using NAS, the network and the NAS instance must be in the same VPC.
    nasMountEntry = {
        "Source": "nas://xxxx.nas.aliyuncs.com:/",
        "Destination": "/home/mnt/",
        "WriteSupport": True,
    }
    mounts_desc.add_entry(nasMountEntry)
    mounts_desc.Locale = "utf-8"
    mounts_desc.Lock = False
    # echo_task.Mounts = mounts_desc

    # For AttachCluster jobs, leave the task's ClusterId field empty.
    echo_task.ClusterId = ""
    echo_task.AutoCluster = getAutoClusterDesc()

    # Add the task.
    dag_desc.add_task('echoTask', echo_task)
    # Multiple tasks can be added, each configured independently.
    # dag_desc.add_task('echoTask2', echo_task)
    # Dependencies declares the dependencies between tasks:
    # echoTask2 depends on echoTask; echoTask3 depends on echoTask2.
    # dag_desc.Dependencies = {"echoTask": ["echoTask2"], "echoTask2": ["echoTask3"]}
    job_desc.DAG = dag_desc
    return job_desc

if __name__ == "__main__":
    client = Client(REGION, access_key_id, access_key_secret)
    try:
        job_desc = getDagJobDesc()
        job_id = client.create_job(job_desc).Id
        print('job created: %s' % job_id)
    except ClientError as e:
        print(e.get_status_code(), e.get_code(), e.get_requestid(), e.get_msg())
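The BATCH_COMPUTE_INSTANCE_ID comment in the task description above can be turned into a concrete sharding pattern. The sketch below is an illustration: the helper name and the modulo-based slicing are hypothetical; only the environment variable and its 0..InstanceCount-1 range come from the job description above.

```python
import os

def shard_for_this_instance(items, instance_count):
    """Pick the slice of `items` this task instance should process.

    Batch Compute sets BATCH_COMPUTE_INSTANCE_ID to a value in
    [0, InstanceCount-1] on each instance of a task; instance_count
    must match the task's InstanceCount.
    """
    instance_id = int(os.environ.get("BATCH_COMPUTE_INSTANCE_ID", "0"))
    return [item for i, item in enumerate(items) if i % instance_count == instance_id]

# Example: with InstanceCount = 3, instance 1 processes items 1, 4, 7, ...
os.environ["BATCH_COMPUTE_INSTANCE_ID"] = "1"
print(shard_for_this_instance(list(range(8)), 3))  # → [1, 4, 7]
```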
This completes the creation of an AttachCluster job.