数字人实时对话AvatarDialog API参考

更新时间:2025-04-17 02:50:35

数字人实时对话模型(avatar-dialog)能够基于输入的音频和预设的2D数字人形象,实时生成播报视频。本文介绍其WebSocket API交互流程及输入输出参数。

功能及模型介绍

数字人服务端通过 WebSocket 与客户端保持长连接。在连接过程中,服务端会根据输入的音频(仅支持单声道PCM)以及从公共形象库中选择的2D数字人形象,实时生成播报视频,并将生成的视频流推送至视频直播RTC

注意:为了在客户端展示数字人实时生成的视频流画面,您需要自行从RTC拉取视频流并进行下一步处理。

image
说明

数字人实时对话模型(avatar-dialog)依托视频直播 RTC 实现推流功能,推流的视频分辨率为 1080P,收费类型为视频通话。RTC推流费用请参考实时音视频费用

服务端模型简介

模型名称

计费单价

限流(主账号与RAM子账号共用)

免费额度

任务下发接口QPS限制

同时处理中任务数量

模型名称

计费单价

限流(主账号与RAM子账号共用)

免费额度

任务下发接口QPS限制

同时处理中任务数量

avatar-dialog

0.01元/秒

1

1

免费额度:600

有效期:百炼开通后180天内

说明

数字人实时对话模型(avatar-dialog)目前正处于邀测阶段。您可通过百炼模型广场申请使用,审核通过后方可调用模型并获得免费额度。免费额度用完后,将按计费单价收费。

前提条件

  1. 在调用前,您需要开通模型服务并获取API Key,再配置API Key到环境变量

  2. 开通阿里云视频直播服务(RTC),然后创建应用,获取应用IDAppKey。

点击查看操作步骤

  1. 登录视频直播控制台开通视频云RTC服务。

  2. 点击创建应用

    image

  3. 选择刚创建的应用,在操作列点击管理,查看应用IDAppKey。

    image

  1. 请将以下变量配置到环境变量中,具体配置方法请参见配置API Key到环境变量

    • RTC_APP_ID(应用 ID)

    • RTC_APP_KEY(AppKey)

WebSocket客户端与服务端交互流程

按照时间顺序,客户端与服务端的交互流程如下所示。

第一阶段:初始化

  1. 建立连接,客户端与服务端建立WebSocket连接。

  2. 开启任务,并完成初始化。

    • 客户端发送InitializeVideoSession以开启数字人渲染任务;

    • 客户端收到task-started表示服务端连接已建立;

    • 客户端收到VideoSessionInitialized表示数字人服务已收到消息并开始处理;

    • 客户端收到VideoSessionStarted表示数字人画面已成功推流,标志着初始化完成。

第二阶段:根据音频实时生成视频

  1. 发送音频,实时生成视频

    • 客户端发送GenerateVideo传输数字人需要播报的音频给服务端;

    • 数字人开始当前轮次播报时,会返回AvatarStatusChanged,current_statusSPEAKING,表示数字人正在播报,处于说状态;

    • 数字人结束当前轮次播报时,会返回AvatarStatusChanged,current_statusLISTENING,表示数字人停止播报,处于听状态;

    • 在播报过程中,数字人会返回AvatarHeartbeat作为心跳信息;

    • 如果GenerateVideo时传输了sentenceId,数字人在每个sentence开始播报的时候返回SentenceStarted消息。

  2. 打断(可选)

第三阶段:触发心跳

  • 客户端通过发送TriggerHeartbeat主动触发数字人心跳。当前如果超过1分钟数字人没有收到消息,会主动断开,可以使用该消息主动维持心跳。

第四阶段:结束任务

  1. 结束任务

  2. 关闭连接,客户端收到消息后可以关闭当前的WebSocket连接。

image

说明
  • action:客户端向服务端发送的消息。

  • event:服务端向客户端返回的消息。

WebSocket客户端实现流程

在编写WebSocket客户端代码时,为了同时发送和接收消息,通常采用异步编程。您可以按照以下步骤来编写程序:

  1. 建立WebSocket连接:首先,初始化并建立与服务器的WebSocket连接。

  2. 异步监听服务器消息:启动一个独立的线程(或使用异步任务,具体实现方式取决于编程语言)来监听服务器返回的消息,并根据消息内容执行相应的操作。

  3. 发送消息:在与步骤2中不同的线程中(例如主线程或其他独立线程),向服务器发送消息。

  4. 关闭连接:在程序结束前,确保关闭WebSocket连接以释放资源,避免连接泄漏。

本文主要介绍通过WebSocket连接访问服务时的鉴权,以及客户端与服务端之间的消息交互流程。

1. 建立WebSocket链接

调用WebSocket库函数(具体实现方式因编程语言或库函数而异),将请求URL和请求头传入以建立WebSocket连接。

WebSocket URL固定为:

wss://dashscope.aliyuncs.com/api-ws/v1/inference

