语音合成CosyVoice WebSocket API

本文介绍如何通过WebSocket连接访问CosyVoice语音合成服务。

DashScope SDK目前仅支持JavaPython。若想使用其他编程语言开发CosyVoice语音合成应用程序,可以通过WebSocket连接与服务进行通信。

用户指南:关于模型介绍和选型建议请参见实时语音合成-CosyVoice/Sambert

WebSocket是一种支持全双工通信的网络协议。客户端和服务器通过一次握手建立持久连接,双方可以互相主动推送数据,因此在实时性和效率方面具有显著优势。

对于常用编程语言,有许多现成的WebSocket库和示例可供参考,例如:

  • Go:gorilla/websocket

  • PHP:Ratchet

  • Node.js:ws

建议您先了解WebSocket的基本原理和技术细节,再参照本文进行开发。

前提条件

已开通服务并获取API Key。请配置API Key到环境变量,而非硬编码在代码中,防范因代码泄露导致的安全风险。

说明

当您需要为第三方应用或用户提供临时访问权限,或者希望严格控制敏感数据访问、删除等高风险操作时,建议使用临时鉴权Token

与长期有效的 API Key 相比,临时鉴权 Token 具备时效性短(60秒)、安全性高的特点,适用于临时调用场景,能有效降低API Key泄露的风险。

使用方式:在代码中,将原本用于鉴权的 API Key 替换为获取到的临时鉴权 Token 即可。

模型与价格

模型名称

单价

免费额度

cosyvoice-v3-plus

2元/万字符

202511150点前开通阿里云百炼:2000字符

202511150点后开通阿里云百炼:1万字符

有效期:阿里云百炼开通后90天内

cosyvoice-v3-flash

1元/万字符

cosyvoice-v2

2元/万字符

cosyvoice-v1

语音合成文本限制与格式规范

文本长度限制

单次通过continue-task指令发送的待合成文本长度不得超过 2000 字符,多次调用continue-task指令累计发送的文本总长度不得超过 20 万字符。

字符计算规则

  • 汉字(包括简/繁体汉字、日文汉字和韩文汉字)按2个字符计算,其他所有字符(如标点符号、字母、数字、日韩文假名/谚文等)均按 1个字符计算

  • 计算文本长度时,不包含SSML 标签内容

  • 示例:

    • "你好" → 2(你)+2(好)=4字符

    • "中A123" → 2(中)+1(A)+2(文)+1(1)+1(2)+1(3)=8字符

    • "中文。" → 2(中)+2(文)+1(。)=5字符

    • "中 文。" → 2(中)+1(空格)+2(文)+1(。)=6字符

    • "<speak>你好</speak>" → 2(你)+2(好)=4字符

编码格式

需采用UTF-8编码。

数学表达式支持说明

当前数学表达式解析功能仅适用于cosyvoice-v2cosyvoice-v3-flashcosyvoice-v3-plus模型,支持识别中小学常见的数学表达式,包括但不限于基础运算、代数、几何等内容。

详情请参见LaTeX 公式转语音

SSML标记语言支持说明

当前SSML(Speech Synthesis Markup Language,语音合成标记语言)功能仅cosyvoice-v3-plus、cosyvoice-v3-flashcosyvoice-v2模型的部分音色可用,使用时需满足以下条件:

使用方式如下:

  1. 在发送run-task指令时,将参数enable_ssml设置为true,以开启SSML支持;

  2. 随后通过continue-task指令发送包含SSML的文本。

重要

开启 SSML 支持(即将 enable_ssml 参数设为 true)后,仅允许通过一次continue-task指令提交完整的待合成文本,不支持多次发送。

交互流程

image

客户端发送给服务端的消息称作指令;服务端返回给客户端的消息有两种:JSON格式的事件和二进制音频流。

按时间顺序,客户端与服务端的交互流程如下:

  1. 建立连接:客户端与服务端建立WebSocket连接。

  2. 开启任务:

    • 客户端发送run-task指令以开启任务。

    • 客户端收到服务端返回的task-started事件,标志着任务已成功开启,可以进行后续步骤。

  3. 发送待合成文本:

    客户端按顺序向服务端发送一个或多个包含待合成文本的continue-task指令,服务端接收到完整语句后返回音频流(文本长度有约束, 详情参见continue-task指令text字段描述)。

    说明

    您可以多次发送continue-task指令,按顺序提交文本片段。服务端接收文本片段后自动进行分句:

    • 完整语句立即合成,此时客户端能够接收到服务端返回的音频

    • 不完整语句缓存至完整后合成,语句不完整时服务端不返回音频

    当发送finish-task指令时,服务端会强制合成所有缓存内容。

  4. 通知服务端结束任务:

    待文本发送完毕后,客户端发送finish-task指令通知服务端结束任务,并继续接收服务端返回的音频流(注意不要遗漏该步骤,否则可能收不到语音或收不到结尾部分的语音)。

  5. 任务结束:

    客户端收到服务端返回的task-finished事件,标志着任务结束。

  6. 关闭连接:客户端关闭WebSocket连接。

URL

WebSocket URL固定如下:

wss://dashscope.aliyuncs.com/api-ws/v1/inference

Headers

请求头中需添加如下信息:

{
    "Authorization": "bearer <your_dashscope_api_key>", // 必选 将<your_dashscope_api_key>替换成您自己的API Key
    "user-agent": "your_platform_info", // 可选
    "X-DashScope-WorkSpace": workspace, // 可选,阿里云百炼业务空间ID
    "X-DashScope-DataInspection": "enable"
}

指令(客户端→服务端)

指令是客户端发送给服务端的消息,为JSON格式,以Text Frame方式发送,用于控制任务的起止和标识任务边界。

发送指令需严格遵循以下时序,否则可能导致任务失败:

  1. 发送 run-task指令

  2. 发送continue-task指令

    • 用于发送待合成文本。

    • 必须在接收到服务端返回的task-started事件后,才能发送此指令。

  3. 发送finish-task指令

1. run-task指令:开启任务

该指令用于开启语音合成任务。可在该指令中对音色、采样率等请求参数进行设置。

重要
  • 发送时机:WebSocket连接建立后。

  • 不要发送待合成文本:此处发送合成文本不利于问题排查,因此应避免在此发送文本。

示例:

{
    "header": {
        "action": "run-task",
        "task_id": "2bf83b9a-baeb-4fda-8d9a-xxxxxxxxxxxx", // 随机uuid
        "streaming": "duplex"
    },
    "payload": {
        "task_group": "audio",
        "task": "tts",
        "function": "SpeechSynthesizer",
        "model": "cosyvoice-v2",
        "parameters": {
            "text_type": "PlainText",
            "voice": "longxiaochun_v2",            // 音色
            "format": "mp3",		        // 音频格式
            "sample_rate": 22050,	        // 采样率
            "volume": 50,			// 音量
            "rate": 1,				// 语速
            "pitch": 1				// 音调
        },
        "input": {// input不能省去,不然会报错
        }
    }
}

header参数说明:

参数

类型

是否必选

说明

header.action

string

指令类型。

当前指令中,固定为"run-task"。

header.task_id

string

当次任务ID。

32位通用唯一识别码(UUID),由32个随机生成的字母和数字组成。可以带横线(如 "2bf83b9a-baeb-4fda-8d9a-xxxxxxxxxxxx")或不带横线(如 "2bf83b9abaeb4fda8d9axxxxxxxxxxxx")。大多数编程语言都内置了生成UUIDAPI,例如Python:

import uuid

def generateTaskId(self):
    # 生成随机UUID
    return uuid.uuid4().hex

在后续发送continue-task指令finish-task指令时,用到的task_id需要和发送run-task指令时使用的task_id保持一致。

header.streaming

string

固定字符串:"duplex"

payload参数说明:

参数

类型

是否必选

说明

payload.task_group

string

固定字符串:"audio"。

payload.task

string

固定字符串:"tts"。

payload.function

string

固定字符串:"SpeechSynthesizer"。

payload.model

string

指定模型。

不同版本的模型编码方式一致,但使用时须确保模型(model)与音色(voice)匹配:每个版本的模型只能使用本版本的默认音色或专属音色。

payload.input

