快速开始

舞动人像AnimateAnyone可基于人物图片和预设的骨骼动作序列,生成人物动作视频。其中,包含2个独立的模型“舞动人像AnimateAnyone-detect”和“舞动人像AnimateAnyone”,分别提供人物图片合规检测与人物视频生成能力。

模型部署与调用

以独占实例部署模型时,需要分别部署“舞动人像AnimateAnyone-detect”模型和“舞动人像AnimateAnyone”模型。

模型部署成功后,可查看到部署成功的模型名称(如下图所示)。

image

调用时,需按模型名称调用对应模型,并请参照以下调用顺序:

(1)调用“舞动人像AnimateAnyone-detect”模型确认输入的人物图像符合规范(可参考文档:AnimateAnyone 图像检测 API详情);

(2)调用“舞动人像AnimateAnyone”模型输入通过检测的人物图像和预设动作模板文件以生成视频(可参考文档:AnimateAnyone 视频生成 API详情)。

由于图像检测的调用耗时较短,而视频生成算法的调用耗时较长。使用时,可结合实际需要调整图像检测模型与视频生成模型的部署比例。通常,当图像以有序的队列输入时,1路图像检测并发应可支撑40路以上的视频生成并发任务。

前提条件

示例代码

以下示例展示了调用AnimateAnyone API对一个用户指令进行响应的代码。

#!/usr/bin/env python3

import os
import sys
import json
import time
import requests

DETECT_URL = 'https://dashscope.aliyuncs.com/api/v1/services/aigc/image2video/aa-detect'
ALGO_URL = 'https://dashscope.aliyuncs.com/api/v1/services/aigc/image2video/video-synthesis/'
TASK_URL = 'https://dashscope.aliyuncs.com/api/v1/tasks/'
api_key = os.getenv('DASHSCOPE_API_KEY')

def sync_call(image_url):
    res = requests.post(DETECT_URL,
                       headers={
                           'Accept': 'application/json',
                           'Authorization': f'Bearer {api_key}',
                       },
                       json={
                           'input': {
                               "image_url": image_url,
                           },
                           'model': 'animate-anyone-detect'
                       })
    if res.status_code != requests.codes.ok:
        raise RuntimeError(f"Error in call anymiate_anyone: ({res.status_code}) \n{res.text}")

    return res.json()


def async_call(pose_sequence_id, image_url):
    res = requests.post(ALGO_URL,
                       headers={
                           'X-DashScope-async': 'enable',
                           'Accept': 'application/json',
                           'Authorization': f'Bearer {api_key}',
                       },
                       json={
                           "input": {
                               "image_url": image_url,
                               "pose_sequence_id": pose_sequence_id
                           },
                           "model": "animate-anyone"
                       })
    if res.status_code != requests.codes.ok:
        raise RuntimeError(f"Error in call anymiate_anyone: ({res.status_code}) \n{res.text}")

    return res.json().get('output', {}).get('task_id')

def task(task_id):
    res = requests.get(f'{TASK_URL}{task_id}', headers={'Authorization': f'Bearer {api_key}'})

    if res.status_code != requests.codes.ok:
        raise RuntimeError(f"Error in call anymiate_anyone: ({res.status_code}) \n{res.text}")

    return res.json()

if __name__ == '__main__':
    sequence_id = sys.argv[1]
    img_url = sys.argv[2]
    ## step1: 检测提供图片是否满足算法要求
    checked = sync_call(img_url)
    if not checked.get('output', {}).get('check_pass', False):
        print(json.dumps(checked.get('output'), indent=2))
        sys.exit(1)

    ## step2: 调用推理算法
    task_id = async_call(sequence_id, img_url)
    print(f'Submit Task: {task_id}')
    print('Waiting task:', end='', flush=True)

    ## step3: 等待生成结果
    while True:
        result = task(task_id)
        task_status = result.get('output', {}).get('task_status', 'UNKNOWN')

        if task_status == 'PENDING':
            print('-', end='', flush=True)
        elif task_status == 'RUNNING':
            print('=', end='', flush=True)
        else:
            print()
            print(json.dumps(result, indent=2))
            break
        time.sleep(60)