请求头中需添加如下鉴权信息:

{
  "Authorization": "Bearer <YOUR_DASHSCOPE_API_KEY>", // 将<YOUR_DASHSCOPE_API_KEY>替换成您自己的API Key
}

2. 发送并监听消息

建立连接后,客户端可以向服务端发送消息,也可以接收来自服务端的消息。消息均为JSON格式。

  • action消息:客户端向服务端发送的消息(客户端 → 服务端)。

  • event消息:服务端向客户端返回的消息(服务端 → 客户端)。

action消息数据格式
event消息数据格式
{
  "header": {
    "task_id": "97d75ef6-8604-4b86-bac0-xxxxxxxxxxxx",
    "streaming": "duplex",
    "action": "run-task"
  },
  "payload": {
    "input": {
      "header": {
        "name": ""
      },
      "payload": {}
    }
  }
}

字段

类型

必选

描述

示例值

header

Object

请求头

header.task_id

String

当次任务ID,建议使用随机生成的32UUID

97d75ef6-8604-4b86-bac0-xxxxxxxxxxxx

header.action

String

发送给服务端的事件类型,不同的事件对应不同的action消息:

run-task

header.streaming

String

WebSocket交互方式

固定值为 duplex

payload.input

Object

payload.input.header

Object

header仅包含name字段,该字段用来区分不同的action消息,其取值包括以下几种:

InitializeVideoSession

payload.input.payload

Object

发送给服务端的关键信息(如请求参数、音频数据等),不同类型的action消息在该字段中包含各自特定的数据结构

event消息数据格式如下:

{
  "header": {
    "task_id": "97d75ef6-8604-4b86-bac0-xxxxxxxxxxxx",
    "event": "task-started"
  },
  "payload": {
    "output": {
      "header": {
        "name": ""
      },
      "payload": {}
    }
  }
}

字段

类型

描述

header

Object

请求头

header.task_id

String

客户端生成的task_id

header.event

String

服务端返回的事件类型,不同的事件对应不同的event消息:

payload.output

Object

payload.output.header

Object

header仅包含name字段,该字段用来区分不同的event消息,其取值包括以下几种:

payload.output.payload

Object

服务端返回的关键信息(如数字人状态、错误信息等),不同类型的event消息在该字段中包含各自特定的数据结构

下面将按照WebSocket客户端与服务端的交互流程,介绍对应的action消息和event消息。

action消息:InitializeVideoSession

客户端发送InitializeVideoSession以开启数字人渲染任务。

{
  "header": {
    "task_id": "97d75ef6-8604-4b86-bac0-xxxxxxxxxxxx",
    "streaming": "duplex",
    "action": "run-task"
  },
  "payload": {
    "task_group": "aigc",
    "task": "video-generation",
    "function": "stream-generation",
    "model": "avatar-dialog",
    "input": {
      "header": {
        "name": "InitializeVideoSession",
      },
      "payload": {
        "avatar_id": "",
        "format": "PCM",
        "sample_rate": 16000,
        "rtc_param": {
          "app_id": "xxx",
          "channel_id": "xxx",
          "user_id": "xxx",
          "nonce": "xxx",
          "timestamp": 0,
          "token": "xxx",
          "gslb": ["xxx"]
        },
        "user_agent": {
          "client": "BAILIAN",
          "platform": "Dalvik/2.1.0 (Linux; U; Android 12; NOH-AL00 Build/HUAWEINOH-AL00)",
          "version": "x.x.xx",
          "app_type": "Dev"
        }
      }
    },
    "parameter": {}
  }
}

字段

类型

必选

描述

示例值

字段

类型

必选

描述

示例值

header

Object

action消息头

参考action消息数据格式

header.task_id

String

当次任务ID,建议使用随机生成的32UUID

97d75ef6-8604-4b86-bac0-xxxxxxxxxxxx

header.streaming

String

WebSocket交互方式

固定值为 duplex

header.action

String

发送给服务端的事件类型

固定值为run-task

payload

Object

action消息体

payload.task_group

String

任务组

固定值为aigc

payload.task

String

任务类别

固定值为video-generation

payload.function

String

方法类型

固定值为stream-generation

payload.model

String

数字人服务

固定值为avatar-dialog

payload.input

Object

请求体的输入内容

payload.input.header

Object

输入内容的请求头

payload.input.header.name

String

消息名称

InitializeVideoSession

payload.input.payload

Object

发送给服务端的关键信息

见下

payload.input.payload参数

字段

类型

必选

描述

示例值

字段

类型

必选

描述

示例值

avatar_id

String

数字人ID,请在公共形象库中选择某一形象

taoji

format

String

音频格式,目前仅支持单声道PCM

PCM

sample_rate

Integer

音频采样率(单位Hz),可选值为:

  • 16000

  • 24000

  • 32000

  • 48000

16000

rtc_param

Obect

RTC入会参数