object

  • 如果不在此时发送待合成文本,input格式为:

    "input": {}
  • 如果在此时发送待合成文本,input格式为:

    "input": {
      "text": "今天天气怎么样?" // 待合成文本
    }

payload.parameters

text_type

string

固定字符串:“PlainText”。

voice

string

指定语音合成所使用的音色。

支持默认音色和专属音色:

⚠️ 使用声音复刻系列模型合成语音时,仅能使用该模型复刻生成的专属音色,不能使用默认音色。

⚠️ 使用专属音色合成语音时,语音合成模型(model)必须与声音复刻模型(target_model)相同。

format

string

音频编码格式。

  • 所有模型均支持的编码格式:pcm、wavmp3(默认)

  • cosyvoice-v1外,其他模型支持的编码格式:opus

音频格式为opus时,支持通过bit_rate参数调整码率。

sample_rate

integer

音频采样率,支持下述采样率(单位:Hz):

8000, 16000, 22050(默认), 24000, 44100, 48000。

说明

默认采样率代表当前音色的最佳采样率,缺省条件下默认按照该采样率输出,同时支持降采样或升采样。

volume

integer

音量,取值范围:0~100。默认值:50。

rate

float

合成音频的语速,取值范围:0.5~2。

  • 0.5:表示默认语速的0.5倍速。

  • 1:表示默认语速。默认语速是指模型默认输出的合成语速,语速会依据每一个音色略有不同,约每秒钟4个字。

  • 2:表示默认语速的2倍速。

默认值:1.0。

pitch

float

合成音频的语调,取值范围:0.5~2。

默认值:1.0。

enable_ssml

boolean

是否开启SSML功能。

该参数设为 true 后,仅允许发送一次文本,支持纯文本或包含SSML的文本。

bit_rate

int

指定音频的码率,取值范围:6~510kbps。

码率越大,音质越好,音频文件体积越大。

仅在音频格式(format)为opus时可用。

cosyvoice-v1模型不支持该参数。

word_timestamp_enabled

boolean

是否开启字级别时间戳,默认为false关闭。

cosyvoice-v2支持该功能。

seed

int

生成时使用的随机数种子,使合成的效果产生变化。默认值0。取值范围:0~65535。

cosyvoice-v1不支持该功能。

language_hints

array[string]

提供语言提示,仅cosyvoice-v3-flash、cosyvoice-v3-plus支持该功能。

在语音合成中有如下作用:

  1. 指定 TN(Text Normalization,文本规范化)处理所用的语言,影响数字、缩写、符号等的朗读方式(仅中文、英文生效)。

    取值范围:

    • zh:中文

    • en:英文

  2. 指定语音合成的目标语言(仅限复刻音色),帮助提升合成效果准确性,对英文、法语、德语、日语、韩语、俄语生效(无需填写中文)。须和声音复刻时使用的languageHints/language_hints一致。

    取值范围:

    • en:英文

    • fr:法语

    • de:德语

    • ja:日语

    • ko:韩语

    • ru:俄语

若设置的语言提示与文本内容明显不符(如为中文文本设置en),将忽略此提示,并依据文本内容自动检测语言。

注意:此参数为数组,但当前版本仅处理第一个元素,因此建议只传入一个值。

instruction

String

设置提示词。仅cosyvoice-v3-flash、cosyvoice-v3-plus支持该功能。

在语音合成中有如下作用:

  1. 指定小语种(仅限复刻音色)

    • 格式:“你会用<小语种>说出来。”(注意,结尾一定不要遗漏句号,使用时将“<小语种>”替换为具体的小语种,例如替换为德语)。

    • 示例:“你会用德语说出来。

    • 支持的小语种:法语、德语、日语、韩语、俄语。

  2. 指定方言(仅限复刻音色)

    • 格式:“请用<方言>表达。”(注意,结尾一定不要遗漏句号,使用时将“<方言>”替换为具体的方言,例如替换为广东话)。

    • 示例:“请用广东话表达。

    • 支持的方言:广东话、东北话、甘肃话、贵州话、河南话、湖北话、江西话、闽南话、宁夏话、山西话、陕西话、山东话、上海话、四川话、天津话、云南话。

  3. 指定情感、场景、角色或身份等:仅部分默认音色支持该功能,且因音色而异,详情请参见音色列表

2. continue-task指令

该指令专门用来发送待合成文本。

可以在一个continue-task指令中一次性发送待合成文本,也可以将文本分段并按顺序在多个continue-task指令中发送。

重要

发送时机:在收到task-started事件后发送。

说明

发送文本片段的间隔不得超过23秒,否则触发“request timeout after 23 seconds”异常。

若无待发送文本,需及时发送finish-task指令结束任务。

服务端强制设定23秒超时机制,客户端无法修改该配置。

示例:

{
    "header": {
        "action": "continue-task",
        "task_id": "2bf83b9a-baeb-4fda-8d9a-xxxxxxxxxxxx", // 随机uuid
        "streaming": "duplex"
    },
    "payload": {
        "input": {
            "text": "床前明月光,疑是地上霜"
        }
    }
}

header参数说明:

参数

类型

是否必选

说明

header.action

string

指令类型。

当前指令中,固定为"continue-task"。

header.task_id

string

当次任务ID。

需要和发送run-task指令时使用的task_id保持一致。

header.streaming

string

固定字符串:"duplex"

payload参数说明:

参数

类型

是否必选

说明

input.text

string

待合成文本。

3. finish-task指令:结束任务

该指令用于结束语音合成任务。

请务必确保发送该指令,否则可能会出现合成语音缺失的问题。

该指令发送后,服务端会将剩余的文本转成语音,语音合成完成后,服务端向客户端返回task-finished事件

重要

发送时机:continue-task指令发送完成后发送。

示例:

{
    "header": {
        "action": "finish-task",
        "task_id": "2bf83b9a-baeb-4fda-8d9a-xxxxxxxxxxxx",
        "streaming": "duplex"
    },
    "payload": {
        "input": {}//input不能省去,否则会报错
    }
}

header参数说明:

参数

类型

是否必选

说明

header.action

string

指令类型。

当前指令中,固定为"finish-task"。

header.task_id

string

当次任务ID。

需要和发送run-task指令时使用的task_id保持一致。

header.streaming

string

固定字符串:"duplex"

payload参数说明:

参数

类型

是否必选

说明

payload.input

object

固定格式:{}。

事件(服务端→客户端)

事件是服务端返回给客户端的消息,为JSON格式,代表不同的处理阶段。

说明

服务端返回给客户端的二进制音频不包含在任何事件中,需单独接收。

1. task-started事件:任务已开启

当监听到服务端返回的task-started事件时,标志着任务已成功开启。只有在接收到该事件后,才能向服务端发送continue-task指令finish-task指令;否则,任务将执行失败。

task-started事件的payload没有内容。

示例:

{
    "header": {
        "task_id": "2bf83b9a-baeb-4fda-8d9a-xxxxxxxxxxxx",
        "event": "task-started",
        "attributes": {}
    },
    "payload": {}
}

header参数说明:

参数

类型

说明

header.event

string

事件类型。

当前事件中,固定为"task-started"。

header.task_id

string

客户端生成的task_id

2. result-generated事件

客户端发送continue-task指令finish-task指令的同时,服务端持续返回result-generated事件。

CosyVoice服务中,result-generated事件为协议预留接口,封装了Request ID等信息,现阶段可以忽略。

示例:

{
    "header": {
        "task_id": "2bf83b9a-baeb-4fda-8d9a-xxxxxxxxxxxx",
        "event": "result-generated",
        "attributes": {
            "request_uuid": "0a9dba9e-d3a6-45a4-be6d-xxxxxxxxxxxx"
        }
    },
    "payload": {}
}

header参数说明:

参数

类型

说明

header.event

string

事件类型。

当前事件中,固定为"result-generated"。

header.task_id

string

客户端生成的task_id。

header.attributes.request_uuid

string

Request ID。

payload参数说明:

参数

类型

说明

payload.usage.characters

integer

截止当前,本次请求中计费的有效字符数。 在一次任务中,usage可能会出现在result-generated事件task-finished事件中。下发的usage字段为累加后的结果,请按最后一次为准。

3. task-finished事件:任务已结束

当监听到服务端返回的task-finished事件时,说明任务已结束。

