部署弹性伸缩的Kohya训练服务

本文以Kohya训练任务为例,为您介绍如何以独立或集成的方式部署训练场景的弹性Job服务。并介绍调用该服务执行训练、获取训练结果、终止任务以及查询相关日志的操作步骤。

前提条件

已创建OSS存储空间(Bucket),用于存储训练获得的模型文件和配置文件。关于如何创建存储空间,详情请参见创建存储空间

部署训练场景的弹性Job服务

本方案以PAI提供的预置镜像kohya_ss为例,为您介绍如何部署弹性伸缩的kohya训练服务:

  1. 登录PAI控制台,在页面上方选择目标地域,并在右侧选择目标工作空间,然后单击进入EAS

  2. 部署训练服务。

    支持以下两种部署方式:

    集成部署

    通过集成方式部署Kohya训练服务,包括队列服务、常驻的前端服务和弹性Job服务。具体操作步骤如下:

    1. 单击部署服务,然后在自定义模型部署区域,单击JSON独立部署

    2. JSON编辑框中填入配置信息。

      {
        "cloud": {
          "computing": {
            "instance_type": "ecs.gn6i-c4g1.xlarge"
          }
        },
        "containers": [
          {
            "image": "eas-registry-vpc.cn-hangzhou.cr.aliyuncs.com/pai-eas/kohya_ss:2.2"
          }
        ],
        "features": {
          "eas.aliyun.com/extra-ephemeral-storage": "30Gi"
        },
        "front_end": {
          "image": "eas-registry-vpc.cn-hangzhou.cr.aliyuncs.com/pai-eas/kohya_ss:2.2",
          "port": 8001,
          "script": "python -u kohya_gui.py --listen 0.0.0.0 --server_port 8001 --data-dir /workspace --headless --just-ui --job-service"
        },
        "metadata": {
          "cpu": 4,
          "enable_webservice": true,
          "gpu": 1,
          "instance": 1,
          "memory": 15000,
          "name": "kohya_job",
          "type": "ScalableJobService"
        },
        "name": "kohya_job",
        "storage": [
          {
            "mount_path": "/workspace",
            "oss": {
              "path": "oss://examplebucket/kohya/",
              "readOnly": false
            },
            "properties": {
              "resource_type": "model"
            }
          }
        ]
      }

      其中关键参数说明如下:

      参数

      描述

      metadata

      name

      自定义服务名称,在同地域内唯一。

      type

      服务类型,设置为ScalableJobService,表示集成部署。

      enable_webservice

      设置为true,表示部署前端AI-Web应用。

      front_end

      image

      前端实例镜像。镜像选择kohya_ss,镜像版本选择2.2

      说明

      由于版本迭代迅速,部署时镜像版本选择最高版本即可。

      script

      前端实例的启动命令,设置为python -u kohya_gui.py --listen 0.0.0.0 --server_port 8000 --headless --just-ui --job-service。其中:

      • --listen:用于将本程序绑定到指定的本机IP地址上,接收外部请求并进行处理。

      • --server_port:监听端口号。

      • --just-ui:启动独立前端模式,前端服务仅支持展示功能。

      • --job-service:训练任务由弹性Job服务执行。

      port

      端口号。必须与containers.script中的server_port参数值一致。

      containers

      image

      弹性Job服务的镜像。如果不配置默认和前端实例镜像一致。

      instance_type

      弹性Job服务的实例的资源规格配置,必须选择GPU类型。若未配置该参数,则资源规格与cloud.computing.instance_type一致。

      storage

      path

      OSS挂载为例,请选择同地域下的OSS路径,用于存储训练生成的模型文件。本方案示例为oss://examplebucket/kohya/

      readOnly

      设置为false,否则模型文件无法存储到OSS中。

      mount_path

      挂载路径,可自定义。本方案配置为/workspace

      cloud

      instance_type

      前端服务实例的资源规格配置,选择CPU类型即可。

    3. 单击部署

    独立部署

    需要分别部署弹性Job服务和前端服务,弹性Job服务可接收多个前端服务的训练任务请求。具体操作步骤如下:

    1. 部署弹性Job服务。

      1. 单击部署服务,然后在自定义模型部署区域,单击JSON独立部署

      2. JSON编辑框中填入弹性Job服务的配置信息。

        {
          "cloud": {
            "computing": {
              "instance_type": "ecs.gn6i-c4g1.xlarge"
            }
          },
          "containers": [
            {
              "image": "eas-registry-vpc.cn-hangzhou.cr.aliyuncs.com/pai-eas/kohya_ss:2.2"
            }
          ],
          "features": {
            "eas.aliyun.com/extra-ephemeral-storage": "30Gi"
          },
          "metadata": {
            "instance": 1,
            "name": "kohya_scalable_job",
            "type": "ScalableJob"
          },
          "storage": [
            {
              "mount_path": "/workspace",
              "oss": {
                "path": "oss://examplebucket/kohya/",
                "readOnly": false
              },
              "properties": {
                "resource_type": "model"
              }
            }
          ]
        }

        其中关键参数说明如下:

        参数

        描述

        metadata

        name

        自定义服务名称。同地域内唯一。

        type

        服务类型。设置为ScalableJob,表示独立部署。

        containers

        image

        弹性Job服务的镜像。镜像选择kohya_ss,镜像版本选择2.2

        说明

        由于版本迭代迅速,部署时镜像版本选择最高版本即可。

        storage

        path

        本方案以OSS挂载为例,请选择同地域下的OSS路径,用来存储训练生成的模型文件。本方案示例为oss://examplebucket/kohya/

        readOnly

        设置为false,否则模型文件无法存储到OSS中。

        mount_path

        挂载路径,可自定义。本方案配置为/workspace

        cloud

        instance_type

        弹性Job服务的资源规格配置。kohya训练任务必须选择GPU类型。

      3. 单击部署

      4. 服务部署成功后,单击服务方式列下的调用信息,在公网地址调用页签下获取服务访问地址和Token,并保存到本地。

    2. (可选)部署前端服务。

      1. 单击部署服务,然后在自定义模型部署区域,单击JSON独立部署

      2. JSON编辑框中填入前端服务的配置信息。

        {
          "cloud": {
            "computing": {
              "instance_type": "ecs.g6.large"
            }
          },
          "containers": [
            {
              "image": "eas-registry-vpc.cn-hangzhou.cr.aliyuncs.com/pai-eas/kohya_ss:2.2",
              "port": 8000,
              "script": "python kohya_gui.py --listen 0.0.0.0 --server_port 8000 --headless --just-ui --job-service --job-service-endpoint 166233998075****.vpc.cn-hangzhou.pai-eas.aliyuncs.com --job-service-token test-token --job-service-inputname kohya_scalable_job"
            }
          ],
          "metadata": {
            "enable_webservice": true,
            "instance": 1,
            "name": "kohya_scalable_job_front"
          },
          "storage": [
            {
              "mount_path": "/workspace",
              "oss": {
                "path": "oss://examplebucket/kohya/",
                "readOnly": false
              },
              "properties": {
                "resource_type": "model"
              }
            }
          ]
        }

        其中关键参数说明如下:

        参数

        描述

        metadata

        name

        自定义前端服务名称。

        enable_webservice

        设置为true,表示部署前端AI-Web应用。

        containers

        image

        前端服务镜像。镜像选择kohya_ss,镜像版本选择2.2

        说明

        由于版本迭代迅速,部署时镜像版本选择最高版本即可。

        script

        前端服务的启动命令,设置为python kohya_gui.py --listen 0.0.0.0 --server_port 8000 --headless --just-ui --job-service --job-service-endpoint 166233998075****.vpc.cn-hangzhou.pai-eas.aliyuncs.com --job-service-token test-token --job-service-inputname kohya_scaled_job。其中:

        • --listen:用于将本程序绑定到指定的本机IP地址上,接收外部请求并进行处理。

        • --server_port:监听端口号。

        • --just-ui:启动独立前端模式,前端服务仅支持展示功能。

        • --job-service:训练任务由弹性Job服务执行。

        • --job-service-endpoint:弹性Job服务的Endpoint。

        • --job-service-token:弹性Job服务的token。

        • --job-service-inputname:弹性Job服务的服务名称。

        port

        端口号。必须与containers.script中的server_port参数值一致。

        storage

        path

        本方案以OSS挂载为例,请选择同地域下的OSS路径,用来存储训练生成的模型文件。本方案示例为oss://examplebucket/kohya/

        readOnly

        设置为false,否则模型文件无法存储到OSS中。

        mount_path

        挂载路径,可自定义。本方案配置为/workspace

        cloud

        instance_type

        前端服务实例的资源规格配置,选择CPU类型即可。

    3. 单击部署