您需要开通视频直播RTC服务,请在RTC token获取相关参数

rtc_param.app_id

String

rtc_param.channel_id

String

rtc_param.user_id

String

rtc_param.nonce

String

rtc_param.timestamp

Integer

rtc_param.token

String

rtc_param.gslb

Array

RTC推流地址

固定值为["https://gw.rtn.aliyuncs.com"]

user_agent

Obect

用户设备型号

user_agent.client

String

用户设备的客户端

固定值为BAILIAN

user_agent.platform

String

用户拉流端平台拉,可选值为:

  • "Android"

  • "iOS"

  • "Web"

  • "Windows"

  • "Linux"

Android

event消息:VideoSessionInitialized

客户端会收到服务端发送的VideoSessionInitialized消息,表示数字人服务已收到初始化任务并开始处理。

{
  "header": {
    "task_id": "97d75ef6-8604-4b86-bac0-xxxxxxxxxxxx",
    "event": "result-generated"
  },
  "payload": {
    "output": {
      "header": {
        "name": "VideoSessionInitialized"
      },
      "payload": {
      }
    }
  }
}

event消息:VideoSessionStarted

客户端会收到服务端发送的VideoSessionStarted消息表示数字人画面已成功推流,标志着初始化完成。

{
  "header": {
    "task_id": "97d75ef6-8604-4b86-bac0-xxxxxxxxxxxx",
    "event": "result-generated"
  },
  "payload": {
    "output": {
      "header": {
        "name": "VideoSessionStarted"
      },
      "payload": {
      }
    }
  }
}

action消息:GenerateVideo

客户端调用GenerateVideo发送音频,开启数字人实时播报。该消息对每次发送的音频长度没有严格限制,通过speech_idsentence_id来标识和描述音频内容。

例如,当需要数字人播报一段10秒的音频时,这一段完整的播报被称为一个speech。这10秒的音频可以通过多次调用GenerateVideo以流式方式发送至服务端,只需确保这些消息具有相同的speech_id即可。

此外,一个speech可以包含多个sentence(句子)。每当一个sentence开始播报时,客户端会收到服务端发送的SentenceStarted消息。sentence主要用于字幕对齐,实现语音与文字的实时同步。

为什么需要sentence来进行字幕对齐?

在上述实现过程中,服务端会在每个sentence开始播报时发送一个SentenceStarted消息 。该消息提供了句子开始时间的明确信号,客户端可以根据这一信号调整字幕的显示时间,从而确保语音与字幕的同步。

sentence支持划分语音内容,并结合时间信号实现语音与字幕的同步。其划分方式可以由客户端根据具体需求灵活定义。例如:

  • 实时同步场景 :将一句话设置为一个sentence,适用于需要逐句显示字幕的情况(推荐)。

  • 内容连贯场景 :将多句话合并为一个sentence,适用于语义连贯或无需逐句显示字幕的情况。

{
  "header": {
    "task_id": "97d75ef6-8604-4b86-bac0-xxxxxxxxxxxx",
    "action": "continue-task"
  },
  "payload": {
    "input": {
      "header": {
        "name": "GenerateVideo"
      },
      "payload": {
        "speech_id": "speech_id",
        "sentence_id": "sentence_id",
        "audio_data": "base64==",
        "end_of_speech": false
      }
    }
  }
}

字段

类型

必选

描述

示例值

字段

类型

必选

描述

示例值

header

Object

action消息头

参考action消息数据格式

header.task_id

String

当次任务ID,建议使用随机生成的32UUID

97d75ef6-8604-4b86-bac0-xxxxxxxxxxxx

header.action

String

发送给服务端的事件类型

固定值为continue-task

payload

Object

action消息体

payload.input

Object

请求体的输入内容

payload.input.header

Object

输入内容的请求头

payload.input.header.name

String

消息名称

GenerateVideo

payload.input.payload

Object

发送给服务端的关键信息

见下

payload.input.payload参数

字段

类型

必选

描述

示例值

字段

类型

必选

描述

示例值

speech_id

String

speech ID(数字人的一段完整播报被称为一个speech),该值由客户端自定义,需要确保在同一次会话(session)中不变

sentence_id

String

用于划分播报内容,该值由客户端自定义,需要确保在同一次会话(session)中不变

audio_data

String

Base64编码格式的音频数据,您可以使用音频处理库将原始音频(单声道,采样位数为16bit)转换为 Base64 编码格式,Python示例如下:

# 原始音频数据(二进制字节流)
origin_audio_data = bytes()

# 将音频字节流转换为 Base64 字符串
base64_audio_data = base64.b64encode(origin_audio_data).decode("utf8")

end_of_speech

Bool

一个speech中的最后一次发送GenerateVideo消息时,需要将end_of_speech置为true,表示当前speech的数据已全部发送

false

event消息:SentenceStarted