结束任务后可以关闭WebSocket连接结束程序,也可以复用WebSocket连接,重新发送run-task指令开启下一个任务(参见关于建连开销和连接复用)。

示例:

{
    "header": {
        "task_id": "2bf83b9a-baeb-4fda-8d9a-xxxxxxxxxxxx",
        "event": "task-finished",
        "attributes": {
            "request_uuid": "0a9dba9e-d3a6-45a4-be6d-xxxxxxxxxxxx"
        }
    },
    "payload": {
        "output": {
            "sentence": {
                "words": []
            }
        },
        "usage": {
            "characters": 13
        }
    }
}

header参数说明:

参数

类型

说明

header.event

string

事件类型。

当前事件中,固定为"task-finished"。

header.task_id

string

客户端生成的task_id。

header.attributes.request_uuid

string

Request ID,可提供给CosyVoice开发人员定位问题。

payload参数说明:

参数

类型

说明

payload.usage.characters

integer

截止当前,本次请求中计费的有效字符数。 在一次任务中,usage可能会出现在result-generated事件task-finished事件中。下发的usage字段为累加后的结果,请按最后一次为准。

payload.output.sentence.index

integer

句子的编号,从0开始。

本字段和以下字段需要通过word_timestamp_enabled开启字级别时间戳

payload.output.sentence.words[k]

text

string

字的文本。

begin_index

integer

字在句子中的开始位置索引,从 0 开始。

end_index

integer

字在句子中的结束位置索引,从 1 开始。

begin_time

integer

字对应音频的开始时间戳,单位为毫秒。

end_time

integer

字对应音频的结束时间戳,单位为毫秒。

通过word_timestamp_enabled开启字级别时间戳后会返回时间戳信息,示例如下:

点击查看开启字级别时间戳后的响应

{
    "header": {
        "task_id": "2bf83b9a-baeb-4fda-8d9a-xxxxxxxxxxxx",
        "event": "task-finished",
        "attributes": {"request_uuid": "0a9dba9e-d3a6-45a4-be6d-xxxxxxxxxxxx"}
    },
    "payload": {
        "output": {
            "sentence": {
                "index": 0,
                "words": [
                    {
                        "text": "今",
                        "begin_index": 0,
                        "end_index": 1,
                        "begin_time": 80,
                        "end_time": 200
                    },
                    {
                        "text": "天",
                        "begin_index": 1,
                        "end_index": 2,
                        "begin_time": 240,
                        "end_time": 360
                    },
                    {
                        "text": "天",
                        "begin_index": 2,
                        "end_index": 3,
                        "begin_time": 360,
                        "end_time": 480
                    },
                    {
                        "text": "气",
                        "begin_index": 3,
                        "end_index": 4,
                        "begin_time": 480,
                        "end_time": 680
                    },
                    {
                        "text": "怎",
                        "begin_index": 4,
                        "end_index": 5,
                        "begin_time": 680,
                        "end_time": 800
                    },
                    {
                        "text": "么",
                        "begin_index": 5,
                        "end_index": 6,
                        "begin_time": 800,
                        "end_time": 920
                    },
                    {
                        "text": "样",
                        "begin_index": 6,
                        "end_index": 7,
                        "begin_time": 920,
                        "end_time": 1000
                    },
                    {
                        "text": "?",
                        "begin_index": 7,
                        "end_index": 8,
                        "begin_time": 1000,
                        "end_time": 1320
                    }
                ]
            }
        },
        "usage": {"characters": 15}
    }
}

4. task-failed事件:任务失败

如果接收到task-failed事件,表示任务失败。此时需要关闭WebSocket连接并处理错误。通过分析报错信息,如果是由于编程问题导致的任务失败,您可以调整代码进行修正。

示例:

{
    "header": {
        "task_id": "2bf83b9a-baeb-4fda-8d9a-xxxxxxxxxxxx",
        "event": "task-failed",
        "error_code": "InvalidParameter",
        "error_message": "[tts:]Engine return error code: 418",
        "attributes": {}
    },
    "payload": {}
}

header参数说明:

参数

类型

说明

header.event

string

事件类型。

当前事件中,固定为"task-failed"。

header.task_id

string

客户端生成的task_id。

header.error_code

string

报错类型描述。

header.error_message

string

具体报错原因。

关于建连开销和连接复用

WebSocket服务支持连接复用以提升资源的利用效率,避免建立连接开销。

服务端收到客户端发送的 run-task指令后,将启动一个新的任务,客户端发送finish-task指令后,服务端在任务完成时返回task-finished事件以结束该任务。结束任务后WebSocket连接可以被复用,客户端重新发送run-task指令即可开启下一个任务。

重要
  1. 在复用连接中的不同任务需要使用不同 task_id。

  2. 如果在任务执行过程中发生失败,服务将依然返回task-failed事件,并关闭该连接。此时这个连接无法继续复用。

  3. 如果在任务结束后60秒没有新的任务,连接会超时自动断开。

示例代码

示例代码仅提供最基础的服务调通实现,实际业务场景的相关代码需您自行开发。

在编写WebSocket客户端代码时,为了同时发送和接收消息,通常采用异步编程。您可以按照以下步骤来编写程序:

  1. 建立WebSocket连接

    调用WebSocket库函数(具体实现方式因编程语言或库函数而异),传入HeadersURL建立WebSocket连接。

  2. 监听服务端消息

    通过 WebSocket 库提供的回调函数(观察者模式),您可以监听服务端返回的消息。具体实现方式因编程语言不同而有所差异。

    服务端返回的消息分为两类:二进制音频流和事件

    监听事件

    处理二进制音频流:服务端通过 binary 通道分帧下发音频流。完整的音频数据被分成多个数据包传输。

    • 流式语音合成中,对于mp3/opus等压缩格式,音频分段传输需使用流式播放器,不可逐帧播放,避免解码失败。

      支持流式播放的播放器:ffmpeg、pyaudio (Python)、AudioFormat (Java)、MediaSource (Javascript)等。
    • 将音频数据合成完整的音频文件时,应以追加模式写入同一文件。

    • 流式语音合成的wav/mp3 格式音频仅首帧包含头信息,后续帧为纯音频数据。

  3. 向服务端发送消息(请务必注意时序)

    在不同于监听服务端消息的线程(如主线程,具体实现因编程语言而异)中,向服务端发送指令。

    发送指令需严格遵循以下时序,否则可能导致任务失败:

    1. 发送 run-task指令

    2. 发送continue-task指令

      • 用于发送待合成文本。

      • 必须在接收到服务端返回的task-started事件后,才能发送此指令。

    3. 发送finish-task指令

  4. 关闭WebSocket连接

    在程序正常结束、运行中出现异常或接收到task-finished事件task-failed事件时,关闭WebSocket连接。通常通过调用工具库中的close函数来实现。

点击查看完整示例

Go

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"time"

	"github.com/google/uuid"
	"github.com/gorilla/websocket"
)

const (
	wsURL      = "wss://dashscope.aliyuncs.com/api-ws/v1/inference/" // WebSocket服务端地址
	outputFile = "output.mp3"                                        // 输出文件路径
)

func main() {
	// 若没有将API Key配置到环境变量,可将下行替换为:apiKey := "your_api_key"。不建议在生产环境中直接将API Key硬编码到代码中,以减少API Key泄露风险。
	apiKey := os.Getenv("DASHSCOPE_API_KEY")
	// 检查并清空输出文件
	if err := clearOutputFile(outputFile); err != nil {
		fmt.Println("清空输出文件失败:", err)
		return
	}

	// 连接WebSocket服务
	conn, err := connectWebSocket(apiKey)
	if err != nil {
		fmt.Println("连接WebSocket失败:", err)
		return
	}
	defer closeConnection(conn)

	// 启动一个goroutine来接收结果
	done, taskStarted := startResultReceiver(conn)

	// 发送run-task指令
	taskID, err := sendRunTaskCmd(conn)
	if err != nil {
		fmt.Println("发送run-task指令失败:", err)
		return
	}

	// 等待task-started事件
	for !*taskStarted {
		time.Sleep(100 * time.Millisecond)
	}

	// 发送待合成文本
	if err := sendContinueTaskCmd(conn, taskID); err != nil {
		fmt.Println("发送待合成文本失败:", err)
		return
	}

	// 发送finish-task指令
	if err := sendFinishTaskCmd(conn, taskID); err != nil {
		fmt.Println("发送finish-task指令失败:", err)
		return
	}

	// 等待接收结果的goroutine完成
	<-done
}

