CosyVoice 语音合成服务基于 WebSocket 协议,以支持流式实时通信。然而,在高并发场景下,为每个请求独立创建和销毁 WebSocket 连接会产生巨大的网络与系统资源开销,并引入显著的连接延迟。为优化性能并确保稳定性,DashScope SDK 内置了高效的资源复用机制(如连接池与对象池)。本文档将详细介绍如何利用 DashScope Python 和 Java SDK 中的这些特性,在高并发场景下高效调用 CosyVoice 服务。
本文档仅适用于“中国大陆(北京)”地域,且必须使用该地域的API Key。
前提条件
已安装符合版本要求的DashScope SDK,建议安装最新版:
Python SDK:版本≥1.25.2
Java SDK:版本≥2.16.6
Python SDK:对象池优化
Python SDK 通过 SpeechSynthesizerObjectPool 实现对象池优化,用于管理和复用 SpeechSynthesizer 对象。
对象池在初始化时会立即创建指定数量的 SpeechSynthesizer 实例并预先建立 WebSocket 连接。从池中获取对象时无需等待连接建立,可直接发起请求,有效降低首包延迟。当任务完成并将对象归还到对象池后,其 WebSocket 连接不会关闭,而是保持活跃状态等待下次任务复用。
实现步骤
安装依赖:安装DashScope依赖(
pip install -U dashscope)创建并配置对象池
对象池大小需要通过
SpeechSynthesizerObjectPool进行设置。推荐值:峰值并发数的 1.5 至 2 倍。对象池大小不应超过您账户的 QPS(每秒查询率)限制。通过以下代码创建全局单例固定大小对象池。对象池在初始化时会立即创建指定数量的
SpeechSynthesizer对象并建立 WebSocket 连接,因此会有一定耗时。from dashscope.audio.tts_v2 import SpeechSynthesizerObjectPool synthesizer_object_pool = SpeechSynthesizerObjectPool(max_size=20)从对象池中获取
SpeechSynthesizer对象如果当前未归还的对象数量已超过对象池的最大容量,系统会额外创建一个新的
SpeechSynthesizer对象。此类新创建的对象需要重新进行初始化并建立 WebSocket 连接,无法利用对象池的既有连接资源,因此不具备复用效果。
speech_synthesizer = connectionPool.borrow_synthesizer( model='cosyvoice-v3-flash', voice='longanyang', seed=12382, callback=synthesizer_callback )进行语音合成
调用
SpeechSynthesizer对象的call或streaming_call方法进行语音合成。归还
SpeechSynthesizer对象语音合成任务结束后,归还
SpeechSynthesizer对象,以便后续任务可以复用该对象。不要归还未完成任务或任务失败的对象。
connectionPool.return_synthesizer(speech_synthesizer)
完整代码
# !/usr/bin/env python3
# Copyright (C) Alibaba Group. All Rights Reserved.
# MIT License (https://opensource.org/licenses/MIT)
import os
import time
import threading
import dashscope
from dashscope.audio.tts_v2 import *
USE_CONNECTION_POOL = True
text_to_synthesize = [
'第一句、欢迎使用阿里巴巴语音合成服务。',
'第二句、欢迎使用阿里巴巴语音合成服务。',
'第三句、欢迎使用阿里巴巴语音合成服务。',
]
connectionPool = None
if USE_CONNECTION_POOL:
print('creating connection pool')
start_time = time.time() * 1000
connectionPool = SpeechSynthesizerObjectPool(max_size=3)
end_time = time.time() * 1000
print('connection pool created, cost: {} ms'.format(end_time - start_time))
def init_dashscope_api_key():
'''
Set your DashScope API-key. More information:
https://github.com/aliyun/alibabacloud-bailian-speech-demo/blob/master/PREREQUISITES.md
'''
if 'DASHSCOPE_API_KEY' in os.environ:
dashscope.api_key = os.environ[
'DASHSCOPE_API_KEY'] # load API-key from environment variable DASHSCOPE_API_KEY
else:
dashscope.api_key = '<your-dashscope-api-key>' # set API-key manually
def synthesis_text_to_speech_and_play_by_streaming_mode(text, task_id):
global USE_CONNECTION_POOL, connectionPool
'''
Synthesize speech with given text by streaming mode, async call and play the synthesized audio in real-time.
for more information, please refer to https://help.aliyun.com/document_detail/2712523.html
'''
complete_event = threading.Event()
# Define a callback to handle the result
class Callback(ResultCallback):
def on_open(self):
# when using object pool, on_open will be called after task start
self.file = open(f'result_{task_id}.mp3', 'wb')
print(f'[task_{task_id}] start')
def on_complete(self):
print(f'[task_{task_id}] speech synthesis task complete successfully.')
complete_event.set()
def on_error(self, message: str):
print(f'[task_{task_id}] speech synthesis task failed, {message}')
def on_close(self):
# when using object pool, on_open will be called after task finished
print(f'[task_{task_id}] finished')
def on_event(self, message):
# print(f'recv speech synthsis message {message}')
pass
def on_data(self, data: bytes) -> None:
# send to player
# save audio to file
self.file.write(data)
# Call the speech synthesizer callback
synthesizer_callback = Callback()
# Initialize the speech synthesizer
# you can customize the synthesis parameters, like voice, format, sample_rate or other parameters
if USE_CONNECTION_POOL:
speech_synthesizer = connectionPool.borrow_synthesizer(
model='cosyvoice-v3-flash',
voice='longanyang',
seed=12382,
callback=synthesizer_callback
)
else:
speech_synthesizer = SpeechSynthesizer(model='cosyvoice-v3-flash',
voice='longanyang',
seed=12382,
callback=synthesizer_callback)
try:
speech_synthesizer.call(text)
except Exception as e:
print(f'[task_{task_id}] speech synthesis task failed, {e}')
if USE_CONNECTION_POOL:
# close the synthesizer connection manually if task failed when using connection pool.
speech_synthesizer.close()
return
print('[task_{}] Synthesized text: {}'.format(task_id, text))
complete_event.wait()
print('[task_{}][Metric] requestId: {}, first package delay ms: {}'.format(
task_id,
speech_synthesizer.get_last_request_id(),
speech_synthesizer.get_first_package_delay()))
if USE_CONNECTION_POOL:
connectionPool.return_synthesizer(speech_synthesizer)
# main function
if __name__ == '__main__':
init_dashscope_api_key()
task_thread_list = []
for task_id in range(3):
thread = threading.Thread(
target=synthesis_text_to_speech_and_play_by_streaming_mode,
args=(text_to_synthesize[task_id], task_id))
task_thread_list.append(thread)
for task_thread in task_thread_list:
task_thread.start()
for task_thread in task_thread_list:
task_thread.join()
if USE_CONNECTION_POOL:
connectionPool.shutdown()资源管理与异常处理
任务成功:当语音合成任务正常完成时,必须调用
connectionPool.return_synthesizer(speech_synthesizer)将SpeechSynthesizer对象归还到池中,以便复用。重要不要归还未完成任务或任务失败的
SpeechSynthesizer对象。任务失败:当 SDK 内部或业务逻辑抛出异常导致任务中断时,主动关闭底层的 WebSocket 连接:
speech_synthesizer.close()在所有语音合成任务完成后,要通过如下方式关闭对象池:
connectionPool.shutdown()。在服务出现TaskFailed报错时,不需要额外处理。
Java SDK:连接池与对象池优化
Java SDK通过内置的连接池和自定义的对象池协同工作,实现最佳性能。
连接池:SDK 内部集成的 OkHttp3 连接池,负责管理和复用底层的 WebSocket 连接,减少网络握手开销。此功能默认开启。
对象池:基于
commons-pool2实现,用于维护一组已预先建立好连接的SpeechSynthesizer对象。从池中获取对象可消除连接建立的延迟,显著降低首包延迟。
实现步骤
添加依赖
根据项目构建工具,在依赖配置文件中添加 dashscope-sdk-java 和 commons-pool2。
以Maven和Gradle为例,配置如下:
Maven
打开您的Maven项目的
pom.xml文件。在
<dependencies>标签内添加以下依赖信息。
<dependency> <groupId>com.alibaba</groupId> <artifactId>dashscope-sdk-java</artifactId> <!-- 请将 'the-latest-version' 替换为2.16.9及以上版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java --> <version>the-latest-version</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <!-- 请将 'the-latest-version' 替换为最新版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 --> <version>the-latest-version</version> </dependency>保存
pom.xml文件。使用Maven命令(如
mvn clean install或mvn compile)来更新项目依赖
Gradle
打开您的Gradle项目的
build.gradle文件。在
dependencies块内添加以下依赖信息。dependencies { // 请将 'the-latest-version' 替换为2.16.6及以上版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java implementation group: 'com.alibaba', name: 'dashscope-sdk-java', version: 'the-latest-version' // 请将 'the-latest-version' 替换为最新版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 implementation group: 'org.apache.commons', name: 'commons-pool2', version: 'the-latest-version' }保存
build.gradle文件。在命令行中,切换到您的项目根目录,执行以下Gradle命令来更新项目依赖。
./gradlew build --refresh-dependencies或者,如果您使用的是Windows系统,命令应为:
gradlew build --refresh-dependencies
配置连接池
通过环境变量配置连接池关键参数:
环境变量
描述
DASHSCOPE_CONNECTION_POOL_SIZE
连接池大小。
推荐值:峰值并发数的 2 倍以上。
默认值:32。
DASHSCOPE_MAXIMUM_ASYNC_REQUESTS
最大异步请求数。
推荐值:与
DASHSCOPE_CONNECTION_POOL_SIZE保持一致。默认值:32。
DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST
单主机最大异步请求数。
推荐值:与
DASHSCOPE_CONNECTION_POOL_SIZE保持一致。默认值:32。
配置对象池
通过环境变量配置对象池大小:
环境变量
描述
COSYVOICE_OBJECTPOOL_SIZE
对象池大小。
推荐值:峰值并发数的 1.5 至 2 倍。
默认值:500。
重要对象池的大小(
COSYVOICE_OBJECTPOOL_SIZE)必须小于或等于连接池的大小(DASHSCOPE_CONNECTION_POOL_SIZE)。否则,当对象池请求对象时,若连接池已满,会导致调用线程阻塞,等待可用连接。对象池大小不应超过您账户的 QPS(每秒查询率)限制。
通过如下代码创建对象池:
class CosyvoiceObjectPool { // 。。。这里省略其它代码,完整示例请参见完整代码 public static GenericObjectPool<SpeechSynthesizer> getInstance() { lock.lock(); if (synthesizerPool == null) { // 您可以在这里设置对象池的大小。或在环境变量COSYVOICE_OBJECTPOOL_SIZE中设置。 // 建议设置为服务器最大并发连接数的1.5到2倍。 int objectPoolSize = getObjectivePoolSize(); SpeechSynthesizerObjectFactory speechSynthesizerObjectFactory = new SpeechSynthesizerObjectFactory(); GenericObjectPoolConfig<SpeechSynthesizer> config = new GenericObjectPoolConfig<>(); config.setMaxTotal(objectPoolSize); config.setMaxIdle(objectPoolSize); config.setMinIdle(objectPoolSize); synthesizerPool = new GenericObjectPool<>(speechSynthesizerObjectFactory, config); } lock.unlock(); return synthesizerPool; } }从对象池中获取
SpeechSynthesizer对象如果当前未归还的对象数量已超过对象池的最大容量,系统会额外创建一个新的
SpeechSynthesizer对象。此类新创建的对象需要重新进行初始化并建立 WebSocket 连接,无法利用对象池的既有连接资源,因此不具备复用效果。
synthesizer = CosyvoiceObjectPool.getInstance().borrowObject();进行语音合成
调用
SpeechSynthesizer对象的call或streamingCall方法进行语音合成。归还
SpeechSynthesizer对象语音合成任务结束后,归还
SpeechSynthesizer对象,以便后续任务可以复用该对象。不要归还未完成任务或任务失败的对象。
CosyvoiceObjectPool.getInstance().returnObject(synthesizer);
完整代码
package org.alibaba.bailian.example.examples;
import com.alibaba.dashscope.audio.tts.SpeechSynthesisResult;
import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesisAudioFormat;
import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesisParam;
import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesizer;
import com.alibaba.dashscope.common.ResultCallback;
import com.alibaba.dashscope.exception.NoApiKeyException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
/**
* 您需要在项目中引入org.apache.commons.pool2和DashScope相关的包。
*
* DashScope SDK 2.16.6及后续版本针对高并发场景进行了优化,
* DashScope SDK 2.16.6之前的版本不推荐在高并发场景下使用。
*
*
* 在对TTS服务进行高并发调用之前,
* 请通过以下环境变量配置连接池的相关参数。
*
* DASHSCOPE_MAXIMUM_ASYNC_REQUESTS
* DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST
* DASHSCOPE_CONNECTION_POOL_SIZE
*
*/
class SpeechSynthesizerObjectFactory
extends BasePooledObjectFactory<SpeechSynthesizer> {
public SpeechSynthesizerObjectFactory() {
super();
}
@Override
public SpeechSynthesizer create() throws Exception {
return new SpeechSynthesizer();
}
@Override
public PooledObject<SpeechSynthesizer> wrap(SpeechSynthesizer obj) {
return new DefaultPooledObject<>(obj);
}
}
class CosyvoiceObjectPool {
public static GenericObjectPool<SpeechSynthesizer> synthesizerPool;
public static String COSYVOICE_OBJECTPOOL_SIZE_ENV = "COSYVOICE_OBJECTPOOL_SIZE";
public static int DEFAULT_OBJECT_POOL_SIZE = 500;
private static Lock lock = new java.util.concurrent.locks.ReentrantLock();
public static int getObjectivePoolSize() {
try {
Integer n = Integer.parseInt(System.getenv(COSYVOICE_OBJECTPOOL_SIZE_ENV));
System.out.println("Using Object Pool Size In Env: "+ n);
return n;
} catch (NumberFormatException e) {
System.out.println("Using Default Object Pool Size: "+ DEFAULT_OBJECT_POOL_SIZE);
return DEFAULT_OBJECT_POOL_SIZE;
}
}
public static GenericObjectPool<SpeechSynthesizer> getInstance() {
lock.lock();
if (synthesizerPool == null) {
// 您可以在这里设置对象池的大小。或在环境变量COSYVOICE_OBJECTPOOL_SIZE中设置。
// 建议设置为服务器最大并发连接数的1.5到2倍。
int objectPoolSize = getObjectivePoolSize();
SpeechSynthesizerObjectFactory speechSynthesizerObjectFactory =
new SpeechSynthesizerObjectFactory();
GenericObjectPoolConfig<SpeechSynthesizer> config =
new GenericObjectPoolConfig<>();
config.setMaxTotal(objectPoolSize);
config.setMaxIdle(objectPoolSize);
config.setMinIdle(objectPoolSize);
synthesizerPool =
new GenericObjectPool<>(speechSynthesizerObjectFactory, config);
}
lock.unlock();
return synthesizerPool;
}
}
class SynthesizeTaskWithCallback implements Runnable {
String[] textArray;
String requestId;
long timeCost;
public SynthesizeTaskWithCallback(String[] textArray) {
this.textArray = textArray;
}
@Override
public void run() {
SpeechSynthesizer synthesizer = null;
long startTime = System.currentTimeMillis();
// if recv onError
final boolean[] hasError = {false};
try {
class ReactCallback extends ResultCallback<SpeechSynthesisResult> {
ReactCallback() {}
@Override
public void onEvent(SpeechSynthesisResult message) {
if (message.getAudioFrame() != null) {
try {
byte[] bytesArray = message.getAudioFrame().array();
System.out.println("收到音频,音频文件流length为:" + bytesArray.length);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
@Override
public void onComplete() {}
@Override
public void onError(Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
hasError[0] = true;
}
}
// 将your-dashscope-api-key替换成您自己的API-KEY
String dashScopeApiKey = "your-dashscope-api-key";
SpeechSynthesisParam param =
SpeechSynthesisParam.builder()
.model("cosyvoice-v3-flash")
.voice("longanyang")
.format(SpeechSynthesisAudioFormat
.MP3_22050HZ_MONO_256KBPS) // 流式合成使用PCM或者MP3
.apiKey(dashScopeApiKey)
.build();
try {
synthesizer = CosyvoiceObjectPool.getInstance().borrowObject();
synthesizer.updateParamAndCallback(param, new ReactCallback());
for (String text : textArray) {
synthesizer.streamingCall(text);
}
Thread.sleep(20);
synthesizer.streamingComplete(60000);
requestId = synthesizer.getLastRequestId();
} catch (Exception e) {
System.out.println("Exception e: " + e.toString());
hasError[0] = true;
}
} catch (Exception e) {
hasError[0] = true;
throw new RuntimeException(e);
}
if (synthesizer != null) {
try {
if (hasError[0] == true) {
// 如果出现异常,则关闭连接并在对象池中禁用该对象。
synthesizer.getDuplexApi().close(1000, "bye");
CosyvoiceObjectPool.getInstance().invalidateObject(synthesizer);
} else {
// 如果任务正常结束,则归还对象。
CosyvoiceObjectPool.getInstance().returnObject(synthesizer);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
long endTime = System.currentTimeMillis();
timeCost = endTime - startTime;
System.out.println("[线程 " + Thread.currentThread() + "] 语音合成任务结束。耗时 " + timeCost + " ms, RequestId " + requestId);
}
}
}
@Slf4j
public class SynthesizeTextToSpeechWithCallbackConcurrently {
public static void checkoutEnv(String envName, int defaultSize) {
if (System.getenv(envName) != null) {
System.out.println("[ENV CHECK]: " + envName + " "
+ System.getenv(envName));
} else {
System.out.println("[ENV CHECK]: " + envName
+ " Using Default which is " + defaultSize);
}
}
public static void main(String[] args)
throws InterruptedException, NoApiKeyException {
// Check for connection pool env
checkoutEnv("DASHSCOPE_CONNECTION_POOL_SIZE", 32);
checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS", 32);
checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST", 32);
checkoutEnv(CosyvoiceObjectPool.COSYVOICE_OBJECTPOOL_SIZE_ENV, CosyvoiceObjectPool.DEFAULT_OBJECT_POOL_SIZE);
int runTimes = 3;
// Create the pool of SpeechSynthesis objects
ExecutorService executorService = Executors.newFixedThreadPool(runTimes);
for (int i = 0; i < runTimes; i++) {
// Record the task submission time
LocalDateTime submissionTime = LocalDateTime.now();
executorService.submit(new SynthesizeTaskWithCallback(new String[] {
"床前明月光,", "疑似地上霜。", "举头望明月,", "低头思故乡。"}));
}
// Shut down the ExecutorService and wait for all tasks to complete
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
System.exit(0);
}
}推荐配置
以下配置基于在指定规格的阿里云服务器上仅运行 CosyVoice 语音合成服务的测试结果。过高的并发数可能导致任务处理延迟。
其中单机并发数指的是同一时刻正在运行的CosyVoice语音合成任务数,也可以理解为工作线程数。
机器配置(阿里云) | 单机最大并发数 | 对象池大小 | 连接池大小 |
4核8GiB | 100 | 500 | 2000 |
8核16GiB | 150 | 500 | 2000 |
16核32GiB | 200 | 500 | 2000 |
资源管理与异常处理
任务成功:当语音合成任务正常完成时,必须调用GenericObjectPool的returnObject方法将
SpeechSynthesizer对象归还到池中,以便复用。在当前代码中,对应
CosyvoiceObjectPool.getInstance().returnObject(synthesizer)重要不要归还未完成任务或任务失败的
SpeechSynthesizer对象。任务失败:当 SDK 内部或业务逻辑抛出异常导致任务中断时,必须执行以下两个操作:
主动关闭底层的 WebSocket 连接
从对象池中废弃该对象,防止被再次使用
// 在当前代码中对应如下内容 // 关闭连接 synthesizer.getDuplexApi().close(1000, "bye"); // 在对象池中废弃出现异常的synthesizer CosyvoiceObjectPool.getInstance().invalidateObject(synthesizer);在服务出现TaskFailed报错时,不需要额外处理。