当发送GenerateVideo消息并传入sentence_id时,每句播报开始时,服务端会返回SentenceStarted消息。此机制适用于字幕与音频在句子级别的对齐,表示数字人开始播报对应sentence_id的句子。

{
  "header": {
    "task_id": "97d75ef6-8604-4b86-bac0-xxxxxxxxxxxx",
    "event": "result-generated"
  },
  "payload": {
    "output": {
      "header": {
        "name": "SentenceStarted"
      },
      "payload": {
        "speech_id": "speech_id",
        "sentence_id": "sentence_id"
      }
    }
  }
}

字段

类型

描述

字段

类型

描述

speech_id

String

当前speechid,取决于发送GenerateVideo消息时传入的speech_id

sentence_id

String

当前开始播报的sentence_id,取决于发送GenerateVideo消息时传入的sentence_id

action消息:TriggerHeartbeat

目前,数字人服务具备超时断开机制。如果客户端在1分钟内未向数字人发送数据,WebSocket连接将自动断开。为避免这种情况,客户端可主动触发心跳机制。当服务端接收到心跳消息后,会返回一个AvatarHeartbeat消息,从而保持WebSocket连接的活跃状态。

{
  "header": {
    "task_id": "97d75ef6-8604-4b86-bac0-xxxxxxxxxxxx",
    "action": "continue-task"
  },
  "payload": {
    "input": {
      "header": {
        "name": "TriggerHeartbeat"
      },
      "payload": {
      }
    }
  }
}

event消息:AvatarHeartbeat

服务端会在以下三种场景中返回心跳消息(AvatarHeartbeat):

  • 持续播报 :当数字人基于客户端输入的音频持续生成播报画面时,服务端会以固定时间间隔(约5秒)返回心跳消息。

  • 状态变更 :当客户端发送ChangeAvatarStatus消息时,服务端会返回心跳消息。

  • 主动上报心跳 :当客户端发送TriggerHeartbeat消息时,服务端会响应并返回心跳消息。

{
  "header": {
    "task_id": "97d75ef6-8604-4b86-bac0-xxxxxxxxxxxx",
    "event": "result-generated"
  },
  "payload": {
    "output": {
      "header": {
        "name": "AvatarHeartbeat"
      },
      "payload": {
      }
    }
  }
}

event消息:AvatarStatusChanged

数字人目前包含两种状态:听(LISTENING)说(SPEAKING)。在交互过程中,AvatarStatusChanged 消息的 current_status 字段会根据数字人的状态变化进行更新:

  • 当数字人开始当前轮次播报时,current_status 设置为 SPEAKING。

  • 当数字人结束当前轮次播报时,current_status 设置为 LISTENING。

{
  "header": {
    "task_id": "97d75ef6-8604-4b86-bac0-xxxxxxxxxxxx",
    "event": "result-generated"
  },
  "payload": {
    "output": {
      "header": {
        "name": "AvatarStatusChanged"
      },
      "payload": {
        "current_status": "LISTENING",
        "speech_id": "speech_id"
      }
    }
  }
}

字段

类型

描述

字段

类型

描述

current_status

String

当前数字人的状态:LISTENING 或 SPEAKING

speech_id

String

当前speech_id,取决于发送GenerateVideo消息时传入的speech_id

action消息:ChangeAvatarStatus

通过主动切换数字人状态,您可以打断数字人播报,使其从说(SPEAKING)切换至听(LISTENING)。

当数字人状态成功切换时,服务端会返回AvatarStatusChanged消息。 需要注意的是,如果数字人在收到打断消息时并未处于说状态(SPEAKING),则不会返回AvatarStatusChanged消息。

{
  "header": {
    "task_id": "97d75ef6-8604-4b86-bac0-xxxxxxxxxxxx",
    "action": "continue-task"
  },
  "payload": {
    "input": {
      "header": {
        "name": "ChangeAvatarStatus"
      },
      "payload": {
        "target_status": "LISTENING"
      }
    }
  }
}

字段

类型

必选

描述

示例值

字段

类型

必选

描述

示例值

header

Object

action消息头

参考action消息数据格式

header.task_id

String

当次任务ID,建议使用随机生成的32UUID

97d75ef6-8604-4b86-bac0-xxxxxxxxxxxx

header.action

String

发送给服务端的事件类型

固定值为continue-task

payload

Object

action消息体

payload.input

Object

请求体的输入内容

payload.input.header

Object

输入内容的请求头

payload.input.header.name

String

消息名称

ChangeAvatarStatus

payload.input.payload

Object

发送给服务端的关键信息

见下

payload.input.payload参数

字段

类型

必选

描述

示例值

字段

类型

必选

描述

示例值

target_status

String

数字人状态:

  • SPEAKING:说

  • LISTENING:听

LISTENING

3. 关闭WebSocket连接

action消息:DestroyVideoSession

客户端发送销毁数字人Session消息,结束此次会话。