var dialer = websocket.DefaultDialer

// 定义结构体来表示JSON数据
type Header struct {
	Action       string                 `json:"action"`
	TaskID       string                 `json:"task_id"`
	Streaming    string                 `json:"streaming"`
	Event        string                 `json:"event"`
	ErrorCode    string                 `json:"error_code,omitempty"`
	ErrorMessage string                 `json:"error_message,omitempty"`
	Attributes   map[string]interface{} `json:"attributes"`
}

type Payload struct {
	TaskGroup  string     `json:"task_group"`
	Task       string     `json:"task"`
	Function   string     `json:"function"`
	Model      string     `json:"model"`
	Parameters Params     `json:"parameters"`
	Resources  []Resource `json:"resources"`
	Input      Input      `json:"input"`
}

type Params struct {
	TextType   string `json:"text_type"`
	Voice      string `json:"voice"`
	Format     string `json:"format"`
	SampleRate int    `json:"sample_rate"`
	Volume     int    `json:"volume"`
	Rate       int    `json:"rate"`
	Pitch      int    `json:"pitch"`
}

type Resource struct {
	ResourceID   string `json:"resource_id"`
	ResourceType string `json:"resource_type"`
}

type Input struct {
	Text string `json:"text"`
}

type Event struct {
	Header  Header  `json:"header"`
	Payload Payload `json:"payload"`
}

// 连接WebSocket服务
func connectWebSocket(apiKey string) (*websocket.Conn, error) {
	header := make(http.Header)
	header.Add("X-DashScope-DataInspection", "enable")
	header.Add("Authorization", fmt.Sprintf("bearer %s", apiKey))
	conn, _, err := dialer.Dial(wsURL, header)
	if err != nil {
		fmt.Println("连接WebSocket失败:", err)
		return nil, err
	}
	return conn, nil
}

// 发送run-task指令
func sendRunTaskCmd(conn *websocket.Conn) (string, error) {
	runTaskCmd, taskID, err := generateRunTaskCmd()
	if err != nil {
		return "", err
	}
	err = conn.WriteMessage(websocket.TextMessage, []byte(runTaskCmd))
	return taskID, err
}

// 生成run-task指令
func generateRunTaskCmd() (string, string, error) {
	taskID := uuid.New().String()
	runTaskCmd := Event{
		Header: Header{
			Action:    "run-task",
			TaskID:    taskID,
			Streaming: "duplex",
		},
		Payload: Payload{
			TaskGroup: "audio",
			Task:      "tts",
			Function:  "SpeechSynthesizer",
			Model:     "cosyvoice-v2",
			Parameters: Params{
				TextType:   "PlainText",
				Voice:      "longxiaochun_v2",
				Format:     "mp3",
				SampleRate: 22050,
				Volume:     50,
				Rate:       1,
				Pitch:      1,
			},
			Input: Input{},
		},
	}
	runTaskCmdJSON, err := json.Marshal(runTaskCmd)
	return string(runTaskCmdJSON), taskID, err
}

// 发送待合成文本
func sendContinueTaskCmd(conn *websocket.Conn, taskID string) error {
	texts := []string{"床前明月光", "疑是地上霜", "举头望明月", "低头思故乡"}

	for _, text := range texts {
		runTaskCmd, err := generateContinueTaskCmd(text, taskID)
		if err != nil {
			return err
		}

		err = conn.WriteMessage(websocket.TextMessage, []byte(runTaskCmd))
		if err != nil {
			return err
		}
	}

	return nil
}

// 生成continue-task指令
func generateContinueTaskCmd(text string, taskID string) (string, error) {
	runTaskCmd := Event{
		Header: Header{
			Action:    "continue-task",
			TaskID:    taskID,
			Streaming: "duplex",
		},
		Payload: Payload{
			Input: Input{
				Text: text,
			},
		},
	}
	runTaskCmdJSON, err := json.Marshal(runTaskCmd)
	return string(runTaskCmdJSON), err
}

// 启动一个goroutine来接收结果
func startResultReceiver(conn *websocket.Conn) (chan struct{}, *bool) {
	done := make(chan struct{})
	taskStarted := new(bool)
	*taskStarted = false

	go func() {
		defer close(done)
		for {
			msgType, message, err := conn.ReadMessage()
			if err != nil {
				fmt.Println("解析服务器消息失败:", err)
				return
			}

			if msgType == websocket.BinaryMessage {
				// 处理二进制音频流
				if err := writeBinaryDataToFile(message, outputFile); err != nil {
					fmt.Println("写入二进制数据失败:", err)
					return
				}
			} else {
				// 处理文本消息
				var event Event
				err = json.Unmarshal(message, &event)
				if err != nil {
					fmt.Println("解析事件失败:", err)
					continue
				}
				if handleEvent(conn, event, taskStarted) {
					return
				}
			}
		}
	}()

	return done, taskStarted
}

// 处理事件
func handleEvent(conn *websocket.Conn, event Event, taskStarted *bool) bool {
	switch event.Header.Event {
	case "task-started":
		fmt.Println("收到task-started事件")
		*taskStarted = true
	case "result-generated":
		// 忽略result-generated事件
		return false
	case "task-finished":
		fmt.Println("任务完成")
		return true
	case "task-failed":
		handleTaskFailed(event, conn)
		return true
	default:
		fmt.Printf("预料之外的事件:%v\n", event)
	}
	return false
}

// 处理任务失败事件
func handleTaskFailed(event Event, conn *websocket.Conn) {
	if event.Header.ErrorMessage != "" {
		fmt.Printf("任务失败:%s\n", event.Header.ErrorMessage)
	} else {
		fmt.Println("未知原因导致任务失败")
	}
}

// 关闭连接
func closeConnection(conn *websocket.Conn) {
	if conn != nil {
		conn.Close()
	}
}

// 写入二进制数据到文件
func writeBinaryDataToFile(data []byte, filePath string) error {
	file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}
	defer file.Close()

	_, err = file.Write(data)
	if err != nil {
		return err
	}

	return nil
}

// 发送finish-task指令
func sendFinishTaskCmd(conn *websocket.Conn, taskID string) error {
	finishTaskCmd, err := generateFinishTaskCmd(taskID)
	if err != nil {
		return err
	}
	err = conn.WriteMessage(websocket.TextMessage, []byte(finishTaskCmd))
	return err
}

// 生成finish-task指令
func generateFinishTaskCmd(taskID string) (string, error) {
	finishTaskCmd := Event{
		Header: Header{
			Action:    "finish-task",
			TaskID:    taskID,
			Streaming: "duplex",
		},
		Payload: Payload{
			Input: Input{},
		},
	}
	finishTaskCmdJSON, err := json.Marshal(finishTaskCmd)
	return string(finishTaskCmdJSON), err
}

// 清空输出文件
func clearOutputFile(filePath string) error {
	file, err := os.OpenFile(filePath, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}
	file.Close()
	return nil
}

C#

using System.Net.WebSockets;
using System.Text;
using System.Text.Json;

class Program {
    // 若没有将API Key配置到环境变量,可将下行替换为:private const string ApiKey="your_api_key"。不建议在生产环境中直接将API Key硬编码到代码中,以减少API Key泄露风险。
    private static readonly string ApiKey = Environment.GetEnvironmentVariable("DASHSCOPE_API_KEY") ?? throw new InvalidOperationException("DASHSCOPE_API_KEY environment variable is not set.");

    // WebSocket服务器地址
    private const string WebSocketUrl = "wss://dashscope.aliyuncs.com/api-ws/v1/inference/";
    // 输出文件路径
    private const string OutputFilePath = "output.mp3";

    // WebSocket客户端
    private static ClientWebSocket _webSocket = new ClientWebSocket();
    // 取消令牌源
    private static CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
    // 任务ID
    private static string? _taskId;
    // 任务是否已启动
    private static TaskCompletionSource<bool> _taskStartedTcs = new TaskCompletionSource<bool>();

