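This topic describes how to call the Sambert speech synthesis service efficiently through the DashScope Java SDK in high-concurrency scenarios.
The Sambert speech synthesis service uses the WebSocket protocol. Under high concurrency, creating WebSocket connections over and over adds connection latency and consumes a large amount of resources.
When using the DashScope Java SDK, you can reduce this overhead by sizing the connection pool and the object pool appropriately for your server.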
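Prerequisites
Recommended configuration
Larger connection and object pools are not always better: pools that are too small or too large both slow the program down. Size them according to the actual specifications of your server.
The following recommended configuration was obtained from tests on a server running only the Sambert speech synthesis service, and is provided for reference: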
Common instance type (Alibaba Cloud) | Max concurrency per server | Object pool size | Connection pool size |
4 cores, 8 GiB | 600 | 1200 | 2000 |
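Concurrency per server is the number of Sambert speech synthesis tasks running on one server at the same time; you can also think of it as the number of worker threads.
Under high concurrency, each pooled object keeps reusing the same WebSocket connection, so WebSocket connections are created only when the service starts.
Note that creating too many WebSocket connections at the same time causes blocking, so when starting the service, increase the concurrency per server gradually.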
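A gradual ramp-up can be planned ahead of time. The following is a minimal sketch assuming a simple linear schedule; `ConcurrencyRamp` and its `rampSteps` method are hypothetical helpers, not SDK APIs, and the number of stages and the pause between them should come from your own load tests.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of the DashScope SDK): splits a target concurrency
// into linear ramp-up stages so that WebSocket connections are created gradually.
class ConcurrencyRamp {
    /**
     * Returns the cumulative number of workers that should be running after each stage.
     * For example, target=600 and stages=4 yields [150, 300, 450, 600].
     */
    static List<Integer> rampSteps(int target, int stages) {
        if (target <= 0 || stages <= 0) {
            throw new IllegalArgumentException("target and stages must be positive");
        }
        List<Integer> steps = new ArrayList<>();
        for (int i = 1; i <= stages; i++) {
            // Integer arithmetic; the last stage always reaches exactly `target`.
            steps.add((int) ((long) target * i / stages));
        }
        return steps;
    }

    public static void main(String[] args) {
        // Assumed usage: start workers up to each step, then pause (e.g. Thread.sleep)
        // before the next stage so the new connections have time to be established.
        System.out.println(rampSteps(600, 4));
    }
}
```

Each stage only adds workers on top of the previous one, so connections that are already open keep being reused while new ones are created in smaller batches.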
Configurable parameters
Connection pool
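The DashScope Java SDK uses the connection pool provided by OkHttp3 to reuse WebSocket connections, which reduces the time and resource overhead of creating WebSocket connections repeatedly.
The connection pool is an optimization that the DashScope SDK enables by default; you need to set its size according to your scenario.
Before starting your Java service, set the connection pool parameters through environment variables as needed. The parameters are as follows:
Environment variable | Description |
DASHSCOPE_CONNECTION_POOL_SIZE | Connection pool size. Default: 32. Recommended: at least twice your peak concurrency. |
DASHSCOPE_MAXIMUM_ASYNC_REQUESTS | Maximum number of asynchronous requests. Default: 32. Recommended: the same as the connection pool size. For more information, see the reference documentation. |
DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST | Maximum number of asynchronous requests per host. Default: 32. Recommended: the same as the connection pool size. For more information, see the reference documentation. |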
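To catch a misconfiguration early, a service could check these variables at startup. The sketch below is one assumed way to do it: `ConnectionPoolEnvCheck` is a hypothetical class, not part of the DashScope SDK, and it only reports mismatches; it does not change what the SDK reads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical startup check (not part of the DashScope SDK): reads the three
// connection pool variables, falling back to the documented default of 32, and
// reports any async request limit that differs from the connection pool size.
class ConnectionPoolEnvCheck {
    static final int DEFAULT_VALUE = 32;

    static int readInt(Map<String, String> env, String name) {
        String raw = env.get(name);
        if (raw == null || raw.trim().isEmpty()) {
            return DEFAULT_VALUE;
        }
        return Integer.parseInt(raw.trim()); // throws NumberFormatException on bad input
    }

    static List<String> warnings(Map<String, String> env) {
        int poolSize = readInt(env, "DASHSCOPE_CONNECTION_POOL_SIZE");
        List<String> result = new ArrayList<>();
        for (String name : new String[] {
                "DASHSCOPE_MAXIMUM_ASYNC_REQUESTS",
                "DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST"}) {
            int value = readInt(env, name);
            if (value != poolSize) {
                result.add(name + "=" + value
                        + " differs from DASHSCOPE_CONNECTION_POOL_SIZE=" + poolSize);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // In a real service, run this once before the first SDK call.
        warnings(System.getenv()).forEach(w -> System.out.println("[ENV WARN] " + w));
    }
}
```

Taking a `Map` instead of calling `System.getenv` directly keeps the check easy to test with hand-built values.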
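Object pool
We recommend reusing SpeechSynthesizer objects through an object pool, which further reduces the memory and time overhead of repeatedly creating and destroying them.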
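The reuse idea can be illustrated without a third-party library. The sketch below uses `java.util.concurrent.ArrayBlockingQueue` as a stand-in for Apache Commons Pool 2, which the sample code uses; `SimplePool` is a hypothetical class, and a production pool would also need validation and replacement of broken objects.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Illustrative fixed-size pool (hypothetical, not the SDK's or Commons Pool's API):
// objects are created once up front, then borrowed and returned instead of recreated.
class SimplePool<T> {
    private final BlockingQueue<T> idle;

    SimplePool(int size, Supplier<T> factory) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get()); // create every object once, at startup
        }
    }

    // Blocks until an object is free, so at most `size` objects are ever in use.
    T borrow() throws InterruptedException {
        return idle.take();
    }

    // Always call this in a finally block, otherwise the pool slowly drains.
    void release(T obj) {
        idle.offer(obj);
    }

    int idleCount() {
        return idle.size();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger created = new AtomicInteger();
        // In the real service, T would be SpeechSynthesizer and the factory would call
        // new SpeechSynthesizer(), as the factory class in the sample code does.
        SimplePool<StringBuilder> pool = new SimplePool<>(2, () -> {
            created.incrementAndGet();
            return new StringBuilder();
        });
        for (int i = 0; i < 10; i++) {
            StringBuilder sb = pool.borrow();
            try {
                sb.setLength(0);
                sb.append("task ").append(i);
            } finally {
                pool.release(sb);
            }
        }
        System.out.println("objects created: " + created.get());
    }
}
```

Ten tasks run but only two objects are ever constructed, which is the saving the object pool is meant to provide.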
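Before starting your Java service, set the object pool size as needed, either through an environment variable or in code. The parameter is as follows:
Environment variable | Description |
SAMBERT_OBJECTPOOL_SIZE | Object pool size. Recommended: 1.5 to 2 times your peak concurrency. The object pool size must be less than or equal to the connection pool size; otherwise objects wait for connections and calls block. |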
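The sizing rules above can be expressed as a quick arithmetic check. `PoolSizing` is a hypothetical helper; it encodes only the ratios stated in this topic and is not a substitute for load testing.

```java
// Hypothetical sizing helper encoding the ratios recommended in this topic:
// connection pool >= 2x peak concurrency, object pool between 1.5x and 2x peak
// concurrency, and object pool <= connection pool.
class PoolSizing {
    static int minConnectionPool(int peak) {
        return 2 * peak;
    }

    static int minObjectPool(int peak) {
        return (3 * peak + 1) / 2; // ceil(1.5 * peak) using integer arithmetic
    }

    static int maxObjectPool(int peak) {
        return 2 * peak;
    }

    static boolean isRecommended(int peak, int objectPool, int connectionPool) {
        return connectionPool >= minConnectionPool(peak)
                && objectPool >= minObjectPool(peak)
                && objectPool <= maxObjectPool(peak)
                && objectPool <= connectionPool;
    }

    public static void main(String[] args) {
        // Values from the "4 cores, 8 GiB" row of the recommended configuration table.
        System.out.println(isRecommended(600, 1200, 2000));
    }
}
```

For a peak of 600 this gives an object pool of 900 to 1200 and a connection pool of at least 1200, which the tested values (1200 and 2000) satisfy.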
For how to set environment variables, see Configure the API key as an environment variable.
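Sample code
The following sample code uses both resource pools. The object pool is a global singleton.
By default, each Alibaba Cloud account can submit 3 Sambert speech synthesis tasks per second.
To get a higher QPS, contact us.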
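When many threads share the default quota, a client-side limiter can keep submissions under the per-account rate. The following is a minimal sketch assuming even spacing between submissions (a pacing limiter rather than a token bucket); `SubmissionPacer` is hypothetical and not part of the SDK.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical client-side pacing limiter (not part of the DashScope SDK): spaces
// task submissions evenly so that at most `permitsPerSecond` tasks start per second.
class SubmissionPacer {
    private final long intervalNanos;
    private boolean started = false;
    private long nextFreeNanos;

    SubmissionPacer(int permitsPerSecond) {
        if (permitsPerSecond <= 0) {
            throw new IllegalArgumentException("permitsPerSecond must be positive");
        }
        this.intervalNanos = TimeUnit.SECONDS.toNanos(1) / permitsPerSecond;
    }

    // Reserves the next free slot and returns how long the caller must wait, in nanoseconds.
    synchronized long reserve(long nowNanos) {
        // Compare by subtraction, as recommended for System.nanoTime() values.
        long start = (!started || nowNanos - nextFreeNanos >= 0) ? nowNanos : nextFreeNanos;
        started = true;
        nextFreeNanos = start + intervalNanos;
        return start - nowNanos;
    }

    // Blocks the calling thread until its reserved slot arrives.
    void acquire() throws InterruptedException {
        long waitNanos = reserve(System.nanoTime());
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SubmissionPacer pacer = new SubmissionPacer(3); // 3 tasks per second by default
        long t0 = System.nanoTime();
        for (int i = 0; i < 6; i++) {
            pacer.acquire(); // in the sample, this would go right before synthesizer.call(...)
            System.out.printf("task %d submitted at %d ms%n",
                    i, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0));
        }
    }
}
```

Pacing trades a little latency for a steady submission rate; a token bucket would instead allow short bursts up to the limit.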
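In the sample code, each thread waits for a random amount of time before starting, so that the threads do not create too many WebSocket connections at the same time.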
import com.alibaba.dashscope.audio.tts.SpeechSynthesisAudioFormat;
import com.alibaba.dashscope.audio.tts.SpeechSynthesisParam;
import com.alibaba.dashscope.audio.tts.SpeechSynthesisResult;
import com.alibaba.dashscope.audio.tts.SpeechSynthesizer;
import com.alibaba.dashscope.common.ResultCallback;
import com.alibaba.dashscope.exception.NoApiKeyException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import java.time.LocalDateTime;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
/**
* Before making high-concurrency calls to the TTS service,
* configure the connection pool size through the following environment
* variables.
*
* DASHSCOPE_MAXIMUM_ASYNC_REQUESTS=2000
* DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST=2000
* DASHSCOPE_CONNECTION_POOL_SIZE=2000
*
* The default is 32. It is recommended to set it to at least 2 times the
* maximum concurrency of a single server.
*/
@Slf4j
public class SynthesizeTextToSpeechUsingSambertConcurrently {
public static void checkoutEnv(String envName, int defaultSize) {
if (System.getenv(envName) != null) {
System.out.println("[ENV CHECK]: " + envName + " "
+ System.getenv(envName));
} else {
System.out.println("[ENV CHECK]: " + envName
+ " Using Default which is " + defaultSize);
}
}
public static void main(String[] args)
throws InterruptedException, NoApiKeyException {
// Print the connection pool and object pool settings in effect
checkoutEnv("DASHSCOPE_CONNECTION_POOL_SIZE", 32);
checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS", 32);
checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST", 32);
checkoutEnv(SambertObjectPool.SAMBERT_OBJECTPOOL_SIZE_ENV, SambertObjectPool.DEFAULT_CONNECTION_POOL_SIZE);
// Number of synthesis tasks to submit; raise it gradually when load testing
int runTimes = 1;
// Thread pool that runs the synthesis tasks concurrently
ExecutorService executorService = Executors.newFixedThreadPool(runTimes);
for (int i = 0; i < runTimes; i++) {
// Record the task submission time
LocalDateTime submissionTime = LocalDateTime.now();
executorService.submit(new SynthesizeTask(new String[]{
"床前明月光,",
"疑似地上霜。",
"举头望明月,",
"低头思故乡。"
}));
}
// Shut down the ExecutorService and wait for all tasks to complete
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
System.exit(0);
}
}
class SpeechSynthesizerObjectFactory
extends BasePooledObjectFactory<SpeechSynthesizer> {
public SpeechSynthesizerObjectFactory() {
super();
}
@Override
public SpeechSynthesizer create() throws Exception {
return new SpeechSynthesizer();
}
@Override
public PooledObject<SpeechSynthesizer> wrap(SpeechSynthesizer obj) {
return new DefaultPooledObject<>(obj);
}
}
class SambertObjectPool {
public static GenericObjectPool<SpeechSynthesizer> synthesizerPool;
public static String SAMBERT_OBJECTPOOL_SIZE_ENV = "SAMBERT_OBJECTPOOL_SIZE";
public static int DEFAULT_CONNECTION_POOL_SIZE = 500;
private static Lock lock = new java.util.concurrent.locks.ReentrantLock();
public static int getObjectivePoolSize() {
try {
Integer n = Integer.parseInt(System.getenv(SAMBERT_OBJECTPOOL_SIZE_ENV));
return n;
} catch (NumberFormatException e) {
return DEFAULT_CONNECTION_POOL_SIZE;
}
}
public static GenericObjectPool<SpeechSynthesizer> getInstance() {
lock.lock();
try {
if (synthesizerPool == null) {
// You can set the object pool size here or through the environment
// variable SAMBERT_OBJECTPOOL_SIZE. It is recommended to set it to 1.5 to 2
// times the maximum concurrency of your server.
int objectPoolSize = getObjectivePoolSize();
SpeechSynthesizerObjectFactory speechSynthesizerObjectFactory =
new SpeechSynthesizerObjectFactory();
GenericObjectPoolConfig<SpeechSynthesizer> config =
new GenericObjectPoolConfig<>();
config.setMaxTotal(objectPoolSize);
// Keep every returned object idle in the pool (the default maxIdle is only 8)
config.setMaxIdle(objectPoolSize);
config.setMinIdle(objectPoolSize);
synthesizerPool =
new GenericObjectPool<>(speechSynthesizerObjectFactory, config);
}
} finally {
// Always release the lock, even if pool creation throws
lock.unlock();
}
return synthesizerPool;
}
}
class SynthesizeTask implements Runnable {
String[] textList;
String requestId;
long timeCost;
public SynthesizeTask(String[] textList) {
this.textList = textList;
}
@Override
public void run() {
// Sleep for a random time (up to 30 s) before starting, to avoid creating too many WebSocket connections at the same time.
Random random = new Random();
try {
Thread.sleep(random.nextInt(30 * 1000));
} catch (InterruptedException e) {
// Restore the interrupt flag and stop this task
Thread.currentThread().interrupt();
return;
}
for (String text:textList) {
SpeechSynthesizer synthesizer = null;
long startTime = System.currentTimeMillis();
try {
CountDownLatch latch = new CountDownLatch(1);
class ReactCallback extends ResultCallback<SpeechSynthesisResult> {
ReactCallback() {}
@Override
public void onEvent(SpeechSynthesisResult message) {
if (message.getAudioFrame() != null) {
try {
// Raw audio bytes of this frame; write them to a file or forward them to the client here.
byte[] bytesArray = message.getAudioFrame().array();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
@Override
public void onComplete() {
latch.countDown();
}
@Override
public void onError(Exception e) {
System.out.println(e.getMessage());
e.printStackTrace();
latch.countDown();
}
}
// Read the API key from the DASHSCOPE_API_KEY environment variable,
// or assign your DashScope API key here directly.
String dashScopeApiKey = System.getenv("DASHSCOPE_API_KEY");
SpeechSynthesisParam param =
SpeechSynthesisParam.builder()
.model("sambert-zhichu-v1")
.format(SpeechSynthesisAudioFormat.MP3) // PCM or MP3
.text(text)
.enablePhonemeTimestamp(true)
.enableWordTimestamp(true)
.apiKey(dashScopeApiKey)
.build();
try {
synthesizer = SambertObjectPool.getInstance().borrowObject();
synthesizer.call(param, new ReactCallback());
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
requestId = synthesizer.getLastRequestId();
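} catch (Exception e) {
System.out.println("Exception e: " + e.toString());
// Client-side error: close the connection so the unfinished task is not left open.
// synthesizer is still null if borrowObject() itself failed.
if (synthesizer != null) {
synthesizer.getSyncApi().close(1000, "bye");
}
}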
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (synthesizer != null) {
try {
// Return the SpeechSynthesizer object to the pool
SambertObjectPool.getInstance().returnObject(synthesizer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
long endTime = System.currentTimeMillis();
timeCost = endTime - startTime;
System.out.println("[Thread " + Thread.currentThread() + "] Synthesis task (" + text + ") finished. Time cost: " + timeCost + " ms, RequestId: " + requestId);
}
}
}
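Exception handling
If the service reports a TaskFailed error, no additional handling is required.
If a client-side error (such as an internal SDK exception or a business logic exception) occurs in the middle of speech synthesis and leaves the task unfinished, you need to close the connection yourself.
Close the connection as follows:
// Put this call inside a try-catch block
synthesizer.getSyncApi().close(1000, "bye");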
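When several client-side steps sit between starting synthesis and finishing it, the close call can be wrapped in a small helper. This is a sketch under assumptions: `CloseOnError` and its `Step` interface are hypothetical, and in a real service the close action would be `() -> synthesizer.getSyncApi().close(1000, "bye")`. Use it only for client-side failures; as stated above, TaskFailed errors from the service need no extra handling.

```java
// Hypothetical helper (not part of the DashScope SDK): runs a client-side step and,
// if it throws before the task completes, runs a close action. A failure while
// closing is attached as a suppressed exception so it does not hide the original error.
class CloseOnError {
    interface Step {
        void run() throws Exception;
    }

    static void runOrClose(Step step, Step closeAction) throws Exception {
        try {
            step.run();
        } catch (Exception e) {
            try {
                closeAction.run();
            } catch (Exception closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        try {
            runOrClose(
                    () -> { throw new IllegalStateException("client-side failure"); },
                    () -> System.out.println("closing connection"));
        } catch (Exception e) {
            System.out.println("task failed: " + e.getMessage());
        }
    }
}
```

The original exception is always rethrown, so the caller still sees why the task failed even after the connection has been closed.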