{
  "header": {
    "task_id": "97d75ef6-8604-4b86-bac0-xxxxxxxxxxxx",
    "action": "finish-task"
  },
  "payload": {
    "input": {
      "header": {
        "name": "DestroyVideoSession"
      }
    }
  }
}

event消息:VideoSessionDestroyed

数字人Session销毁完成,当客户端收到该消息后,可以断开WebSocket连接。

{
  "header": {
    "task_id": "97d75ef6-8604-4b86-bac0-xxxxxxxxxxxx",
    "event": "task-finished"
  },
  "payload": {
    "output": {
      "header": {
        "name": "VideoSessionDestroyed"
      }
    }
  }
}

4. 处理任务失败

event消息:AvatarProcessError

当模型处理异常时,会返回错误消息AvatarProcessError并中断会话。此时,客户端需要关闭 WebSocket 连接。

您可以根据返回的 status_codestatus_namemessage 查阅错误码,按要求处理。

{
  "header": {
    "task_id": "97d75ef6-8604-4b86-bac0-xxxxxxxxxxxx",
    "event": "task-failed",
    "status_code": "400",
    "status_name": "InvalidParameter"
  },
  "payload": {
    "output": {
      "header": {
        "name": "AvatarProcessError"
      },
      "payload": {
        "message": "xxx"
      }
    }
  }
}

测试示例

  1. 请参见前提条件DASHSCOPE_API_KEY、RTC_APP_ID、RTC_APP_KEY配置到环境变量。

  2. 配置Python环境

    • 推荐使用Python3.10及以上版本。

    • 安装必要的依赖包。

    pip install loguru numpy asyncio websockets==15.0.1
  3. 新建一个demo.py文件,并将以下 Python 代码复制到该文件中。

点击打开Python示例代码

import asyncio
import hashlib
import os
import websockets
import base64
import json
import wave
import time
import uuid
import argparse
from copy import deepcopy
from loguru import logger
import numpy as np
import struct


parser = argparse.ArgumentParser()
parser.add_argument("--loop_num", type=int, default=1, help="loop_num")
parser.add_argument("--concurrent_num", type=int, default=1, help="concurrent_num")
parser.add_argument("--log_file_name", type=str, default="default.log", help="log_file_name")
args = parser.parse_args()

DS_WSS_ENDPOINT = "wss://dashscope.aliyuncs.com/api-ws/v1/inference"
MODEL = "avatar-dialog"
AVATAR_ID = "jiaoyue"
DS_AUTH = "Bearer " + os.getenv("DASHSCOPE_API_KEY")

AUDIO_PATH = "wav/audio48k.wav"	# 测试音频路径
TEST_TIME = 3600 * 8  # 测试时间 单位秒

RTC_H5_URL = "https://market.wapa.taobao.com/app/xr-paas/realtime-avatar-preview/index.html#/rtc"

AUDIO_COPY_TIMES = 1
AUDIO_SLICE_RATE = 2
sample_rata = 16000

# MAX_AUDIO_DURATION = 10
MAX_AUDIO_DURATION = -1

LOG_FILE_NAME = args.log_file_name  # 日志名

logger.add(LOG_FILE_NAME, rotation="1 week")

CHANNEL_ID = "test_channel"


def get_rtc_param(user_id):
    app_id = os.getenv("RTC_APP_ID")
    app_key = os.getenv("RTC_APP_KEY")
    timestamp = int(time.time()) + 24 * 60 * 60
    sha256_hash = hashlib.sha256()
    data = f"{app_id}{app_key}{CHANNEL_ID}{user_id}{timestamp}"
    sha256_hash.update(data.encode())
    token = sha256_hash.hexdigest()
    return {
        "app_id": app_id,
        "channel_id": CHANNEL_ID,
        "user_id": user_id,
        "nonce": "",
        "timestamp": timestamp,
        "token": token,
        "gslb": ["https://rgslb.rtc.aliyuncs.com"]
    }


def get_subscribe_url():
    rtc_param = get_rtc_param(user_id=str(uuid.uuid4()))
    url = (f"{RTC_H5_URL}?userId={rtc_param['user_id']}"
           f"&channelId={CHANNEL_ID}"
           f"&appId={rtc_param['app_id']}"
           f"&token={rtc_param['token']}"
           f"&timestamp={rtc_param['timestamp']}"
           f"&userName={str(uuid.uuid4())}"
          )
    return url


def get_init_msg(task_id):
    msg_initialize_video_session = {
        "header": {
            "task_id": task_id,
            "streaming": "duplex",
            "action": "run-task"
        },
        "payload": {
            "task_group": "aigc",
            "task": "video-generation",
            "function": "stream-generation",
            "model": MODEL,
            "input": {
                "header": {
                    "name": "InitializeVideoSession",
                    "request_id": str(uuid.uuid4())
                },
                "payload": {
                    "avatar_id": AVATAR_ID,
                    "format": "PCM",
                    "sample_rate": sample_rata,
                    "session_config": {
                        "enable_motion_data": False,
                        "motion_data_compression": "zip",
                        "motion_data_audio_encoder": "opus"
                    },
                    "h5_mode_enable": True,
                    "user_agent": {
                        "client": "BAILIAN",
                        "platform": "Dalvik/2.1.0 (Linux; U; Android 12; NOH-AL00 Build/HUAWEINOH-AL00)",
                        "version": "3.18.0",
                        "app_type": "Dev"
                    },
                    "rtc_param": get_rtc_param("test_user")
                }
            },
            "parameter": {}
        }
    }
    return json.dumps(msg_initialize_video_session)