    static async Task Main(string[] args) {
        try {
            // 清空输出文件
            ClearOutputFile(OutputFilePath);

            // 连接WebSocket服务
            await ConnectToWebSocketAsync(WebSocketUrl);

            // 启动接收消息的任务
            Task receiveTask = ReceiveMessagesAsync();

            // 发送run-task指令
            _taskId = GenerateTaskId();
            await SendRunTaskCommandAsync(_taskId);

            // 等待task-started事件
            await _taskStartedTcs.Task;

            // 持续发送continue-task指令
            string[] texts = {
                "床前明月光",
                "疑是地上霜",
                "举头望明月",
                "低头思故乡"
            };
            foreach (string text in texts) {
                await SendContinueTaskCommandAsync(text);
            }

            // 发送finish-task指令
            await SendFinishTaskCommandAsync(_taskId);

            // 等待接收任务完成
            await receiveTask;

            Console.WriteLine("任务完成,连接已关闭。");
        } catch (OperationCanceledException) {
            Console.WriteLine("任务被取消。");
        } catch (Exception ex) {
            Console.WriteLine($"发生错误:{ex.Message}");
        } finally {
            _cancellationTokenSource.Cancel();
            _webSocket.Dispose();
        }
    }

    private static void ClearOutputFile(string filePath) {
        if (File.Exists(filePath)) {
            File.WriteAllText(filePath, string.Empty);
            Console.WriteLine("输出文件已清空。");
        } else {
            Console.WriteLine("输出文件不存在,无需清空。");
        }
    }

    private static async Task ConnectToWebSocketAsync(string url) {
        var uri = new Uri(url);
        if (_webSocket.State == WebSocketState.Connecting || _webSocket.State == WebSocketState.Open) {
            return;
        }

        // 设置WebSocket连接的头部信息
        _webSocket.Options.SetRequestHeader("Authorization", $"bearer {ApiKey}");
        _webSocket.Options.SetRequestHeader("X-DashScope-DataInspection", "enable");

        try {
            await _webSocket.ConnectAsync(uri, _cancellationTokenSource.Token);
            Console.WriteLine("已成功连接到WebSocket服务。");
        } catch (OperationCanceledException) {
            Console.WriteLine("WebSocket连接被取消。");
        } catch (Exception ex) {
            Console.WriteLine($"WebSocket连接失败: {ex.Message}");
            throw;
        }
    }

    private static async Task SendRunTaskCommandAsync(string taskId) {
        var command = CreateCommand("run-task", taskId, "duplex", new {
            task_group = "audio",
            task = "tts",
            function = "SpeechSynthesizer",
            model = "cosyvoice-v2",
            parameters = new
            {
                text_type = "PlainText",
                voice = "longxiaochun_v2",
                format = "mp3",
                sample_rate = 22050,
                volume = 50,
                rate = 1,
                pitch = 1
            },
            input = new { }
        });

        await SendJsonMessageAsync(command);
        Console.WriteLine("已发送run-task指令。");
    }

    private static async Task SendContinueTaskCommandAsync(string text) {
        if (_taskId == null) {
            throw new InvalidOperationException("任务ID未初始化。");
        }

        var command = CreateCommand("continue-task", _taskId, "duplex", new {
            input = new {
                text
            }
        });

        await SendJsonMessageAsync(command);
        Console.WriteLine("已发送continue-task指令。");
    }

    private static async Task SendFinishTaskCommandAsync(string taskId) {
        var command = CreateCommand("finish-task", taskId, "duplex", new {
            input = new { }
        });

        await SendJsonMessageAsync(command);
        Console.WriteLine("已发送finish-task指令。");
    }

    private static async Task SendJsonMessageAsync(string message) {
        var buffer = Encoding.UTF8.GetBytes(message);
        try {
            await _webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Text, true, _cancellationTokenSource.Token);
        } catch (OperationCanceledException) {
            Console.WriteLine("消息发送被取消。");
        }
    }

    private static async Task ReceiveMessagesAsync() {
        while (_webSocket.State == WebSocketState.Open) {
            var response = await ReceiveMessageAsync();
            if (response != null) {
                var eventStr = response.RootElement.GetProperty("header").GetProperty("event").GetString();
                switch (eventStr) {
                    case "task-started":
                        Console.WriteLine("任务已启动。");
                        _taskStartedTcs.TrySetResult(true);
                        break;
                    case "task-finished":
                        Console.WriteLine("任务已完成。");
                        _cancellationTokenSource.Cancel();
                        break;
                    case "task-failed":
                        Console.WriteLine("任务失败。");
                        _cancellationTokenSource.Cancel();
                        break;
                    default:
                        // result-generated可在此处理
                        break;
                }
            }
        }
    }

    private static async Task<JsonDocument?> ReceiveMessageAsync() {
        var buffer = new byte[1024 * 4];
        var segment = new ArraySegment<byte>(buffer);

        try {
            WebSocketReceiveResult result = await _webSocket.ReceiveAsync(segment, _cancellationTokenSource.Token);

            if (result.MessageType == WebSocketMessageType.Close) {
                await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", _cancellationTokenSource.Token);
                return null;
            }

            if (result.MessageType == WebSocketMessageType.Binary) {
                // 处理二进制数据
                Console.WriteLine("接收到二进制数据...");

                // 将二进制数据保存到文件
                using (var fileStream = new FileStream(OutputFilePath, FileMode.Append)) {
                    fileStream.Write(buffer, 0, result.Count);
                }

                return null;
            }

            string message = Encoding.UTF8.GetString(buffer, 0, result.Count);
            return JsonDocument.Parse(message);
        } catch (OperationCanceledException) {
            Console.WriteLine("消息接收被取消。");
            return null;
        }
    }

    private static string GenerateTaskId() {
        return Guid.NewGuid().ToString("N").Substring(0, 32);
    }

    private static string CreateCommand(string action, string taskId, string streaming, object payload) {
        var command = new {
            header = new {
                action,
                task_id = taskId,
                streaming
            },
            payload
        };

        return JsonSerializer.Serialize(command);
    }
}

PHP

示例代码目录结构为:

my-php-project/

├── composer.json

├── vendor/

└── index.php

composer.json内容如下,相关依赖的版本号请根据实际情况自行决定:

{
    "require": {
        "react/event-loop": "^1.3",
        "react/socket": "^1.11",
        "react/stream": "^1.2",
        "react/http": "^1.1",
        "ratchet/pawl": "^0.4"
    },
    "autoload": {
        "psr-4": {
            "App\\": "src/"
        }
    }
}

index.php内容如下:

<?php

require __DIR__ . '/vendor/autoload.php';

use Ratchet\Client\Connector;
use React\EventLoop\Loop;
use React\Socket\Connector as SocketConnector;

# 若没有将API Key配置到环境变量,可将下行替换为:$api_key="your_api_key"。不建议在生产环境中直接将API Key硬编码到代码中,以减少API Key泄露风险。
$api_key = getenv("DASHSCOPE_API_KEY");
$websocket_url = 'wss://dashscope.aliyuncs.com/api-ws/v1/inference/'; // WebSocket服务器地址
$output_file = 'output.mp3'; // 输出文件路径

$loop = Loop::get();

if (file_exists($output_file)) {
    // 清空文件内容
    file_put_contents($output_file, '');
}

// 创建自定义的连接器
$socketConnector = new SocketConnector($loop, [
    'tcp' => [
        'bindto' => '0.0.0.0:0',
    ],
    'tls' => [
        'verify_peer' => false,
        'verify_peer_name' => false,
    ],
]);

$connector = new Connector($loop, $socketConnector);

$headers = [
    'Authorization' => 'bearer ' . $api_key,
    'X-DashScope-DataInspection' => 'enable'
];

