本文以Kohya训练任务为例,为您介绍如何以独立或集成的方式部署训练场景的弹性Job服务。并介绍调用该服务执行训练、获取训练结果、终止任务以及查询相关日志的操作步骤。
前提条件
已创建OSS存储空间(Bucket),用于存储训练获得的模型文件和配置文件。关于如何创建存储空间,详情请参见创建存储空间。
部署训练场景的弹性Job服务
本方案以PAI提供的预置镜像kohya_ss为例,为您介绍如何部署弹性伸缩的kohya训练服务:
登录PAI控制台,在页面上方选择目标地域,并在右侧选择目标工作空间,然后单击进入EAS。
部署训练服务。
支持以下两种部署方式:
集成部署
通过集成方式部署Kohya训练服务,包括队列服务、常驻的前端服务和弹性Job服务。具体操作步骤如下:
单击部署服务,然后在自定义模型部署区域,单击JSON独立部署。
在JSON编辑框中填入配置信息。
{ "cloud": { "computing": { "instance_type": "ecs.gn6i-c4g1.xlarge" } }, "containers": [ { "image": "eas-registry-vpc.cn-hangzhou.cr.aliyuncs.com/pai-eas/kohya_ss:2.2" } ], "features": { "eas.aliyun.com/extra-ephemeral-storage": "30Gi" }, "front_end": { "image": "eas-registry-vpc.cn-hangzhou.cr.aliyuncs.com/pai-eas/kohya_ss:2.2", "port": 8001, "script": "python -u kohya_gui.py --listen 0.0.0.0 --server_port 8001 --data-dir /workspace --headless --just-ui --job-service" }, "metadata": { "cpu": 4, "enable_webservice": true, "gpu": 1, "instance": 1, "memory": 15000, "name": "kohya_job", "type": "ScalableJobService" }, "name": "kohya_job", "storage": [ { "mount_path": "/workspace", "oss": { "path": "oss://examplebucket/kohya/", "readOnly": false }, "properties": { "resource_type": "model" } } ] }
其中关键参数说明如下:
参数
描述
metadata
name
自定义服务名称,在同地域内唯一。
type
服务类型,设置为ScalableJobService,表示集成部署。
enable_webservice
设置为true,表示部署前端AI-Web应用。
front_end
image
前端实例镜像。镜像选择kohya_ss,镜像版本选择2.2。
说明由于版本迭代迅速,部署时镜像版本选择最高版本即可。
script
前端实例的启动命令,设置为
python -u kohya_gui.py --listen 0.0.0.0 --server_port 8000 --headless --just-ui --job-service
。其中:--listen:用于将本程序绑定到指定的本机IP地址上,接收外部请求并进行处理。
--server_port:监听端口号。
--just-ui:启动独立前端模式,前端服务仅支持展示功能。
--job-service:训练任务由弹性Job服务执行。
port
端口号。必须与containers.script中的server_port参数值一致。
containers
image
弹性Job服务的镜像。如果不配置默认和前端实例镜像一致。
instance_type
弹性Job服务的实例的资源规格配置,必须选择GPU类型。若未配置该参数,则资源规格与cloud.computing.instance_type一致。
storage
path
以OSS挂载为例,请选择同地域下的OSS路径,用于存储训练生成的模型文件。本方案示例为
oss://examplebucket/kohya/
。readOnly
设置为false,否则模型文件无法存储到OSS中。
mount_path
挂载路径,可自定义。本方案配置为
/workspace
。cloud
instance_type
前端服务实例的资源规格配置,选择CPU类型即可。
单击部署。
独立部署
需要分别部署弹性Job服务和前端服务,弹性Job服务可接收多个前端服务的训练任务请求。具体操作步骤如下:
部署弹性Job服务。
单击部署服务,然后在自定义模型部署区域,单击JSON独立部署。
在JSON编辑框中填入弹性Job服务的配置信息。
{ "cloud": { "computing": { "instance_type": "ecs.gn6i-c4g1.xlarge" } }, "containers": [ { "image": "eas-registry-vpc.cn-hangzhou.cr.aliyuncs.com/pai-eas/kohya_ss:2.2" } ], "features": { "eas.aliyun.com/extra-ephemeral-storage": "30Gi" }, "metadata": { "instance": 1, "name": "kohya_scalable_job", "type": "ScalableJob" }, "storage": [ { "mount_path": "/workspace", "oss": { "path": "oss://examplebucket/kohya/", "readOnly": false }, "properties": { "resource_type": "model" } } ] }
其中关键参数说明如下:
参数
描述
metadata
name
自定义服务名称。同地域内唯一。
type
服务类型。设置为ScalableJob,表示独立部署。
containers
image
弹性Job服务的镜像。镜像选择kohya_ss,镜像版本选择2.2。
说明由于版本迭代迅速,部署时镜像版本选择最高版本即可。
storage
path
本方案以OSS挂载为例,请选择同地域下的OSS路径,用来存储训练生成的模型文件。本方案示例为
oss://examplebucket/kohya/
。readOnly
设置为false,否则模型文件无法存储到OSS中。
mount_path
挂载路径,可自定义。本方案配置为
/workspace
。cloud
instance_type
弹性Job服务的资源规格配置。kohya训练任务必须选择GPU类型。
单击部署。
服务部署成功后,单击服务方式列下的调用信息,在公网地址调用页签下获取服务访问地址和Token,并保存到本地。
(可选)部署前端服务。
单击部署服务,然后在自定义模型部署区域,单击JSON独立部署。
在JSON编辑框中填入前端服务的配置信息。
{ "cloud": { "computing": { "instance_type": "ecs.g6.large" } }, "containers": [ { "image": "eas-registry-vpc.cn-hangzhou.cr.aliyuncs.com/pai-eas/kohya_ss:2.2", "port": 8000, "script": "python kohya_gui.py --listen 0.0.0.0 --server_port 8000 --headless --just-ui --job-service --job-service-endpoint 166233998075****.vpc.cn-hangzhou.pai-eas.aliyuncs.com --job-service-token test-token --job-service-inputname kohya_scalable_job" } ], "metadata": { "enable_webservice": true, "instance": 1, "name": "kohya_scalable_job_front" }, "storage": [ { "mount_path": "/workspace", "oss": { "path": "oss://examplebucket/kohya/", "readOnly": false }, "properties": { "resource_type": "model" } } ] }
其中关键参数说明如下:
参数
描述
metadata
name
自定义前端服务名称。
enable_webservice
设置为true,表示部署前端AI-Web应用。
containers
image
前端服务镜像。镜像选择kohya_ss,镜像版本选择2.2。
说明由于版本迭代迅速,部署时镜像版本选择最高版本即可。
script
前端服务的启动命令,设置为
python kohya_gui.py --listen 0.0.0.0 --server_port 8000 --headless --just-ui --job-service --job-service-endpoint 166233998075****.vpc.cn-hangzhou.pai-eas.aliyuncs.com --job-service-token test-token --job-service-inputname kohya_scaled_job
。其中:--listen:用于将本程序绑定到指定的本机IP地址上,接收外部请求并进行处理。
--server_port:监听端口号。
--just-ui:启动独立前端模式,前端服务仅支持展示功能。
--job-service:训练任务由弹性Job服务执行。
--job-service-endpoint:弹性Job服务的Endpoint。
--job-service-token:弹性Job服务的token。
--job-service-inputname:弹性Job服务的服务名称。
port
端口号。必须与containers.script中的server_port参数值一致。
storage
path
本方案以OSS挂载为例,请选择同地域下的OSS路径,用来存储训练生成的模型文件。本方案示例为
oss://examplebucket/kohya/
。readOnly
设置为false,否则模型文件无法存储到OSS中。
mount_path
挂载路径,可自定义。本方案配置为
/workspace
。cloud
instance_type
前端服务实例的资源规格配置,选择CPU类型即可。
单击部署。
调用Kohya训练服务
通过WebUI页面调用弹性Job服务
如果您使用EAS预置的Kohya镜像部署前端服务,该镜像(2.2及其以上版本)已支持弹性Job服务功能。服务部署成功后,您可以单击服务方式列下的查看Web应用,在WebUI页面配置Lora训练参数,进行Kohya模型训练,详情请参见训练LoRA模型。
单击Start training按钮,即可发送训练任务请求。在训练任务完成或被终止前,再次单击该按钮无效。弹性Job服务的实例会根据训练任务个数自动弹性伸缩。
单击Stop training按钮,可终止当前训练任务。
自定义前端服务镜像调用弹性Job服务
您也可以通过以下Python SDK调用弹性Job服务,向弹性job服务的队列发送command任务请求以及获取任务执行日志。如果您使用自定义镜像部署前端服务,则需要在自定义镜像中实现以下接口的功能,以便通过WebUI调用弹性Job服务。调用步骤如下所示:
查询弹性Job服务的访问地址和Token。
集成部署
在模型在线服务(EAS)页面中,单击服务名称进入服务详情页面。在基本信息区域,单击查看调用信息。在公网地址调用页签中,获取服务访问地址和Token。其中:
服务访问地址:格式为
<queue_name>.<service_name>.<uid>.<region>.pai-eas.aliyuncs.com
,例如kohya-job-queue-b****4f0.kohya-job.175805416243****.cn-beijing.pai-eas.aliyuncs.com
。其中<queue_name>
为队列服务实例名称-0之前的部分,您可以在服务详情页面的服务实例列表中进行查看。Token:示例为
OGZlNzQwM2VlMWUyM2E2ZTAyMGRjOGQ5MWMyOTFjZGExNDgwMT****==
。
独立部署
在模型在线服务(EAS)页面中,单击弹性Job服务的服务方式列下的调用信息,获取服务访问地址和Token。其中:
服务访问地址:示例为
175805416243****.cn-beijing.pai-eas.aliyuncs.com
。Token:示例为
Njk5NDU5MGYzNmRlZWQ3ND****QyMDIzMGM4MjExNmQ1NjE1NzY5Mw==
。
安装Python SDK。
pip install -U eas-prediction --user
关于SDK更多接口的详细介绍,请参见Python SDK使用说明。
分别为输入队列和输出队列创建客户端。
集成部署
from eas_prediction import QueueClient if __name__ == '__main__': token = 'OGZlNzQwM2VlMWUyM2E2ZTAyMGRjOGQ5MWMyOTFjZGExNDgwMT****==' input_url = 'kohya-job-queue-bf****f0.kohya-job.175805416243****.cn-hangzhou.pai-eas.aliyuncs.com' sink_url = input_url + '/sink' # 创建输入队列,用于发送command任务的训练请求和终止请求。 inputQueue = QueueClient(custom_url = input_url) inputQueue.set_token(token) inputQueue.init(gid="superwatcher") # 创建输出队列,用于获取command执行状态和日志。 sinkQueue = QueueClient(custom_url = sink_url) sinkQueue.set_token(token) sinkQueue.init()
其中:
token:替换为上述步骤已获取的Token。
input_url:替换为上述步骤已获取的服务访问地址。
独立部署
from eas_prediction import QueueClient if __name__ == '__main__': endpoint = '166233998075****.cn-hangzhou.pai-eas.aliyuncs.com' token = 'YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OT****c1MTUxNg==' input_name = 'kohya_scalable_job' sink_name = input_name + '/sink' # 创建输入队列,用于发送command任务的训练请求和终止请求。 inputQueue = QueueClient(endpoint, input_name) inputQueue.set_token(token) inputQueue.init() # 创建输出队列,用于获取command执行状态和日志。 sinkQueue = QueueClient(endpoint, sink_name) sinkQueue.set_token(token) sinkQueue.init()
其中:
endpoint:替换为上述步骤已获取的服务访问地址。
token:替换为上述步骤已获取的服务Token。
input_name:配置为弹性Job服务名称。
向输入队列发送任务训练请求。
集成部署
from eas_prediction import QueueClient import uuid if __name__ == '__main__': token = 'OGZlNzQwM2VlMWUyM2E2ZTAyMGRjOGQ5MWMyOTFjZGExNDgwMT****==' input_url = 'kohya-job-queue-bf****f0.kohya-job.175805416243****.cn-hangzhou.pai-eas.aliyuncs.com' sink_url = input_url + '/sink' # 创建输入队列客户端,用于发送command请求。 inputQueue = QueueClient(custom_url = input_url) inputQueue.set_token(token) inputQueue.init(gid="superwatcher") # 为每个任务请求生成唯一的taskId。 task_id = uuid.uuid1().hex # 创建command字符串。 cmd = "for i in {1..10}; do date; sleep 1; done;" # 指定taskType为command,指定taskId。 tags = {"taskType": "command", "taskId": task_id} # 向输入队列发送command训练任务请求。 index, request_id = inputQueue.put(cmd, tags) print(f'send index: {index}, request id: {request_id}')
其中关键参数说明如下:
参数
描述
token
替换为上述步骤已获取的Token。
input_url
替换为上述步骤已获取的服务访问地址。
cmd
配置为需要执行的命令。Python指令需要加-u参数以便实时输出任务执行时的日志。
tags
对于训练任务请求:
taskType:必须配置为command。
taskId:训练任务ID,唯一标识训练任务。
独立部署
from eas_prediction import QueueClient import uuid if __name__ == '__main__': endpoint = '166233998075****.cn-hangzhou.pai-eas.aliyuncs.com' token = 'M2EyNWYzNDJmNjQ5ZmUzMmM0OTMyMzgzYj****djN2IyODc1MTM5ZQ==' input_name = 'kohya_scalable_job' # 创建输入队列客户端,用于发送command请求。 inputQueue = QueueClient(endpoint, input_name) inputQueue.set_token(token) inputQueue.init(gid="superwatcher") # 为每个任务请求生成唯一的taskId。 task_id = uuid.uuid1().hex # 创建command字符串。 cmd = "for i in {1..10}; do date; sleep 1; done;" # 指定taskType为command,指定taskId。 tags = {"taskType": "command", "taskId": task_id} # 向输入队列发送command训练任务请求。 index, request_id = inputQueue.put(cmd, tags) print(f'send index: {index}, request id: {request_id}')
其中关键参数说明如下:
参数
描述
endpoint
按照上述代码中的配置示例,替换为上述步骤已获取的服务访问地址。
token
替换为上述步骤已获取的服务Token。
cmd
配置为需要执行的命令。Python指令需要加-u参数以便实时输出任务执行时的日志。
tags
对于训练任务请求:
taskType:必须配置为command。
taskId:训练任务ID,唯一标识训练任务。
查询请求排队状态。
集成部署
from eas_prediction import QueueClient import uuid if __name__ == '__main__': token = 'OGZlNzQwM2VlMWUyM2E2ZTAyMGRjOGQ5MWMyOTFjZGExNDgwMT****==' input_url = 'kohya-job-queue-bf****f0.kohya-job.175805416243****.cn-hangzhou.pai-eas.aliyuncs.com' sink_url = input_url + '/sink' # 创建输入队列客户端,用于发送command请求。 inputQueue = QueueClient(custom_url = input_url) inputQueue.set_token(token) inputQueue.init(gid="superwatcher") # 向输入队列发送command请求。 task_id = uuid.uuid1().hex cmd = "for i in {1..100}; do date; sleep 1; done;" tags = {"taskType": "command", "taskId": task_id} index, request_id = inputQueue.put(cmd, tags) # 查询请求数据的排队状态。 search_info = inputQueue.search(index) print("index: {}, search info: {}".format(index, search_info))
其中:
token:替换为上述步骤已获取的Token。
input_url:替换为上述步骤已获取的服务访问地址。
独立部署
from eas_prediction import QueueClient import uuid if __name__ == '__main__': endpoint = '166233998075****.cn-hangzhou.pai-eas.aliyuncs.com' token = 'M2EyNWYzNDJmNjQ5ZmUzMmM0OTMyMzgzYjBjOTdjN2I****1MTM5ZQ==' input_name = 'kohya_scalable_job' # 创建输入队列客户端,用于发送command请求。 inputQueue = QueueClient(endpoint, input_name) inputQueue.set_token(token) inputQueue.init(gid="superwatcher") # 向输入队列发送command请求。 task_id = uuid.uuid1().hex cmd = "for i in {1..100}; do date; sleep 1; done;" tags = {"taskType": "command", "taskId": task_id} index, request_id = inputQueue.put(cmd, tags) # 查询请求数据的排队状态。 search_info = inputQueue.search(index) print("index: {}, search info: {}".format(index, search_info))
其中:
endpoint:替换为上述步骤已获取的服务访问地址中的endpoint。
token:替换为上述步骤已获取的服务Token。
返回JSON格式的数据:
{ 'IsPending': False, 'WaitCount': 0 }
返回字段说明如下:
参数
描述
IsPending
表示请求是否正在被处理。可能值为:
True:表示请求正在被处理。
False:表示请求正在排队。
WaitCount
表示该请求在排队队列中排第几位。仅IsPending为False时该值才有效,IsPending为True时该值为0。
从输出队列中获取执行结果。
训练任务的执行日志会实时写入到输出队列中,可调用
queue.get(request_id=request_id, length=1, timeout='0s', tags=tags)
从输出队列实时获取指定task_id的训练任务的输出日志。调用示例如下:集成部署
from eas_prediction import QueueClient import json import uuid if __name__ == '__main__': token = 'OGZlNzQwM2VlMWUyM2E2ZTAyMGRjOGQ5MWMyOTFjZGExNDgwMT****==' input_url = 'kohya-job-queue-bf****f0.kohya-job.175805416243****.cn-hangzhou.pai-eas.aliyuncs.com' sink_url = input_url + '/sink' # 创建输入队列客户端,用于发送command请求。 inputQueue = QueueClient(custom_url = input_url) inputQueue.set_token(token) inputQueue.init(gid="superwatcher") # 创建输出队列客户端,用于获取command执行日志。 sinkQueue = QueueClient(custom_url = sink_url) sinkQueue.set_token(token) sinkQueue.init() # 向输入队列发送command请求。 cmd = "for i in {1..10}; do date; sleep 1; done;" task_id = uuid.uuid1().hex tags = {"taskType": "command", "taskId": task_id} index, request_id = inputQueue.put(cmd, tags) # 从输出队列实时获取指定taskId的训练任务的输出日志。 running = True while running: dfs = sinkQueue.get(length=1, timeout='0s', tags=tags) if len(dfs) == 0: continue df = dfs[0] data = json.loads(df.data.decode()) state = data["state"] print(data.get("log", "")) if state in {"Exited", "Stopped", "Fatal", "Backoff"}: running = False
其中:
token:替换为上述步骤已获取的Token。
input_url:替换为上述步骤已获取的服务访问地址。
独立部署
from eas_prediction import QueueClient import json import uuid if __name__ == '__main__': endpoint = '166233998075****.cn-hangzhou.pai-eas.aliyuncs.com' token = 'M2EyNWYzNDJmNjQ5ZmUzMmM0OTMyMzgzYjBjOTdjN2IyOD****M5ZQ==' input_name = 'kohya_scalable_job' sink_name = input_name + '/sink' # 创建输入队列客户端,用于发送command请求。 inputQueue = QueueClient(endpoint, input_name) inputQueue.set_token(token) inputQueue.init(gid="superwatcher") # 创建输出队列客户端,用于获取command执行日志。 sinkQueue = QueueClient(endpoint, sink_name) sinkQueue.set_token(token) sinkQueue.init() # 向输入队列发送command请求。 cmd = "for i in {1..10}; do date; sleep 1; done;" task_id = uuid.uuid1().hex tags = {"taskType": "command", "taskId": task_id} index, request_id = inputQueue.put(cmd, tags) # 从输出队列实时获取指定taskId的训练任务的输出日志。 running = True while running: dfs = sinkQueue.get(length=1, timeout='0s', tags=tags) if len(dfs) == 0: continue df = dfs[0] data = json.loads(df.data.decode()) state = data["state"] print(data.get("log", "")) if state in {"Exited", "Stopped", "Fatal", "Backoff"}: running = False
其中:
endpoint:替换为上述步骤已获取的服务访问地址中的endpoint。
token:替换为上述步骤已获取的服务Token。
返回数据Bytes类型的数据:
{ "taskId": "e97409eea4a111ee9cb600163e08****", "command": "python3 -u test.py --args=xxx", "state": "Running", "log": "prepare tokenizer\\n" }
返回结果中各字段说明如下:
字段
描述
taskId
唯一标识训练任务。
command
job执行的任务指令。
state
指令执行状态:
Running:执行中。
Exited:已退出。
Fatal:执行异常。
Stopping:正在停止。
Stopped:已停止。
log
输出日志。完整的日志会将同一taskId的日志按照获取顺序输出。
停止训练任务。
发送command任务请求后如果想要停止该任务,需要区分该请求是在排队状态还是执行状态。可通过queue.search(index)确认请求状态,并根据获取的请求状态来处理训练任务。示例如下:
集成部署
from eas_prediction.queue_client import QueueClient import uuid if __name__ == '__main__': token = 'OGZlNzQwM2VlMWUyM2E2ZTAyMGRjOGQ5MWMyOTFjZGExNDgwMT****==' input_url = 'kohya-job-queue-bf****f0.kohya-job.175805416243****.cn-hangzhou.pai-eas.aliyuncs.com' sink_url = input_url + '/sink' # 创建输入队列客户端,用于发送command请求和终止请求。 inputQueue = QueueClient(custom_url = input_url) inputQueue.set_token(token) inputQueue.init(gid="superwatcher") # 向输入队列发送command请求。 cmd = "for i in {1..10}; do date; sleep 1; done;" task_id = uuid.uuid1().hex # 该任务请求的task_id。 tags = {"taskType": "command", "taskId": task_id} index, request_id = inputQueue.put(cmd, tags) print(f'cmd send, index: {index}, task_id: {task_id}') job_index = index # 发送任务请求时返回的index。 pending_detail = inputQueue.search(job_index) print(f'search info: {pending_detail}') if len(pending_detail) > 0 and pending_detail.get("IsPending", True) == False: # command任务还在排队中,直接从输入队列中删除。 inputQueue.delete(job_index) print(f'delete task index: {job_index}') else: # command任务正在执行中,向输入队列发送stop信号。 stop_data = "stop" tags = {"_is_symbol_": "true", "taskId": task_id} inputQueue.put(stop_data, tags) print(f'stop task index: {job_index}')
其中关键参数说明如下:
参数
描述
token
替换为上述步骤已获取的Token。
input_url
替换为上述步骤已获取的服务访问地址。
stop_data
设置为stop。
tags
_is_symbol_:必填,设置为true表示任务终止信号。
task_id:表示需要终止的训练任务的task_id。
独立部署
from eas_prediction.queue_client import QueueClient import uuid if __name__ == '__main__': endpoint = '166233998075****.cn-hangzhou.pai-eas.aliyuncs.com' token = 'M2EyNWYzNDJmNjQ5ZmUzMmM0OTMyMzgzYjBjOTdjN2IyODc1MTM5****' input_name = 'kohya_scalable_job' # 创建输入队列客户端,用于发送command请求和终止请求。 inputQueue = QueueClient(endpoint, input_name) inputQueue.set_token(token) inputQueue.init(gid="superwatcher") # 向输入队列发送command请求。 cmd = "for i in {1..10}; do date; sleep 1; done;" # 该任务请求的task_id。 task_id = uuid.uuid1().hex tags = {"taskType": "command", "taskId": task_id} index, request_id = inputQueue.put(cmd, tags) print(f'cmd send, index: {index}, task_id: {task_id}') job_index = index # 发送任务请求时返回的index。 pending_detail = inputQueue.search(job_index) print(f'search info: {pending_detail}') if len(pending_detail) > 0 and pending_detail.get("IsPending", True) == False: # command任务还在排队中,直接从输入队列中删除。 inputQueue.delete(job_index) print(f'delete task index: {job_index}') else: # command任务正在执行中,向输入队列发送stop信号。 stop_data = "stop" tags = {"_is_symbol_": "true", "taskId": task_id} inputQueue.put(stop_data, tags) print(f'stop task index: {job_index}')
其中关键参数说明如下:
参数
描述
endpoint
替换为上述步骤已获取的服务访问地址中的endpoint。
token
替换为上述步骤已获取的服务Token。
stop_data
设置为stop。
tags
_is_symbol_:必填,设置为true表示任务终止信号。
task_id:表示需要终止的训练任务的task_id。
相关文档
关于弹性Job服务更详细的内容介绍,请参见弹性Job服务功能介绍。
如何在推理场景使用弹性Job服务,请参见部署写真相机在线推理服务。