def get_alive_message(task_id):
    msg_destroy_video_session = {
        "header": {
            "task_id": task_id,
            "streaming": "duplex",
            "action": "continue-task"
        },
        "payload": {
            "input": {
                "header": {
                    "name": "TriggerHeartbeat",
                    "request_id": str(uuid.uuid4())
                },
                "payload": {}
            }
        }
    }
    return json.dumps(msg_destroy_video_session)


def get_destroy_message(task_id):
    msg_destroy_video_session = {
        "header": {
            "task_id": task_id,
            "streaming": "duplex",
            "action": "finish-task"
        },
        "payload": {
            "input": {
                "header": {
                    "name": "DestroyVideoSession",
                    "request_id": str(uuid.uuid4())
                },
                "payload": {}
            }
        }
    }
    return json.dumps(msg_destroy_video_session)


def get_send_speech_text_message(task_id):
    send_speech_text_message = {
        "header": {
            "task_id": task_id,
            "streaming": "duplex",
            "action": "continue-task"
        },
        "payload": {
            "input": {
                "header": {
                    "name": "SendSpeechText",
                    "request_id": str(uuid.uuid4())
                },
                "payload": {
                    "task_id": task_id,
                    "source": "llm",
                    "mode": "full_text",
                    "text": "你好,我是云小宝",
                    "end_of_task": True
                }
            }
        }
    }
    return json.dumps(send_speech_text_message)


def get_inner_msg(msg):
    msg_json = json.loads(msg)
    output = msg_json["payload"].get("output", None)
    usage_msg = msg_json["payload"].get("usage", None)
    return output, usage_msg


def read_wav(wav_path: str):
    with wave.open(wav_path, 'rb') as wav_file:
        frame_rate = wav_file.getframerate()
        num_frames = wav_file.getnframes()
        audio_data = wav_file.readframes(num_frames)
        if MAX_AUDIO_DURATION > 0:
            audio_data = audio_data[:int(frame_rate * MAX_AUDIO_DURATION * 2)]
        return audio_data, frame_rate

def read_pcm_to_bytes(file_path: str, sample_rate):
    with open(file_path, 'rb') as f:
        audio_data = f.read()
    return audio_data, sample_rate

def read_pcm_file(file_path, sample_rate, sample_width=2, num_channels=1):
    """
    读取 PCM 格式的音频文件。

    :param file_path: PCM 文件的路径。
    :param sample_rate: PCM 音频的采样率。
    :param sample_width: 每个音频样本的字节数(例如,16 位音频对应2字节)。
    :param num_channels: 音频的通道数(例如,单声道是1,立体声是2)。
    :return: 一个包含音频数据的 numpy 数组,以及采样率。
    """
    # 打开 PCM 文件读取二进制数据
    with open(file_path, 'rb') as pcm_file:
        # 读取所有数据
        pcm_data = pcm_file.read()

    # 确定每个样本的格式
    if sample_width == 1:
        sample_format = 'b'  # 8-bit PCM
    elif sample_width == 2:
        sample_format = 'h'  # 16-bit PCM
    else:
        raise ValueError("Unsupported sample width: {}".format(sample_width))

    # 计算样本数量
    num_samples = len(pcm_data) // (sample_width * num_channels)

    # 使用 struct.unpack 解码二进制数据到整数
    audio_data = struct.unpack('<' + sample_format * num_samples * num_channels, pcm_data)

    # 将数据转换为 numpy 数组
    audio_array = np.array(audio_data, dtype=np.int16)

    # 如果是多通道音频,将数据重塑为(num_samples, num_channels)的形状
    if num_channels > 1:
        audio_array = audio_array.reshape(-1, num_channels)

    return audio_array, sample_rate