$connector($websocket_url, [], $headers)->then(function ($conn) use ($loop, $output_file) {
    echo "连接到WebSocket服务器\n";

    // 生成任务ID
    $taskId = generateTaskId();

    // 发送 run-task 指令
    sendRunTaskMessage($conn, $taskId);

    // 定义发送 continue-task 指令的函数
    $sendContinueTask = function() use ($conn, $loop, $taskId) {
        // 待发送的文本
        $texts = ["床前明月光", "疑是地上霜", "举头望明月", "低头思故乡"];
        $continueTaskCount = 0;
        foreach ($texts as $text) {
            $continueTaskMessage = json_encode([
                "header" => [
                    "action" => "continue-task",
                    "task_id" => $taskId,
                    "streaming" => "duplex"
                ],
                "payload" => [
                    "input" => [
                        "text" => $text
                    ]
                ]
            ]);
            echo "准备发送continue-task指令: " . $continueTaskMessage . "\n";
            $conn->send($continueTaskMessage);
            $continueTaskCount++;
        }
        echo "发送的continue-task指令个数为:" . $continueTaskCount . "\n";

        // 发送 finish-task 指令
        sendFinishTaskMessage($conn, $taskId);
    };

    // 标记是否收到 task-started 事件
    $taskStarted = false;

    // 监听消息
    $conn->on('message', function($msg) use ($conn, $sendContinueTask, $loop, &$taskStarted, $taskId, $output_file) {
        if ($msg->isBinary()) {
            // 写入二进制数据到本地文件
            file_put_contents($output_file, $msg->getPayload(), FILE_APPEND);
        } else {
            // 处理非二进制消息
            $response = json_decode($msg, true);

            if (isset($response['header']['event'])) {
                handleEvent($conn, $response, $sendContinueTask, $loop, $taskId, $taskStarted);
            } else {
                echo "未知的消息格式\n";
            }
        }
    });

    // 监听连接关闭
    $conn->on('close', function($code = null, $reason = null) {
        echo "连接已关闭\n";
        if ($code !== null) {
            echo "关闭代码: " . $code . "\n";
        }
        if ($reason !== null) {
            echo "关闭原因:" . $reason . "\n";
        }
    });
}, function ($e) {
    echo "无法连接:{$e->getMessage()}\n";
});

$loop->run();

/**
 * 生成任务ID
 * @return string
 */
function generateTaskId(): string {
    return bin2hex(random_bytes(16));
}

/**
 * 发送 run-task 指令
 * @param $conn
 * @param $taskId
 */
function sendRunTaskMessage($conn, $taskId) {
    $runTaskMessage = json_encode([
        "header" => [
            "action" => "run-task",
            "task_id" => $taskId,
            "streaming" => "duplex"
        ],
        "payload" => [
            "task_group" => "audio",
            "task" => "tts",
            "function" => "SpeechSynthesizer",
            "model" => "cosyvoice-v2",
            "parameters" => [
                "text_type" => "PlainText",
                "voice" => "longxiaochun_v2",
                "format" => "mp3",
                "sample_rate" => 22050,
                "volume" => 50,
                "rate" => 1,
                "pitch" => 1
            ],
            "input" => (object) []
        ]
    ]);
    echo "准备发送run-task指令: " . $runTaskMessage . "\n";
    $conn->send($runTaskMessage);
    echo "run-task指令已发送\n";
}

/**
 * 读取音频文件
 * @param string $filePath
 * @return bool|string
 */
function readAudioFile(string $filePath) {
    $voiceData = file_get_contents($filePath);
    if ($voiceData === false) {
        echo "无法读取音频文件\n";
    }
    return $voiceData;
}

/**
 * 分割音频数据
 * @param string $data
 * @param int $chunkSize
 * @return array
 */
function splitAudioData(string $data, int $chunkSize): array {
    return str_split($data, $chunkSize);
}

/**
 * 发送 finish-task 指令
 * @param $conn
 * @param $taskId
 */
function sendFinishTaskMessage($conn, $taskId) {
    $finishTaskMessage = json_encode([
        "header" => [
            "action" => "finish-task",
            "task_id" => $taskId,
            "streaming" => "duplex"
        ],
        "payload" => [
            "input" => (object) []
        ]
    ]);
    echo "准备发送finish-task指令: " . $finishTaskMessage . "\n";
    $conn->send($finishTaskMessage);
    echo "finish-task指令已发送\n";
}

/**
 * 处理事件
 * @param $conn
 * @param $response
 * @param $sendContinueTask
 * @param $loop
 * @param $taskId
 * @param $taskStarted
 */
function handleEvent($conn, $response, $sendContinueTask, $loop, $taskId, &$taskStarted) {
    switch ($response['header']['event']) {
        case 'task-started':
            echo "任务开始,发送continue-task指令...\n";
            $taskStarted = true;
            // 发送 continue-task 指令
            $sendContinueTask();
            break;
        case 'result-generated':
            // 忽略result-generated事件
            break;
        case 'task-finished':
            echo "任务完成\n";
            $conn->close();
            break;
        case 'task-failed':
            echo "任务失败\n";
            echo "错误代码:" . $response['header']['error_code'] . "\n";
            echo "错误信息:" . $response['header']['error_message'] . "\n";
            $conn->close();
            break;
        case 'error':
            echo "错误:" . $response['payload']['message'] . "\n";
            break;
        default:
            echo "未知事件:" . $response['header']['event'] . "\n";
            break;
    }

    // 如果任务已完成,关闭连接
    if ($response['header']['event'] == 'task-finished') {
        // 等待1秒以确保所有数据都已传输完毕
        $loop->addTimer(1, function() use ($conn) {
            $conn->close();
            echo "客户端关闭连接\n";
        });
    }

    // 如果没有收到 task-started 事件,关闭连接
    if (!$taskStarted && in_array($response['header']['event'], ['task-failed', 'error'])) {
        $conn->close();
    }
}

Node.js

需安装相关依赖:

npm install ws
npm install uuid

示例代码如下:

const WebSocket = require('ws');
const fs = require('fs');
const uuid = require('uuid').v4;

// 若没有将API Key配置到环境变量,可将下行替换为:apiKey = 'your_api_key'。不建议在生产环境中直接将API Key硬编码到代码中,以减少API Key泄露风险。
const apiKey = process.env.DASHSCOPE_API_KEY;
// WebSocket服务器地址
const url = 'wss://dashscope.aliyuncs.com/api-ws/v1/inference/';
// 输出文件路径
const outputFilePath = 'output.mp3';

// 清空输出文件
fs.writeFileSync(outputFilePath, '');

// 创建WebSocket客户端
const ws = new WebSocket(url, {
  headers: {
    Authorization: `bearer ${apiKey}`,
    'X-DashScope-DataInspection': 'enable'
  }
});

let taskStarted = false;
let taskId = uuid();

ws.on('open', () => {
  console.log('已连接到WebSocket服务器');

  // 发送run-task指令
  const runTaskMessage = JSON.stringify({
    header: {
      action: 'run-task',
      task_id: taskId,
      streaming: 'duplex'
    },
    payload: {
      task_group: 'audio',
      task: 'tts',
      function: 'SpeechSynthesizer',
      model: 'cosyvoice-v2',
      parameters: {
        text_type: 'PlainText',
        voice: 'longxiaochun_v2', // 音色
        format: 'mp3', // 音频格式
        sample_rate: 22050, // 采样率
        volume: 50, // 音量
        rate: 1, // 语速
        pitch: 1 // 音调
      },
      input: {}
    }
  });
  ws.send(runTaskMessage);
  console.log('已发送run-task消息');
});

const fileStream = fs.createWriteStream(outputFilePath, { flags: 'a' });
ws.on('message', (data, isBinary) => {
  if (isBinary) {
    // 写入二进制数据到文件
    fileStream.write(data);
  } else {
    const message = JSON.parse(data);

    switch (message.header.event) {
      case 'task-started':
        taskStarted = true;
        console.log('任务已开始');
        // 发送continue-task指令
        sendContinueTasks(ws);
        break;
      case 'task-finished':
        console.log('任务已完成');
        ws.close();
        fileStream.end(() => {
          console.log('文件流已关闭');
        });
        break;
      case 'task-failed':
        console.error('任务失败:', message.header.error_message);
        ws.close();
        fileStream.end(() => {
          console.log('文件流已关闭');
        });
        break;
      default:
        // 可以在这里处理result-generated
        break;
    }
  }
});