调用Kohya训练服务

通过WebUI页面调用弹性Job服务

如果您使用EAS预置的Kohya镜像部署前端服务,该镜像(2.2及其以上版本)已支持弹性Job服务功能。服务部署成功后,您可以单击服务方式列下的查看Web应用,在WebUI页面配置Lora训练参数,进行Kohya模型训练,详情请参见训练LoRA模型

image.png

  • 单击Start training按钮,即可发送训练任务请求。在训练任务完成或被终止前,再次单击该按钮无效。弹性Job服务的实例会根据训练任务个数自动弹性伸缩。

  • 单击Stop training按钮,可终止当前训练任务。

自定义前端服务镜像调用弹性Job服务

您也可以通过以下Python SDK调用弹性Job服务,向弹性job服务的队列发送command任务请求以及获取任务执行日志。如果您使用自定义镜像部署前端服务,则需要在自定义镜像中实现以下接口的功能,以便通过WebUI调用弹性Job服务。调用步骤如下所示:

  1. 查询弹性Job服务的访问地址和Token。

    集成部署

    模型在线服务(EAS)页面中,单击服务名称进入服务详情页面。在基本信息区域,单击查看调用信息。在公网地址调用页签中,获取服务访问地址和Token。其中:

    • 服务访问地址:格式为<queue_name>.<service_name>.<uid>.<region>.pai-eas.aliyuncs.com,例如kohya-job-queue-b****4f0.kohya-job.175805416243****.cn-beijing.pai-eas.aliyuncs.com。其中<queue_name>为队列服务实例名称-0之前的部分,您可以在服务详情页面的服务实例列表中进行查看。image

    • Token:示例为OGZlNzQwM2VlMWUyM2E2ZTAyMGRjOGQ5MWMyOTFjZGExNDgwMT****==

    独立部署

    模型在线服务(EAS)页面中,单击弹性Job服务的服务方式列下的调用信息,获取服务访问地址和Token。其中:

    • 服务访问地址:示例为175805416243****.cn-beijing.pai-eas.aliyuncs.com

    • Token:示例为Njk5NDU5MGYzNmRlZWQ3ND****QyMDIzMGM4MjExNmQ1NjE1NzY5Mw==

  2. 安装Python SDK。

    pip install -U eas-prediction --user

    关于SDK更多接口的详细介绍,请参见Python SDK使用说明

  3. 分别为输入队列和输出队列创建客户端。

    集成部署

    from eas_prediction import QueueClient
    
    if __name__ == '__main__':
        token = 'OGZlNzQwM2VlMWUyM2E2ZTAyMGRjOGQ5MWMyOTFjZGExNDgwMT****=='
        input_url = 'kohya-job-queue-bf****f0.kohya-job.175805416243****.cn-hangzhou.pai-eas.aliyuncs.com'
        sink_url = input_url + '/sink'
    
        # 创建输入队列,用于发送command任务的训练请求和终止请求。
        inputQueue = QueueClient(custom_url = input_url)
        inputQueue.set_token(token)
        inputQueue.init(gid="superwatcher")
    
        # 创建输出队列,用于获取command执行状态和日志。
        sinkQueue = QueueClient(custom_url = sink_url)
        sinkQueue.set_token(token)
        sinkQueue.init()
    

    其中:

    • token:替换为上述步骤已获取的Token。

    • input_url:替换为上述步骤已获取的服务访问地址。

    独立部署

    from eas_prediction import QueueClient
    
    if __name__ == '__main__':
        endpoint = '166233998075****.cn-hangzhou.pai-eas.aliyuncs.com'
        token = 'YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OT****c1MTUxNg=='
        input_name = 'kohya_scalable_job'
        sink_name = input_name + '/sink'
    
        # 创建输入队列,用于发送command任务的训练请求和终止请求。
        inputQueue = QueueClient(endpoint, input_name)
        inputQueue.set_token(token)
        inputQueue.init()
    
        # 创建输出队列,用于获取command执行状态和日志。
        sinkQueue = QueueClient(endpoint, sink_name)
        sinkQueue.set_token(token)
        sinkQueue.init()
    

    其中:

    • endpoint:替换为上述步骤已获取的服务访问地址。

    • token:替换为上述步骤已获取的服务Token。

    • input_name:配置为弹性Job服务名称。

  4. 向输入队列发送任务训练请求。

    集成部署

    from eas_prediction import QueueClient
    import uuid
    
    if __name__ == '__main__':
        token = 'OGZlNzQwM2VlMWUyM2E2ZTAyMGRjOGQ5MWMyOTFjZGExNDgwMT****=='
        input_url = 'kohya-job-queue-bf****f0.kohya-job.175805416243****.cn-hangzhou.pai-eas.aliyuncs.com'
        sink_url = input_url + '/sink'
    
        # 创建输入队列客户端,用于发送command请求。
        inputQueue = QueueClient(custom_url = input_url)
        inputQueue.set_token(token)
        inputQueue.init(gid="superwatcher")
    
        # 为每个任务请求生成唯一的taskId。
        task_id = uuid.uuid1().hex
        # 创建command字符串。
        cmd = "for i in {1..10}; do date; sleep 1; done;"
        # 指定taskTypecommand,指定taskId。
        tags = {"taskType": "command", "taskId": task_id}
        # 向输入队列发送command训练任务请求。
        index, request_id = inputQueue.put(cmd, tags)
        print(f'send index: {index}, request id: {request_id}')
    

    其中关键参数说明如下:

    参数

    描述

    token

    替换为上述步骤已获取的Token。

    input_url

    替换为上述步骤已获取的服务访问地址。

    cmd

    配置为需要执行的命令。Python指令需要加-u参数以便实时输出任务执行时的日志。

    tags

    对于训练任务请求:

    • taskType:必须配置为command。

    • taskId:训练任务ID,唯一标识训练任务。

    独立部署

    from eas_prediction import QueueClient
    import uuid
    
    if __name__ == '__main__':
        endpoint = '166233998075****.cn-hangzhou.pai-eas.aliyuncs.com'
        token = 'M2EyNWYzNDJmNjQ5ZmUzMmM0OTMyMzgzYj****djN2IyODc1MTM5ZQ=='
        input_name = 'kohya_scalable_job'
    
        # 创建输入队列客户端,用于发送command请求。
        inputQueue = QueueClient(endpoint, input_name)
        inputQueue.set_token(token)
        inputQueue.init(gid="superwatcher")
    
        # 为每个任务请求生成唯一的taskId。
        task_id = uuid.uuid1().hex
        # 创建command字符串。
        cmd = "for i in {1..10}; do date; sleep 1; done;"
        # 指定taskTypecommand,指定taskId。
        tags = {"taskType": "command", "taskId": task_id}
        # 向输入队列发送command训练任务请求。
        index, request_id = inputQueue.put(cmd, tags)
        print(f'send index: {index}, request id: {request_id}')
    

    其中关键参数说明如下:

    参数

    描述

    endpoint

    按照上述代码中的配置示例,替换为上述步骤已获取的服务访问地址。

    token

    替换为上述步骤已获取的服务Token。

    cmd

    配置为需要执行的命令。Python指令需要加-u参数以便实时输出任务执行时的日志。

    tags

    对于训练任务请求:

    • taskType:必须配置为command。

    • taskId:训练任务ID,唯一标识训练任务。

  5. 查询请求排队状态。

    集成部署

    from eas_prediction import QueueClient
    import uuid
    
    if __name__ == '__main__':
        token = 'OGZlNzQwM2VlMWUyM2E2ZTAyMGRjOGQ5MWMyOTFjZGExNDgwMT****=='
        input_url = 'kohya-job-queue-bf****f0.kohya-job.175805416243****.cn-hangzhou.pai-eas.aliyuncs.com'
        sink_url = input_url + '/sink'
    
        # 创建输入队列客户端,用于发送command请求。
        inputQueue = QueueClient(custom_url = input_url)
        inputQueue.set_token(token)
        inputQueue.init(gid="superwatcher")
    
        # 向输入队列发送command请求。
        task_id = uuid.uuid1().hex
        cmd = "for i in {1..100}; do date; sleep 1; done;"
        tags = {"taskType": "command", "taskId": task_id}
        index, request_id = inputQueue.put(cmd, tags)
    
        # 查询请求数据的排队状态。
        search_info = inputQueue.search(index)
        print("index: {}, search info: {}".format(index, search_info))
    

    其中:

    • token:替换为上述步骤已获取的Token。

    • input_url:替换为上述步骤已获取的服务访问地址。

    独立部署

    from eas_prediction import QueueClient
    import uuid
    
    if __name__ == '__main__':
        endpoint = '166233998075****.cn-hangzhou.pai-eas.aliyuncs.com'
        token = 'M2EyNWYzNDJmNjQ5ZmUzMmM0OTMyMzgzYjBjOTdjN2I****1MTM5ZQ=='
        input_name = 'kohya_scalable_job'
    
        # 创建输入队列客户端,用于发送command请求。
        inputQueue = QueueClient(endpoint, input_name)
        inputQueue.set_token(token)
        inputQueue.init(gid="superwatcher")
    
        # 向输入队列发送command请求。
        task_id = uuid.uuid1().hex
        cmd = "for i in {1..100}; do date; sleep 1; done;"
        tags = {"taskType": "command", "taskId": task_id}
        index, request_id = inputQueue.put(cmd, tags)
    
        # 查询请求数据的排队状态。
        search_info = inputQueue.search(index)
        print("index: {}, search info: {}".format(index, search_info))
    

    其中:

    • endpoint:替换为上述步骤已获取的服务访问地址中的endpoint。

    • token:替换为上述步骤已获取的服务Token。

    返回JSON格式的数据:

    {
    	'IsPending': False,
    	'WaitCount': 0
    }

    返回字段说明如下:

    参数

    描述

    IsPending

    表示请求是否正在被处理。可能值为:

    • True:表示请求正在被处理。

    • False:表示请求正在排队。

    WaitCount

    表示该请求在排队队列中排第几位。仅IsPendingFalse时该值才有效,IsPendingTrue时该值为0。

  6. 从输出队列中获取执行结果。

    训练任务的执行日志会实时写入到输出队列中,可调用queue.get(request_id=request_id, length=1, timeout='0s', tags=tags)从输出队列实时获取指定task_id的训练任务的输出日志。调用示例如下:

    集成部署

    from eas_prediction import QueueClient
    import json
    import uuid
    
    if __name__ == '__main__':
        token = 'OGZlNzQwM2VlMWUyM2E2ZTAyMGRjOGQ5MWMyOTFjZGExNDgwMT****=='
        input_url = 'kohya-job-queue-bf****f0.kohya-job.175805416243****.cn-hangzhou.pai-eas.aliyuncs.com'
        sink_url = input_url + '/sink'
    
        # 创建输入队列客户端,用于发送command请求。
        inputQueue = QueueClient(custom_url = input_url)
        inputQueue.set_token(token)
        inputQueue.init(gid="superwatcher")
    
        # 创建输出队列客户端,用于获取command执行日志。
        sinkQueue = QueueClient(custom_url = sink_url)
        sinkQueue.set_token(token)
        sinkQueue.init()
    
        # 向输入队列发送command请求。
        cmd = "for i in {1..10}; do date; sleep 1; done;"
        task_id = uuid.uuid1().hex
        tags = {"taskType": "command", "taskId": task_id}
        index, request_id = inputQueue.put(cmd, tags)
    
        # 从输出队列实时获取指定taskId的训练任务的输出日志。
        running = True
        while running:
            dfs = sinkQueue.get(length=1, timeout='0s', tags=tags)
            if len(dfs) == 0:
                continue
            df = dfs[0]
            data = json.loads(df.data.decode())
            state = data["state"]
            print(data.get("log", ""))
            if state in {"Exited", "Stopped", "Fatal", "Backoff"}:
                running = False
    

    其中:

    • token:替换为上述步骤已获取的Token。

    • input_url:替换为上述步骤已获取的服务访问地址。

    独立部署

    from eas_prediction import QueueClient
    import json
    import uuid
    
    if __name__ == '__main__':
        endpoint = '166233998075****.cn-hangzhou.pai-eas.aliyuncs.com'
        token = 'M2EyNWYzNDJmNjQ5ZmUzMmM0OTMyMzgzYjBjOTdjN2IyOD****M5ZQ=='
        input_name = 'kohya_scalable_job'
        sink_name = input_name + '/sink'
    
        # 创建输入队列客户端,用于发送command请求。
        inputQueue = QueueClient(endpoint, input_name)
        inputQueue.set_token(token)
        inputQueue.init(gid="superwatcher")
    
        # 创建输出队列客户端,用于获取command执行日志。
        sinkQueue = QueueClient(endpoint, sink_name)
        sinkQueue.set_token(token)
        sinkQueue.init()
    
        # 向输入队列发送command请求。
        cmd = "for i in {1..10}; do date; sleep 1; done;"
        task_id = uuid.uuid1().hex
        tags = {"taskType": "command", "taskId": task_id}
        index, request_id = inputQueue.put(cmd, tags)
    
        # 从输出队列实时获取指定taskId的训练任务的输出日志。
        running = True
        while running:
            dfs = sinkQueue.get(length=1, timeout='0s', tags=tags)
            if len(dfs) == 0:
                continue
            df = dfs[0]
            data = json.loads(df.data.decode())
            state = data["state"]
            print(data.get("log", ""))
            if state in {"Exited", "Stopped", "Fatal", "Backoff"}:
                running = False
    

    其中:

    • endpoint:替换为上述步骤已获取的服务访问地址中的endpoint。

    • token:替换为上述步骤已获取的服务Token。

    返回数据Bytes类型的数据:

    {
    	"taskId": "e97409eea4a111ee9cb600163e08****",
    	"command": "python3 -u test.py --args=xxx",
    	"state": "Running",
    	"log": "prepare tokenizer\\n"
    }

    返回结果中各字段说明如下:

    字段

    描述

    taskId

    唯一标识训练任务。

    command

    job执行的任务指令。

    state

    指令执行状态:

    • Running:执行中。

    • Exited:已退出。

    • Fatal:执行异常。

    • Stopping:正在停止。

    • Stopped:已停止。

    log

    输出日志。完整的日志会将同一taskId的日志按照获取顺序输出。

  7. 停止训练任务。

    发送command任务请求后如果想要停止该任务,需要区分该请求是在排队状态还是执行状态。可通过queue.search(index)确认请求状态,并根据获取的请求状态来处理训练任务。示例如下:

    集成部署

    from eas_prediction.queue_client import QueueClient
    import uuid
    
    if __name__ == '__main__':
        token = 'OGZlNzQwM2VlMWUyM2E2ZTAyMGRjOGQ5MWMyOTFjZGExNDgwMT****=='
        input_url = 'kohya-job-queue-bf****f0.kohya-job.175805416243****.cn-hangzhou.pai-eas.aliyuncs.com'
        sink_url = input_url + '/sink'
    
        # 创建输入队列客户端,用于发送command请求和终止请求。
        inputQueue = QueueClient(custom_url = input_url)
        inputQueue.set_token(token)
        inputQueue.init(gid="superwatcher")
    
        # 向输入队列发送command请求。
        cmd = "for i in {1..10}; do date; sleep 1; done;"
        task_id = uuid.uuid1().hex  # 该任务请求的task_id。
        tags = {"taskType": "command", "taskId": task_id}
        index, request_id = inputQueue.put(cmd, tags)
        print(f'cmd send, index: {index}, task_id: {task_id}')
    
        job_index = index  # 发送任务请求时返回的index。
    
        pending_detail = inputQueue.search(job_index)
        print(f'search info: {pending_detail}')
        if len(pending_detail) > 0 and pending_detail.get("IsPending", True) == False:
            # command任务还在排队中,直接从输入队列中删除。
            inputQueue.delete(job_index)
            print(f'delete task index: {job_index}')
        else:
            # command任务正在执行中,向输入队列发送stop信号。
            stop_data = "stop"
            tags = {"_is_symbol_": "true", "taskId": task_id}
            inputQueue.put(stop_data, tags)
            print(f'stop task index: {job_index}')
    

    其中关键参数说明如下:

    参数

    描述

    token

    替换为上述步骤已获取的Token。

    input_url

    替换为上述步骤已获取的服务访问地址。

    stop_data

    设置为stop。

    tags

    • _is_symbol_:必填,设置为true表示任务终止信号。

    • task_id:表示需要终止的训练任务的task_id。

    独立部署

    from eas_prediction.queue_client import QueueClient
    import uuid
    
    if __name__ == '__main__':
        endpoint = '166233998075****.cn-hangzhou.pai-eas.aliyuncs.com'
        token = 'M2EyNWYzNDJmNjQ5ZmUzMmM0OTMyMzgzYjBjOTdjN2IyODc1MTM5****'
        input_name = 'kohya_scalable_job'
    
        # 创建输入队列客户端,用于发送command请求和终止请求。
        inputQueue = QueueClient(endpoint, input_name)
        inputQueue.set_token(token)
        inputQueue.init(gid="superwatcher")
    
        # 向输入队列发送command请求。
        cmd = "for i in {1..10}; do date; sleep 1; done;"
    
        # 该任务请求的task_id。
        task_id = uuid.uuid1().hex
        tags = {"taskType": "command", "taskId": task_id}
        index, request_id = inputQueue.put(cmd, tags)
        print(f'cmd send, index: {index}, task_id: {task_id}')
    
        job_index = index  # 发送任务请求时返回的index。
    
        pending_detail = inputQueue.search(job_index)
        print(f'search info: {pending_detail}')
        if len(pending_detail) > 0 and pending_detail.get("IsPending", True) == False:
            # command任务还在排队中,直接从输入队列中删除。
            inputQueue.delete(job_index)
            print(f'delete task index: {job_index}')
        else:
            # command任务正在执行中,向输入队列发送stop信号。
            stop_data = "stop"
            tags = {"_is_symbol_": "true", "taskId": task_id}
            inputQueue.put(stop_data, tags)
            print(f'stop task index: {job_index}')
    

    其中关键参数说明如下:

    参数

    描述

    endpoint

    替换为上述步骤已获取的服务访问地址中的endpoint。

    token

    替换为上述步骤已获取的服务Token。

    stop_data

    设置为stop。

    tags

    • _is_symbol_:必填,设置为true表示任务终止信号。

    • task_id:表示需要终止的训练任务的task_id。

相关文档