def get_audio_msg_list(task_id):
    audio_path = AUDIO_PATH
    audio_file, audio_sample_rate = read_wav(audio_path)
    # audio_file, audio_sample_rate = read_pcm_to_bytes(audio_path, 16000)
    global sample_rata
    sample_rata = audio_sample_rate
    audio_file = audio_file * AUDIO_COPY_TIMES
    end_of_task = False
    start = 0
    msg_list = []

    while not end_of_task:
        end = int(start + audio_sample_rate * AUDIO_SLICE_RATE)
        audio_data = audio_file[start:end]
        end_of_task = end >= len(audio_file) - 1
        audio_base64 = base64.b64encode(audio_data).decode("utf8")
        request_id = str(uuid.uuid4())
        msg_generate_video = {
            "header": {
                "task_id": task_id,
                "streaming": "duplex",
                "action": "continue-task"
            },
            "payload": {
                "input": {
                    "header": {
                        "name": "GenerateVideo",
                        "request_id": request_id
                    },
                    "payload": {
                        "speech_id": "",
                        "audio_data": "",
                        "end_of_task": end_of_task
                    }
                }
            }
        }
        msg_generate_video["payload"]["input"]["payload"]["audio_data"] = audio_base64
        msg_generate_video["payload"]["input"]["payload"]["end_of_task"] = end_of_task
        msg_generate_video["payload"]["input"]["payload"]["end_of_speech"] = end_of_task
        msg_list.append(msg_generate_video)
        if end_of_task:
            break
        else:
            start = end
    return msg_list


def update_msg_speech_id(msg_list):
    task_id = str(uuid.uuid4())
    for msg in msg_list:
        msg["payload"]["input"]["payload"]["task_id"] = task_id
        msg["payload"]["input"]["payload"]["speech_id"] = task_id


def update_msg_request_id(msg_list):
    request_id = str(uuid.uuid4())
    for msg in msg_list:
        msg["payload"]["input"]["header"]["request_id"] = request_id


class TimeCost:
    def __init__(self):
        self.init_start = None
        self.first_audio_data_send_ts = None
        self.first_motion_data_recv_ts = None


async def init(worker_num, ws, rec_timer):
    while True:
        msg = await ws.recv()
        logger.info("{}", msg)
        inner_msg, usage_msg = get_inner_msg(msg)
        if inner_msg and inner_msg["header"]["name"] == "VideoSessionInitialized":
            latency = int((time.time() - rec_timer.init_start) * 1000)
            logger.info(f"worker {worker_num} =================== initCost(in ms): {latency}")
            try:
                rtc_param = inner_msg["payload"]["rtc_param"]
                channel_id = rtc_param["channel_id"]
                app_id = rtc_param["app_id"]
                user_id = rtc_param["user_id"]
                token = rtc_param["token"]
                timestamp = rtc_param["timestamp"]

                assert app_id != ""

                url = (f"{RTC_H5_URL}?userId={user_id}"
                    f"&channelId={channel_id}"
                    f"&appId={app_id}"
                    f"&token={token}"
                    f"&timestamp={timestamp}"
                    f"&userName={str(uuid.uuid4())}"
                    )
            except:
                url = get_subscribe_url()
            print(f"Opening rtc channel {url}")
        elif inner_msg and inner_msg["header"]["name"] == "VideoSessionStarted":
            latency = int((time.time() - rec_timer.init_start) * 1000)
            logger.info(f"worker {worker_num} =================== startCost(in ms): {latency}")
            return


async def destroy(worker_num, ws, rec_timer):
    while True:
        msg = await ws.recv()
        inner_msg, usage_msg = get_inner_msg(msg)
        logger.info("receive message: {}", inner_msg)
        if inner_msg and inner_msg["header"]["name"] == "VideoSessionDestroyed":
            logger.info(f"worker {worker_num} =================== VideoSessionDestroyed")
            logger.info("任务结束,查看计量: {}", usage_msg)
            return


async def send_audio_messages(worker_num, ws, msg_list, rec_timer, alive_msg):
    for j in range(int(TEST_TIME / 10)):
        uu_msg = deepcopy(msg_list)
        update_msg_speech_id(uu_msg)
        update_msg_request_id(uu_msg)
        total_audio_duration = 0
        start_send_time = time.time()
        for i, message in enumerate(uu_msg):
            await ws.send(json.dumps(message))
            if i == 0:
                speech_id = message["payload"]["input"]["payload"]["task_id"]
                logger.info(
                    f"worker {worker_num} first send_audio_messages cost: {int((time.time() - rec_timer.first_audio_data_send_ts) * 1000)} speech_id: {speech_id}")
                rec_timer.first_audio_data_send_ts = time.time()
            current_audio_duration = AUDIO_SLICE_RATE / 2
            total_audio_duration += current_audio_duration
            sleep_time = current_audio_duration / 2
            await asyncio.sleep(sleep_time)
        time_to_sleep = (total_audio_duration - (time.time() - start_send_time)) + 5
        logger.info("total audio duration: {}, sleep time: {}", total_audio_duration, time_to_sleep)
        await asyncio.sleep(time_to_sleep)
        await ws.send(alive_msg)