function sendContinueTasks(ws) {
  const texts = [
    '床前明月光,',
    '疑是地上霜。',
    '举头望明月,',
    '低头思故乡。'
  ];
  
  texts.forEach((text, index) => {
    setTimeout(() => {
      if (taskStarted) {
        const continueTaskMessage = JSON.stringify({
          header: {
            action: 'continue-task',
            task_id: taskId,
            streaming: 'duplex'
          },
          payload: {
            input: {
              text: text
            }
          }
        });
        ws.send(continueTaskMessage);
        console.log(`已发送continue-task,文本:${text}`);
      }
    }, index * 1000); // 每隔1秒发送一次
  });

  // 发送finish-task指令
  setTimeout(() => {
    if (taskStarted) {
      const finishTaskMessage = JSON.stringify({
        header: {
          action: 'finish-task',
          task_id: taskId,
          streaming: 'duplex'
        },
        payload: {
          input: {}
        }
      });
      ws.send(finishTaskMessage);
      console.log('已发送finish-task');
    }
  }, texts.length * 1000 + 1000); // 在所有continue-task指令发送完毕后1秒发送
}

ws.on('close', () => {
  console.log('已断开与WebSocket服务器的连接');
});

Java

如您使用Java编程语言,建议采用Java DashScope SDK进行开发,详情请参见Java SDK

以下是Java WebSocket的调用示例。在运行示例前,请确保已导入以下依赖:

  • Java-WebSocket

  • jackson-databind

推荐您使用MavenGradle管理依赖包,其配置如下:

pom.xml

<dependencies>
    <!-- WebSocket Client -->
    <dependency>
        <groupId>org.java-websocket</groupId>
        <artifactId>Java-WebSocket</artifactId>
        <version>1.5.3</version>
    </dependency>

    <!-- JSON Processing -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.13.0</version>
    </dependency>
</dependencies>

build.gradle

// 省略其它代码
dependencies {
  // WebSocket Client
  implementation 'org.java-websocket:Java-WebSocket:1.5.3'
  // JSON Processing
  implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.0'
}
// 省略其它代码

Java代码如下:

import com.fasterxml.jackson.databind.ObjectMapper;

import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;

import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.*;

public class TTSWebSocketClient extends WebSocketClient {
    private final String taskId = UUID.randomUUID().toString();
    private final String outputFile = "output_" + System.currentTimeMillis() + ".mp3";
    private boolean taskFinished = false;

    public TTSWebSocketClient(URI serverUri, Map<String, String> headers) {
        super(serverUri, headers);
    }

    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        System.out.println("连接成功");

        // 发送run-task指令
        String runTaskCommand = "{ \"header\": { \"action\": \"run-task\", \"task_id\": \"" + taskId + "\", \"streaming\": \"duplex\" }, \"payload\": { \"task_group\": \"audio\", \"task\": \"tts\", \"function\": \"SpeechSynthesizer\", \"model\": \"cosyvoice-v2\", \"parameters\": { \"text_type\": \"PlainText\", \"voice\": \"longxiaochun_v2\", \"format\": \"mp3\", \"sample_rate\": 22050, \"volume\": 50, \"rate\": 1, \"pitch\": 1 }, \"input\": {} }}";
        send(runTaskCommand);
    }

    @Override
    public void onMessage(String message) {
        System.out.println("收到服务端返回的消息:" + message);
        try {
            // Parse JSON message
            Map<String, Object> messageMap = new ObjectMapper().readValue(message, Map.class);

            if (messageMap.containsKey("header")) {
                Map<String, Object> header = (Map<String, Object>) messageMap.get("header");

                if (header.containsKey("event")) {
                    String event = (String) header.get("event");

                    if ("task-started".equals(event)) {
                        System.out.println("收到服务端返回的task-started事件");

                        List<String> texts = Arrays.asList(
                                "床前明月光,疑是地上霜",
                                "举头望明月,低头思故乡"
                        );

                        for (String text : texts) {
                            // 发送continue-task指令
                            sendContinueTask(text);
                        }

                        // 发送finish-task指令
                        sendFinishTask();
                    } else if ("task-finished".equals(event)) {
                        System.out.println("收到服务端返回的task-finished事件");
                        taskFinished = true;
                        closeConnection();
                    } else if ("task-failed".equals(event)) {
                        System.out.println("任务失败:" + message);
                        closeConnection();
                    }
                }
            }
        } catch (Exception e) {
            System.err.println("出现异常:" + e.getMessage());
        }
    }

    @Override
    public void onMessage(ByteBuffer message) {
        System.out.println("收到的二进制音频数据大小为:" + message.remaining());

        try (FileOutputStream fos = new FileOutputStream(outputFile, true)) {
            byte[] buffer = new byte[message.remaining()];
            message.get(buffer);
            fos.write(buffer);
            System.out.println("音频数据已写入本地文件" + outputFile + "中");
        } catch (IOException e) {
            System.err.println("音频数据写入本地文件失败:" + e.getMessage());
        }
    }

    @Override
    public void onClose(int code, String reason, boolean remote) {
        System.out.println("连接关闭:" + reason + " (" + code + ")");
    }

    @Override
    public void onError(Exception ex) {
        System.err.println("报错:" + ex.getMessage());
        ex.printStackTrace();
    }

    private void sendContinueTask(String text) {
        String command = "{ \"header\": { \"action\": \"continue-task\", \"task_id\": \"" + taskId + "\", \"streaming\": \"duplex\" }, \"payload\": { \"input\": { \"text\": \"" + text + "\" } }}";
        send(command);
    }

    private void sendFinishTask() {
        String command = "{ \"header\": { \"action\": \"finish-task\", \"task_id\": \"" + taskId + "\", \"streaming\": \"duplex\" }, \"payload\": { \"input\": {} }}";
        send(command);
    }

    private void closeConnection() {
        if (!isClosed()) {
            close();
        }
    }

    public static void main(String[] args) {
        try {
            String apiKey = System.getenv("DASHSCOPE_API_KEY");
            if (apiKey == null || apiKey.isEmpty()) {
                System.err.println("请设置 DASHSCOPE_API_KEY 环境变量");
                return;
            }

            Map<String, String> headers = new HashMap<>();
            headers.put("Authorization", "bearer " + apiKey);
            TTSWebSocketClient client = new TTSWebSocketClient(new URI("wss://dashscope.aliyuncs.com/api-ws/v1/inference/"), headers);

            client.connect();

            while (!client.isClosed() && !client.taskFinished) {
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            System.err.println("连接WebSocket服务失败:" + e.getMessage());
            e.printStackTrace();
        }
    }
}

Python

如您使用Python编程语言,建议采用Python DashScope SDK进行开发,详情请参见Python SDK

以下是Python WebSocket的调用示例。在运行示例前,请确保通过如下方式导入依赖:

pip uninstall websocket-client
pip uninstall websocket
pip install websocket-client
重要

请不要将运行示例代码的Python命名为“websocket.py”,否则会报错(AttributeError: module 'websocket' has no attribute 'WebSocketApp'. Did you mean: 'WebSocket'?)。

import websocket
import json
import uuid
import os
import time


