CosyVoice2提供API接口,用于管理音频文件、创建语音合成等功能。本文为您介绍CosyVoice2支持的接口类型及调用方式。
准备工作
部署CosyVoice2 WebUI服务或Frontend/Backend分离式高性能服务,且需要进行模型配置(用来保存上传的音频文件)。具体操作,请参见快速部署WebUI服务或快速部署Frontend/Backend分离式高性能服务。
获取服务访问地址和Token。
重要对于Frontend/Backend分离式高性能服务,API调用的是Frontend服务。
单击CosyVoice2的WebUI服务或Frontend服务名称,在概览页面的基本信息区域,单击查看调用信息。
在调用信息配置面板的共享网关页签,获取服务访问地址(EAS_SERVICE_URL)和Token(EAS_TOKEN),并将访问地址末尾的
/
删除。说明使用公网调用地址:调用客户端需支持访问公网。
使用VPC调用地址:调用客户端必须与服务位于同一个专有网络内。
准备音频文件。
本方案中使用的参考音频如下:
参考语音WAV文件:zero_shot_prompt.wav
参考语音文本:
希望你以后能够做得比我还好哟
接口列表
上传参考音频文件
调用方式
调用地址
<EAS_SERVICE_URL>/v1/audio/reference_audio
请求方式
POST
请求HEADERS
Authorization: Bearer <EAS_TOKEN>
请求参数
file:必填,表示需要上传的音频文件,支持MP3、opus、AAC、flac、WAV和pcm。类型:file,默认值:无。
text:必填,表示音频文件对应的文字内容。类型:string。
返回参数
返回一个reference audio object,详情请参见返回参数列表。
请求示例
cURL
# <EAS_SERVICE_URL>和<EAS_TOKEN>需分别替换为服务访问地址和Token。 curl -XPOST <EAS_SERVICE_URL>/v1/audio/reference_audio \ --header 'Authorization: Bearer <EAS_TOKEN>' \ --form 'file=@"/home/xxxx/zero_shot_prompt.wav"' \ --form 'text="希望你以后能够过得比我还好哟"'
Python
import requests response = requests.post( "<EAS_SERVICE_URL>/v1/audio/reference_audio", # <EAS_SERVICE_URL>需替换为服务访问地址。 headers={ "Authorization": "Bearer <EAS_TOKEN>", # <EAS_TOKEN>需替换为服务Token。 }, files={ "file": open("./zero_shot_prompt.wav", "rb"), }, data={ "text": "希望你以后能够做的得比我还好哟" } ) print(response.text)
返回示例
{ "id": "50a5fdb9-c3ad-445a-adbb-3be32750****", "filename": "zero_shot_prompt.wav", "bytes": 111496, "created_at": 1748416005, "text": "希望你以后能够过得比我还好哟" }
查看参考音频文件列表
调用方式
调用地址
<EAS_SERVICE_URL>/v1/audio/reference_audio
请求方式
GET
请求HEADERS
Authorization: Bearer <EAS_TOKEN>
Content-Type: application/json
请求参数
limit:选填,用于限制返回文件数。类型:integer,默认值:100。
order:选填,类型:string。按对象的created_at时间戳排序,取值如下:
asc:升序
desc(默认值):降序
返回参数
返回一个reference audio object,详情请参见返回参数列表。
请求示例
cURL
# <EAS_SERVICE_URL>和<EAS_TOKEN>需分别替换为服务访问地址和Token。 curl -XGET <EAS_SERVICE_URL>/v1/audio/reference_audio?limit=10&order=desc \ --header 'Authorization: Bearer <EAS_TOKEN>'
Python
import requests response = requests.get( "<EAS_SERVICE_URL>/v1/audio/reference_audio", # <EAS_SERVICE_URL>需替换为服务访问地址。 headers={ "Authorization": "Bearer <EAS_TOKEN>", # <EAS_TOKEN>需替换为服务Token。 "Content-Type": "application/json", } ) print(response.text)
返回示例
[ { "id": "50a5fdb9-c3ad-445a-adbb-3be32750****", "filename": "zero_shot_prompt.wav", "bytes": 111496, "created_at": 1748416005, "text": "希望你以后能够做得比我还好哟" } ]
查看指定参考音频文件
调用方式
调用地址
<EAS_SERVICE_URL>/v1/audio/reference_audio/<reference_audio_id>
请求方式
GET
请求HEADERS
Authorization: Bearer <EAS_TOKEN>
Content-Type: application/json
路径参数
reference_audio_id:必填,表示参考音频ID,如何获取,请参见List reference audio。类型:String,默认值:无。
返回参数
返回一个reference audio object,详情请参见返回参数列表。
请求示例
cURL
# <EAS_SERVICE_URL>和<EAS_TOKEN>需分别替换为服务访问地址和Token。 # <reference_audio_id>需替换为参考音频ID。 curl -XGET <EAS_SERVICE_URL>/v1/audio/reference_audio/<reference_audio_id> \ --header 'Authorization: Bearer <EAS_TOKEN>'
Python
import requests response = requests.get( "<EAS_SERVICE_URL>/v1/audio/reference_audio/<reference_audio_id>", # <EAS_SERVICE_URL>需替换为服务访问地址。 headers={ "Authorization": "Bearer <EAS_TOKEN>", # <EAS_TOKEN>需替换为服务Token。 "Content-Type": "application/json", } ) print(response.text)
返回示例
{ "id": "50a5fdb9-c3ad-445a-adbb-3be32750****", "filename": "zero_shot_prompt.wav", "bytes": 111496, "created_at": 1748416005, "text": "希望你以后能够做得比我还好哟" }
删除参考音频文件
调用方式
调用地址
<EAS_SERVICE_URL>/v1/audio/reference_audio/<reference_audio_id>
请求方式
DELETE
请求HEADERS
Authorization: Bearer <EAS_TOKEN>
Content-Type: application/json
路径参数
reference_audio_id:必填,表示参考音频ID,如何获取,请参见List reference audio。类型:String,默认值:无。
返回参数
返回一个reference audio object。
请求示例
cURL
# <EAS_SERVICE_URL>和<EAS_TOKEN>需分别替换为服务访问地址和Token。 # <reference_audio_id>需替换为参考音频ID。 curl -XDELETE <EAS_SERVICE_URL>/v1/audio/reference_audio/<reference_audio_id> \ --header 'Authorization: Bearer <EAS_TOKEN>'
Python
import requests response = requests.delete( "<EAS_SERVICE_URL>/v1/audio/reference_audio/<reference_audio_id>", # <EAS_SERVICE_URL>需替换为服务访问地址。 headers={ "Authorization": "Bearer <EAS_TOKEN>", # <EAS_TOKEN>需替换为服务Token。 "Content-Type": "application/json", } ) print(response.text)
返回示例
{ "code": "OK", "message": "reference audio: c0939ce0-308e-4073-918f-91ac88e3**** deleted.", "data": {} }
创建语音合成
调用方式
调用地址
<EAS_SERVICE_URL>/v1/audio/speech
请求方式
POST
请求HEADERS
Authorization: Bearer <EAS_TOKEN>
Content-Type: application/json
请求参数
model:必填,模型名称,目前仅支持CosyVoice2-0.5B。类型:string,默认值:无。
input:必填,类型:array,默认值:无。表示输入内容,取值如下:
mode:必填,类型:string。音频合成模式,取值如下:
fast_replication:快速复刻
cross_lingual_replication:跨语种复刻
natural_language_replication:自然语言复刻
text:必填,需要合成的文本。类型:string,默认值:无。
reference_audio_id:必填,表示参考音频ID,如何获取,请参见List reference audio。类型:string,默认值:无。
instruct:选填,instruct文本,动态调整语音风格,例如语气、情感、语速等。仅模式选择natural_language_replication时生效。类型:string,默认值:无。
sample_rate:选填,音频采样率。默认值:24000。
bit_rate:选填,MP3格式的比特率。默认值:192000。
compression_level:选填,MP3格式的压缩级别。默认值:2。
stream:选填,是否输出流式,目前仅支持stream=true。类型:boolean,默认值:true。
speed:选填,输出语音的速度,取值范围:[0.5~2.0]。类型:float,默认值:1.0。
返回参数
流式返回speech chunk object,详情请参见返回参数列表。
请求示例
非流式调用
cURL
<EAS_SERVICE_URL>和<EAS_TOKEN>需分别替换为服务访问地址和Token。
<reference_audio_id>需替换为参考音频ID。
curl -XPOST <EAS_SERVICE_URL>/v1/audio/speech \ --header 'Authorization: Bearer <EAS_TOKEN>' \ --header 'Content-Type: application/json' \ --data '{ "model": "CosyVoice2-0.5B", "input": { "mode": "natural_language_replication", "reference_audio_id": "<reference_audio_id>", "text": "北京, 你好, 收到好友从远方寄来的生日礼物,那份意外的惊喜与深深的祝福让我心中充满了甜蜜的快乐,笑容如花儿般绽放。", "instruct": "用四川话说或者用广东话说" }, "stream": false, "speed": 1.0 }' ------------------------------ Output ------------------------------ # data: {"output":{"finish_reason":null,"audio":{"data":"DNgB9djax9su3Ba...."}},"request_id": "f90a65be-f47b-46b5-9ddc-70bae550****"} # 返回base64位编码结果。
Python
import json import base64 import requests response = requests.post( "<EAS_SERVICE_URL>/v1/audio/speech", # <EAS_SERVICE_URL>需替换为服务访问地址。 headers={ "Authorization": "Bearer <EAS_TOKEN>", # <EAS_TOKEN>需替换为服务Token。 "Content-Type": "application/json", }, json={ "model": "CosyVoice2-0.5B", "input": { "mode": "natural_language_replication", "reference_audio_id": "<reference_audio_id>", # <reference_audio_id>需替换为参考音频ID。 "text": "北京, 你好, 收到好友从远方寄来的生日礼物,那份意外的惊喜与深深的祝福让我心中充满了甜蜜的快乐,笑容如花儿般绽放。", "instruct": "用四川话说或者用广东话说" }, "speed": 1.0, "stream": False } ) data = json.loads(response.text) encode_buffer = data['output']['audio']['data'] decode_buffer = base64.b64decode(encode_buffer) with open(f'http_non_stream.wav', 'wb') as f: f.write(decode_buffer)
流式调用
cURL
<EAS_SERVICE_URL>和<EAS_TOKEN>需分别替换为服务访问地址和Token。
<reference_audio_id>需替换为参考音频ID。
curl -XPOST <EAS_SERVICE_URL>/v1/audio/speech \ --header 'Authorization: Bearer <EAS_TOKEN>' \ --header 'Content-Type: application/json' \ --data '{ "model": "CosyVoice2-0.5B", "input": { "mode": "natural_language_replication", "reference_audio_id": "<reference_audio_id>", "text": "北京, 你好, 收到好友从远方寄来的生日礼物,那份意外的惊喜与深深的祝福让我心中充满了甜蜜的快乐,笑容如花儿般绽放。", "instruct": "用四川话说或者用广东话说" }, "stream": true, "speed": 1.0 }' ------------------------------ Stream Output ------------------------------ # data: {"output":{"finish_reason":null,"audio":{"data":"DNgB9djax9su3Ba...."}},"request_id": "f90a65be-f47b-46b5-9ddc-70bae550****"} # data: {"output":{"finish_reason":null,"audio":{"data":"DNgB9djax9su3Ba...."}},"request_id": "f90a65be-f47b-46b5-9ddc-70bae550****"} # data: {"output":{"finish_reason":null,"audio":{"data":"DNgB9djax9su3Ba...."}},"request_id": "f90a65be-f47b-46b5-9ddc-70bae550****"} # data: {"output":{"finish_reason":null,"audio":{"data":"DNgB9djax9su3Ba...."}},"request_id": "f90a65be-f47b-46b5-9ddc-70bae550****"} # 返回base64位编码结果。
Python
需要安装python SSE客户端:
pip install sseclient-py
import json import base64 import requests from sseclient import SSEClient # pip install sseclient-py response = requests.post( "<EAS_SERVICE_URL>/v1/audio/speech", # <EAS_SERVICE_URL>需替换为服务访问地址。 headers={ "Authorization": "Bearer <EAS_TOKEN>", # <EAS_TOKEN>需替换为服务Token。 "Content-Type": "application/json", }, json={ "model": "CosyVoice2-0.5B", "input": { "mode": "natural_language_replication", "reference_audio_id": "<reference_audio_id>", # <reference_audio_id>需替换为参考音频ID。 "text": "北京, 你好, 收到好友从远方寄来的生日礼物,那份意外的惊喜与深深的祝福让我心中充满了甜蜜的快乐,笑容如花儿般绽放。", "instruct": "用四川话说或者用广东话说" }, "speed": 1.0, "stream": True } ) messages = SSEClient(response) for i, msg in enumerate(messages.events()): print(f"Event: {msg.event}, Data: {msg.data}") data = json.loads(msg.data) encode_buffer = data['output']['audio']['data'] decode_buffer = base64.b64decode(encode_buffer) with open(f'http_stream_{i}.wav', 'wb') as f: f.write(decode_buffer)
Websocket API
需要安装如下依赖:
pip install torch==2.7.0 torchaudio==2.7.0 websocket-client==1.8.0 websockets==12.0 pyaudio==0.2.14 soundfile==0.13.1
websocket调用示例支持流式输出和流式播放。
#!/usr/bin/python # -*- coding: utf-8 -*- import io import json import traceback import uuid import wave import base64 import asyncio import pyaudio import torch import torchaudio import websockets from typing import List, Optional class TTSClient: def __init__(self, api_key: str, uri: str, params: dict, is_play_audio: bool = False): """ 初始化 TTSClient 实例 参数: api_key (str): 鉴权用的 API Key uri (str): WebSocket 服务地址 """ self.api_key = api_key self.uri = uri self.task_id = str(uuid.uuid4()) self.ws: Optional[websockets.WebSocketClientProtocol] = None self.task_started = False self.task_finished = False self._check_params(params) self._params = params self._is_play_audio = is_play_audio self._send_queue = asyncio.Queue() self._receive_queue = asyncio.Queue() self._stop_event = asyncio.Event() pa = pyaudio.PyAudio() self._stream = pa.open(format=8, channels=1, rate=self._params.get('sample_rate', 24000), output=True) def _check_params(self, params): assert 'mode' in params and params['mode'] in ['fast_replication', 'cross_lingual_replication', 'natural_language_replication'] assert 'reference_audio_id' in params assert 'output_format' in params and params['output_format'] in ['wav', 'mp3', 'pcm'] if params['mode'] == 'natural_language_replication': assert 'instruct' in params and params['instruct'] else: if 'instruct' in params: del params['instruct'] async def _sender(self): """独立发送任务""" try: while not self._stop_event.is_set(): message = await self._send_queue.get() if message is None: # 终止信号 break await self.ws.send(message) print(f"已发送消息: {message}...") # 截断长消息 except Exception as e: print(f"发送任务出错: {e}") self._stop_event.set() async def _receiver(self): """独立接收任务""" try: async for message in self.ws: await self._receive_queue.put(message) if self._stop_event.is_set(): break except Exception as e: print(f"接收任务出错: {e}") self._stop_event.set() async def _message_handler(self): """处理接收到的消息""" while not self._stop_event.is_set(): try: message = await self._receive_queue.get() await self._process_message(message) except Exception as e: print(f"消息处理出错: {e}") self._stop_event.set() async def _audio_player(self, decode_data: str): print('播放音频数据 ...') tts_speech, sample_rate = torchaudio.load(io.BytesIO(decode_data)) audio_buffer = self.tensor_to_pcm_wav(tts_speech, sample_rate, bits_per_sample=16) wf = wave.open(audio_buffer, 'rb') # 读取数据 data = wf.readframes(1024) # 播放音频 while data: self._stream.write(data) # 将数据写入流进行播放 data = wf.readframes(1024) async def _process_message(self, message: str): """处理单条消息""" try: msg_json = json.loads(message) print(f"收到消息事件: {msg_json['header']['event']}") header = msg_json["header"] event = header["event"] if event == "task-started": self.task_started = True print("任务已启动") elif event == "result-generated": encode_data = msg_json["payload"]["output"]["audio"]["data"] decode_data = base64.b64decode(encode_data) print(f"收到音频数据,大小: {len(decode_data)} 字节") # output_file = f'output_{int(time.time())}.wav' # with open(output_file, 'wb') as f: # f.write(decode_data) # audio_stream = io.BytesIO(decode_data) # self.play_wav_audio(audio_stream) if self._is_play_audio: await self._audio_player(decode_data) elif event in ("task-finished", "task-failed"): self.task_finished = True status = "完成" if event == "task-finished" else "失败" print(f"任务已{status}:", msg_json) await self.close() except json.JSONDecodeError: print("收到非JSON消息,可能是二进制数据") except Exception as e: print(f"处理消息时出错: {e}") def tensor_to_pcm_wav(self, audio_tensor, sample_rate, bits_per_sample=16): """ 将torch tensor转换为PCM WAV文件 参数: - audio_tensor: 音频tensor,形状为 (channels, samples) 或 (samples,) - sample_rate: 采样率 - bits_per_sample: 比特深度 (16, 24, 32) """ # 确保tensor格式正确 if audio_tensor.dim() == 1: audio_tensor = audio_tensor.unsqueeze(0) # 转换为 (1, samples) # 归一化到合适的范围 if audio_tensor.dtype == torch.float32 or audio_tensor.dtype == torch.float64: # 假设浮点音频在[-1, 1]范围内 audio_tensor = torch.clamp(audio_tensor, -1.0, 1.0) # 根据比特深度进行量化 if bits_per_sample == 16: audio_tensor = (audio_tensor * 32767).to(torch.int16) elif bits_per_sample == 24: # 24位需要特殊处理 audio_tensor = (audio_tensor * 8388607).to(torch.int32) elif bits_per_sample == 32: audio_tensor = (audio_tensor * 2147483647).to(torch.int32) else: # 如果已经是整数格式,确保是正确的类型 if bits_per_sample == 16: audio_tensor = audio_tensor.to(torch.int16) elif bits_per_sample == 24: audio_tensor = audio_tensor.to(torch.int32) elif bits_per_sample == 32: audio_tensor = audio_tensor.to(torch.int32) # 保存为WAV文件 audio_bytes = io.BytesIO() if bits_per_sample == 24: # 24位需要特殊处理,因为torchaudio不完全支持 # 先保存为32位,然后可以转换为24位 # 或者使用其他库进行24位转换 torchaudio.save(audio_bytes, audio_tensor.to(torch.int32), sample_rate, bits_per_sample=32, format='wav') else: torchaudio.save(audio_bytes, audio_tensor, sample_rate, bits_per_sample=bits_per_sample, format='wav') audio_bytes.seek(0) return audio_bytes async def send_run_task(self): """发送run-task指令""" cmd = { "header": { "action": "run-task", "task_id": self.task_id, "streaming": "duplex" }, "payload": { "task_group": "audio", "task": "tts", "function": "SpeechSynthesizer", "model": "cosyvoice-v2", "parameters": { "mode": self._params['mode'], "reference_audio_id": self._params['reference_audio_id'], "instruct": self._params.get('instruct', ''), "output_format": self._params['output_format'], "sample_rate": self._params.get('sample_rate', 24000), "bit_rate": self._params.get('bit_rate', 192000), "compression_level": self._params.get('compression_level', 2), }, "input": {} } } await self._send_queue.put(json.dumps(cmd, ensure_ascii=False)) async def send_continue_task(self, text: str): """发送continue-task指令""" cmd = { "header": { "action": "continue-task", "task_id": self.task_id, "streaming": "duplex" }, "payload": { "input": { "text": text } } } await self._send_queue.put(json.dumps(cmd, ensure_ascii=False)) async def send_finish_task(self): """发送finish-task指令""" cmd = { "header": { "action": "finish-task", "task_id": self.task_id, "streaming": "duplex" }, "payload": { "input": {} } } await self._send_queue.put(json.dumps(cmd, ensure_ascii=False)) async def close(self): # 清理资源 self._stream.stop_stream() self._stream.close() self._stop_event.set() if self.ws: await self.ws.close() async def run(self, texts: List[str]): """运行客户端""" headers = { "Authorization": f"Bearer {self.api_key}", } tasks = [] try: # 建立连接 self.ws = await websockets.connect(self.uri, extra_headers=headers) print("WebSocket连接已建立") # 启动发送和接收任务 sender_task = asyncio.create_task(self._sender()) receiver_task = asyncio.create_task(self._receiver()) handler_task = asyncio.create_task(self._message_handler()) tasks.extend([sender_task, receiver_task, handler_task]) # 发送初始指令 await self.send_run_task() # 发送文本内容 for text in texts: await self.send_continue_task(text) await asyncio.sleep(1) # 控制发送频率 # 发送结束指令 await self.send_finish_task() # 等待任务完成 await self._stop_event.wait() except Exception as e: print(f"客户端运行出错: {e}\n{traceback.format_exc()}") finally: await self.close() await asyncio.gather( *tasks, return_exceptions=True ) print("客户端已关闭") async def main(): # your-api-key需替换为服务Token。 API_KEY = 'your-api-key' # localhost:50000 需替换为服务访问地址。 SERVER_URI = "ws://localhost:50000/v1/audio/speech/ws" texts = [ "床前明月光,疑是地上霜。", "举头望明月,低头思故乡。" ] params = { 'mode': 'fast_replication', 'reference_audio_id': '<reference_audio_id>', # <reference_audio_id>需替换为参考音频ID 'output_format': 'wav', 'sample_rate': 24000, 'texts': texts } client = TTSClient(API_KEY, SERVER_URI, params, is_play_audio=True) await client.run(texts) if __name__ == "__main__": asyncio.run(main())