async def demo(uri, auth_token, audio_msg_list, timer):
    global ws
    task_id = str(uuid.uuid4())
    try:
        timer = TimeCost()
        worker_num = 0
        async with websockets.connect(uri, ping_interval=300, ping_timeout=300,
                                      additional_headers={"Authorization": auth_token}, open_timeout=300) as websocket:
            ws = websocket
            init_message = get_init_msg(task_id)
            await websocket.send(init_message)
            timer.init_start = time.time()
            logger.info(f"demo init_start")
            await init(worker_num, websocket, timer)

            send_speech_text_message = get_send_speech_text_message(task_id)
            await websocket.send(send_speech_text_message)

            timer.first_audio_data_send_ts = time.time()
            alive_msg = get_alive_message(task_id)
            send_task = asyncio.create_task(
                send_audio_messages(worker_num, websocket, audio_msg_list, timer, alive_msg))

            # create destroy loop to recv message
            destroy_task = asyncio.create_task(
                destroy(worker_num, websocket, timer)
            )

            await send_task

            destroy_message = get_destroy_message(task_id)
            await websocket.send(destroy_message)
            logger.info(f"worker {worker_num}, sent destroy msg: {destroy_message}")
            
            await destroy_task

    except Exception as e:
        logger.opt(exception=True).error("{}", e)
    finally:
        if ws:
            await close_ws(ws, task_id)


async def async_only():
    audio_msg_list = get_audio_msg_list(str(uuid.uuid4()))
    timer = TimeCost()
    await demo(DS_WSS_ENDPOINT, DS_AUTH, audio_msg_list, timer)


async def close_ws(websocket, task_id):
    try:
        logger.info("ws state: {}", websocket.state)
        if websocket.state != 1:
            return
        destroy_message = get_destroy_message(task_id)
        await websocket.send(destroy_message)
        logger.info(f"worker {0}, sent destroy msg: {destroy_message}")
        await destroy(0, websocket, None)
    except Exception as e:
        logger.opt(exception=True).error("Failed to close WebSocket: {}", e)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(async_only())
    except KeyboardInterrupt:
        print("KeyboardInterrupt, stop")
        if ws:
            loop.run_until_complete(close_ws(ws))
        # 取消所有任务
        tasks = [t for t in asyncio.all_tasks(loop) if t is not asyncio.current_task(loop)]
        for task in tasks:
            task.cancel()
        loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
        print("All tasks cancelled.")
    finally:
        loop.stop()
        loop.close()
  1. 下载audio48k.wav到本地,视频文件位置与demo.py的文件目录结构如下:

avatar
├── demo.py                                # Python 示例代码
└── wav                                    # 存放音频文件的文件夹
    └── audio48k.wav                       # 示例音频文件
  1. 运行demo.py。

python demo.py

脚本运行后,日志中会打印一个 URL。在浏览器中打开该地址查看数字人播报的推流画面。如果遇到黑屏情况,刷新页面即可恢复正常。

image

  1. 推流画面展示如下所示。

image

错误码

接口状态(status_code)

错误消息(message)

说明

接口状态(status_code)

错误消息(message)

说明

400

rtc_param is required

需要客户端提供RTC入会参数

400

avatar_id xxx invalid, download avatar resource failed

avatar_id不存在,请在公共形象库中选择某一形象

400

sample_rate must be 16000, 24000, 32000, 48000

音频采样率设置错误

400

Avatar receive message from client timeout, since last receive xxx

客户端1分钟未发送消息,服务端会主动断开连接

下一步

本服务依赖视频直播云产品中的 RTC 能力,生成的音视频流将被推送到 RTC 中。如需获取音视频流,您需要接入 RTC 客户端,并开发拉流相关逻辑。

公共形象库

AvatarId表示数字人形象ID。请复制 AvatarId 的值,如jiaoyue,在上述WebSocket API 中使用。

AvatarId: taoji

名称:桃叽

image

AvatarId: aria

名称:Aria

image

AvatarId: jiaoyue

名称:椒月

image

AvatarId: shian

名称:时安

image

AvatarId: meike

名称:莓可

image

AvatarId: yanqiu

名称:砚秋

image

AvatarId: tangli

名称:棠梨

image

AvatarId: xingyao

名称:星瑶

image

AvatarId: lengzhou

名称:棱舟

image

AvatarId: mowen

名称:墨翁

image

常见问题

为什么运行demo.py后报错AccessDenied?

您没有开通数字人实时对话avatar-dialog模型,无权限访问。请在百炼模型广场申请使用该模型,申请通过后再重试。

报错信息如下:

{
    "header": {
        "task_id": "a9b6f562-af91-4076-9b37-xxxxxx",
        "event": "task-failed",
        "error_code": "AccessDenied",
        "error_message": "Access denied.",
        "attributes": {}
    },
    "payload": {}
}

  • 本页导读 (1)
  • 功能及模型介绍
  • 前提条件
  • WebSocket客户端与服务端交互流程
  • WebSocket客户端实现流程
  • 1. 建立WebSocket链接
  • 2. 发送并监听消息
  • 3. 关闭WebSocket连接
  • 4. 处理任务失败
  • 测试示例
  • 错误码
  • 下一步
  • 公共形象库
  • 常见问题