class TTSClient:
    def __init__(self, api_key, uri):
        """
    初始化 TTSClient 实例

    参数:
        api_key (str): 鉴权用的 API Key
        uri (str): WebSocket 服务地址
    """
        self.api_key = api_key  # 替换为你的 API Key
        self.uri = uri  # 替换为你的 WebSocket 地址
        self.task_id = str(uuid.uuid4())  # 生成唯一任务 ID
        self.output_file = f"output_{int(time.time())}.mp3"  # 输出音频文件路径
        self.ws = None  # WebSocketApp 实例
        self.task_started = False  # 是否收到 task-started
        self.task_finished = False  # 是否收到 task-finished / task-failed

    def on_open(self, ws):
        """
    WebSocket 连接建立时回调函数
    发送 run-task 指令开启语音合成任务
    """
        print("WebSocket 已连接")

        # 构造 run-task 指令
        run_task_cmd = {
            "header": {
                "action": "run-task",
                "task_id": self.task_id,
                "streaming": "duplex"
            },
            "payload": {
                "task_group": "audio",
                "task": "tts",
                "function": "SpeechSynthesizer",
                "model": "cosyvoice-v2",
                "parameters": {
                    "text_type": "PlainText",
                    "voice": "longxiaochun_v2",
                    "format": "mp3",
                    "sample_rate": 22050,
                    "volume": 50,
                    "rate": 1,
                    "pitch": 1
                },
                "input": {}
            }
        }

        # 发送 run-task 指令
        ws.send(json.dumps(run_task_cmd))
        print("已发送 run-task 指令")

    def on_message(self, ws, message):
        """
    接收到消息时的回调函数
    区分文本和二进制消息处理
    """
        if isinstance(message, str):
            # 处理 JSON 文本消息
            try:
                msg_json = json.loads(message)
                print(f"收到 JSON 消息: {msg_json}")

                if "header" in msg_json:
                    header = msg_json["header"]

                    if "event" in header:
                        event = header["event"]

                        if event == "task-started":
                            print("任务已启动")
                            self.task_started = True

                            # 发送 continue-task 指令
                            texts = [
                                "床前明月光,疑是地上霜",
                                "举头望明月,低头思故乡"
                            ]

                            for text in texts:
                                self.send_continue_task(text)

                            # 所有 continue-task 发送完成后发送 finish-task
                            self.send_finish_task()

                        elif event == "task-finished":
                            print("任务已完成")
                            self.task_finished = True
                            self.close(ws)

                        elif event == "task-failed":
                            error_msg = msg_json.get("error_message", "未知错误")
                            print(f"任务失败: {error_msg}")
                            self.task_finished = True
                            self.close(ws)

            except json.JSONDecodeError as e:
                print(f"JSON 解析失败: {e}")
        else:
            # 处理二进制消息(音频数据)
            print(f"收到二进制消息,大小: {len(message)} 字节")
            with open(self.output_file, "ab") as f:
                f.write(message)
            print(f"已将音频数据写入本地文件{self.output_file}中")

    def on_error(self, ws, error):
        """发生错误时的回调"""
        print(f"WebSocket 出错: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        """连接关闭时的回调"""
        print(f"WebSocket 已关闭: {close_msg} ({close_status_code})")

    def send_continue_task(self, text):
        """发送 continue-task 指令,附带要合成的文本内容"""
        cmd = {
            "header": {
                "action": "continue-task",
                "task_id": self.task_id,
                "streaming": "duplex"
            },
            "payload": {
                "input": {
                    "text": text
                }
            }
        }

        self.ws.send(json.dumps(cmd))
        print(f"已发送 continue-task 指令,文本内容: {text}")

    def send_finish_task(self):
        """发送 finish-task 指令,结束语音合成任务"""
        cmd = {
            "header": {
                "action": "finish-task",
                "task_id": self.task_id,
                "streaming": "duplex"
            },
            "payload": {
                "input": {}
            }
        }

        self.ws.send(json.dumps(cmd))
        print("已发送 finish-task 指令")

    def close(self, ws):
        """主动关闭连接"""
        if ws and ws.sock and ws.sock.connected:
            ws.close()
            print("已主动关闭连接")

    def run(self):
        """启动 WebSocket 客户端"""
        # 设置请求头部(鉴权)
        header = {
            "Authorization": f"bearer {self.api_key}",
            "X-DashScope-DataInspection": "enable"
        }

        # 创建 WebSocketApp 实例
        self.ws = websocket.WebSocketApp(
            self.uri,
            header=header,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )

        print("正在监听 WebSocket 消息...")
        self.ws.run_forever()  # 启动长连接监听


# 示例使用方式
if __name__ == "__main__":
    API_KEY = os.environ.get("DASHSCOPE_API_KEY")  # 如您未将API Key配置到环境变量,请将API_KEY 设置为您的 API Key
    SERVER_URI = "wss://dashscope.aliyuncs.com/api-ws/v1/inference/"  # 替换为你的 WebSocket 地址

    client = TTSClient(API_KEY, SERVER_URI)
    client.run()

错误码

如遇报错问题,请参见错误信息进行排查。

常见问题

功能特性/计量计费/限流

Q:当遇到发音不准的情况时,有什么解决方案可以尝试?

通过SSML可以对语音合成效果进行个性化定制。

Q:为什么使用WebSocket协议而非HTTP/HTTPS协议?为什么不提供RESTful API?

语音服务选择 WebSocket 而非 HTTP/HTTPS/RESTful,根本在于其依赖全双工通信能力——WebSocket 允许服务端与客户端主动双向传输数据(如实时推送语音合成/识别进度),而基于 HTTP 的 RESTful 仅支持客户端发起的单向请求-响应模式,无法满足实时交互需求。

Q:语音合成是按文本字符数计费的,要如何查看或获取每次合成的文本长度?

通过服务端返回的result-generated事件payload.usage.characters参数获取字符数,请以收到的最后一个result-generated事件为准。

故障排查

重要

代码报错时,建议您检查发送至服务端的指令是否正确:可以通过打印指令内容,检查是否存在格式有误或必填参数遗漏的情况。如指令正确,请根据错误码中的信息进行排查。

Q:如何获取Request ID

通过以下两种方式可以获取:

Q:使用SSML功能失败是什么原因?

请按以下步骤排查:

  1. 确保当前音色支持SSML功能(声音复刻音色不支持SSML)

  2. 确保model参数值为cosyvoice-v2

  3. 确保用正确的方式进行调用,详情请参见SSML标记语言支持说明

  4. 确保待合成文本为纯文本格式且符合格式要求,详情请参见SSML标记语言介绍

Q:为什么音频无法播放?

请根据以下场景逐一排查:

  1. 音频保存为完整文件(如xx.mp3)的情况

    1. 音频格式一致性:确保请求参数中设置的音频格式与文件后缀一致。例如,如果请求参数设置的音频格式为wav,但文件后缀为mp3,可能会导致播放失败。

    2. 播放器兼容性:确认使用的播放器是否支持该音频文件的格式和采样率。例如,某些播放器可能不支持高采样率或特定编码的音频文件。

  2. 流式播放音频的情况

    1. 将音频流保存为完整文件,尝试使用播放器播放。如果文件无法播放,请参考场景 1 的排查方法。

    2. 如果文件可以正常播放,则问题可能出在流式播放的实现上。请确认使用的播放器是否支持流式播放。

      常见的支持流式播放的工具和库包括:ffmpeg、pyaudio (Python)、AudioFormat (Java)、MediaSource (Javascript)等。

Q:为什么音频播放卡顿?

请根据以下场景逐一排查:

  1. 检查文本发送速度: 确保发送文本的间隔合理,避免前一句音频播放完毕后,下一句文本未能及时发送。

  2. 检查回调函数性能:

    • 检查回调函数中是否存在过多业务逻辑,导致阻塞。

    • 回调函数运行在 WebSocket 线程中,若被阻塞,可能会影响 WebSocket 接收网络数据包,进而导致音频接收卡顿。

    • 建议将音频数据写入一个独立的音频缓冲区(audio buffer),然后在其他线程中读取并处理,避免阻塞 WebSocket 线程。

  3. 检查网络稳定性: 确保网络连接稳定,避免因网络波动导致音频传输中断或延迟。

Q:语音合成慢(合成时间长)是什么原因?

请按以下步骤排查:

  1. 检查输入间隔

    如果是流式语音合成,请确认文字发送间隔是否过长(如上段发出后延迟数秒才发送下段),过久间隔会导致合成总时长增加。

  2. 分析性能指标

    • 首包延迟:正常500ms左右。

    • RTF(RTF = 合成总耗时/音频时长):正常小于1.0。

Q:合成的语音发音错误如何处理?

  • 若当前使用的模型为cosyvoice-v1,推荐使用cosyvoice-v2,v2效果更好且支持SSML

  • 若当前使用的模型为cosyvoice-v2,请使用SSML<phoneme>标签指定正确的发音。

Q:为什么没有返回语音?为什么结尾部分的文本没能成功转换成语音?(合成语音缺失)

请确认是否忘记发送finish-task指令。在语音合成过程中,服务端会在缓存足够文本后才开始合成。如果忘记发送finish-task指令,可能会导致缓存中的结尾部分文本未能被合成为语音。

Q:为什么返回的音频流顺序错乱?导致播放内容混乱

请从以下两个方面排查:

权限与认证

Q:我希望我的 API Key 仅用于 CosyVoice 语音合成服务,而不被百炼其他模型使用(权限隔离),我该如何做

可以通过新建业务空间并只授权特定模型来限制API Key的使用范围。详情请参见业务空间管理

Q:使用子业务空间的API Key是否可以调用CosyVoice模型?

对于默认业务空间,cosyvoice-v1cosyvoice-v2均可调用。

对于子业务空间:需要为API Key对应的子业务空间进行模型授权,详情请参见子业务空间的模型调用

更多问题

请参见GitHub QA