Paraformer 实时语音识别服务基于 WebSocket 协议,以支持流式实时通信。然而,在高并发场景下,为每个请求独立创建和销毁 WebSocket 连接会产生巨大的网络与系统资源开销,并引入显著的连接延迟。为优化性能并确保稳定性,DashScope SDK 内置了高效的资源复用机制(如连接池与对象池)。本文档将详细介绍如何利用 DashScope Java SDK 中的这些特性,在高并发场景下高效调用 Paraformer 实时语音识别服务。
本文档仅适用于“中国内地(北京)”地域,且必须使用该地域的API Key。
用户指南:关于模型介绍和选型建议请参见实时语音识别-Fun-ASR/Gummy/Paraformer。
在线体验:仅paraformer-realtime-v2、paraformer-realtime-8k-v2和paraformer-realtime-v1支持在线体验。
前提条件
Java SDK通过内置的连接池和自定义的对象池协同工作,实现最佳性能。
连接池:SDK 内部集成的 OkHttp3 连接池,负责管理和复用底层的 WebSocket 连接,减少网络握手开销。此功能默认开启。
对象池:基于
commons-pool2实现,用于维护一组已预先建立好连接的Recognition对象。从池中获取对象可消除连接建立的延迟,显著降低首包延迟。
实现步骤
添加依赖
根据项目构建工具,在依赖配置文件中添加 dashscope-sdk-java 和 commons-pool2。
以Maven和Gradle为例,配置如下:
Maven
打开您的Maven项目的
pom.xml文件。在
<dependencies>标签内添加以下依赖信息。
<dependency> <groupId>com.alibaba</groupId> <artifactId>dashscope-sdk-java</artifactId> <!-- 请将 'the-latest-version' 替换为2.16.9及以上版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java --> <version>the-latest-version</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <!-- 请将 'the-latest-version' 替换为最新版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 --> <version>the-latest-version</version> </dependency>保存
pom.xml文件。使用Maven命令(如
mvn clean install或mvn compile)来更新项目依赖
Gradle
打开您的Gradle项目的
build.gradle文件。在
dependencies块内添加以下依赖信息。dependencies { // 请将 'the-latest-version' 替换为2.16.9及以上版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java implementation group: 'com.alibaba', name: 'dashscope-sdk-java', version: 'the-latest-version' // 请将 'the-latest-version' 替换为最新版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 implementation group: 'org.apache.commons', name: 'commons-pool2', version: 'the-latest-version' }保存
build.gradle文件。在命令行中,切换到您的项目根目录,执行以下Gradle命令来更新项目依赖。
./gradlew build --refresh-dependencies或者,如果您使用的是Windows系统,命令应为:
gradlew build --refresh-dependencies
配置连接池
通过环境变量配置连接池关键参数:
环境变量
描述
DASHSCOPE_CONNECTION_POOL_SIZE
连接池大小。
推荐值:峰值并发数的 2 倍以上。
默认值:32。
DASHSCOPE_MAXIMUM_ASYNC_REQUESTS
最大异步请求数。
推荐值:与
DASHSCOPE_CONNECTION_POOL_SIZE保持一致。默认值:32。
DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST
单主机最大异步请求数。
推荐值:与
DASHSCOPE_CONNECTION_POOL_SIZE保持一致。默认值:32。
配置对象池
通过环境变量配置对象池大小:
环境变量
描述
RECOGNITION_OBJECTPOOL_SIZE
对象池大小。
推荐值:峰值并发数的 1.5 至 2 倍。
默认值:500。
重要对象池的大小(
RECOGNITION_OBJECTPOOL_SIZE)必须小于或等于连接池的大小(DASHSCOPE_CONNECTION_POOL_SIZE)。否则,当对象池请求对象时,若连接池已满,会导致调用线程阻塞,等待可用连接。对象池大小不应超过您账户的 QPS(每秒查询率)限制。
通过如下代码创建对象池:
class RecognitionObjectPool { // 。。。这里省略其它代码,完整示例请参见完整代码 public static GenericObjectPool<Recognition> getInstance() { lock.lock(); if (recognitionGenericObjectPool == null) { // 您可以在这里设置对象池的大小。或在环境变量RECOGNITION_OBJECTPOOL_SIZE中设置。 // 建议设置为服务器最大并发连接数的1.5到2倍。 int objectPoolSize = getObjectivePoolSize(); System.out.println("RECOGNITION_OBJECTPOOL_SIZE: " + objectPoolSize); RecognitionObjectFactory recognitionObjectFactory = new RecognitionObjectFactory(); GenericObjectPoolConfig<Recognition> config = new GenericObjectPoolConfig<>(); config.setMaxTotal(objectPoolSize); config.setMaxIdle(objectPoolSize); config.setMinIdle(objectPoolSize); recognitionGenericObjectPool = new GenericObjectPool<>(recognitionObjectFactory, config); } lock.unlock(); return recognitionGenericObjectPool; } }从对象池中获取Recognition对象
如果当前未归还的对象数量已超过对象池的最大容量,系统会额外创建一个新的
Recognition对象。此类新创建的对象需要重新进行初始化并建立 WebSocket 连接,无法利用对象池的既有连接资源,因此不具备复用效果。
recognizer = RecognitionObjectPool.getInstance().borrowObject();进行语音识别
调用
Recognition对象的call或streamCall方法进行语音识别。归还
Recognition对象语音识别任务结束后,归还Recognition对象,以便后续任务可以复用该对象。
不要归还未完成任务或任务失败的对象。
RecognitionObjectPool.getInstance().returnObject(recognizer);
完整代码
package org.alibaba.bailian.example.examples;
import com.alibaba.dashscope.audio.asr.recognition.Recognition;
import com.alibaba.dashscope.audio.asr.recognition.RecognitionParam;
import com.alibaba.dashscope.audio.asr.recognition.RecognitionResult;
import com.alibaba.dashscope.common.ResultCallback;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.alibaba.dashscope.utils.ApiKey;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
/**
* Before making high-concurrency calls to the ASR service,
* please configure the connection pool size through following environment
* variables.
*
* DASHSCOPE_MAXIMUM_ASYNC_REQUESTS=2000
* DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST=2000
* DASHSCOPE_CONNECTION_POOL_SIZE=2000
*
* The default is 32, and it is recommended to set it to 2 times the maximum
* concurrent connections of a single server.
*/
public class Main {
public static void checkoutEnv(String envName, int defaultSize) {
if (System.getenv(envName) != null) {
System.out.println("[ENV CHECK]: " + envName + " "
+ System.getenv(envName));
} else {
System.out.println("[ENV CHECK]: " + envName
+ " Using Default which is " + defaultSize);
}
}
public static void main(String[] args)
throws NoApiKeyException, InterruptedException {
// Check for connection pool env
checkoutEnv("DASHSCOPE_CONNECTION_POOL_SIZE", 32);
checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS", 32);
checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST", 32);
checkoutEnv(RecognitionObjectPool.RECOGNITION_OBJECTPOOL_SIZE_ENV, RecognitionObjectPool.DEFAULT_OBJECT_POOL_SIZE);
int threadNums = 3;
String currentDir = System.getProperty("user.dir");
// Please replace the path with your audio source
Path[] filePaths = {
Paths.get(currentDir, "asr_example.wav"),
Paths.get(currentDir, "asr_example.wav"),
Paths.get(currentDir, "asr_example.wav"),
};
// Use ThreadPool to run recognition tasks
ExecutorService executorService = Executors.newFixedThreadPool(threadNums);
for (int i = 0; i < threadNums; i++) {
executorService.submit(new RealtimeRecognizeTask(filePaths));
}
executorService.shutdown();
// wait for all tasks to complete
executorService.awaitTermination(10, TimeUnit.MINUTES);
System.exit(0);
}
}
class RecognitionObjectFactory extends BasePooledObjectFactory<Recognition> {
public RecognitionObjectFactory() {
super();
}
@Override
public Recognition create() throws Exception {
return new Recognition();
}
@Override
public PooledObject<Recognition> wrap(Recognition obj) {
return new DefaultPooledObject<>(obj);
}
}
class RecognitionObjectPool {
public static GenericObjectPool<Recognition> recognitionGenericObjectPool;
public static String RECOGNITION_OBJECTPOOL_SIZE_ENV =
"RECOGNITION_OBJECTPOOL_SIZE";
public static int DEFAULT_OBJECT_POOL_SIZE = 500;
private static Lock lock = new java.util.concurrent.locks.ReentrantLock();
public static int getObjectivePoolSize() {
try {
Integer n = Integer.parseInt(System.getenv(RECOGNITION_OBJECTPOOL_SIZE_ENV));
return n;
} catch (NumberFormatException e) {
return DEFAULT_OBJECT_POOL_SIZE;
}
}
public static GenericObjectPool<Recognition> getInstance() {
lock.lock();
if (recognitionGenericObjectPool == null) {
// You can set the object pool size here. or in environment variable
// RECOGNITION_OBJECTPOOL_SIZE It is recommended to set it to 1.5 to 2
// times your server's maximum concurrent connections.
int objectPoolSize = getObjectivePoolSize();
System.out.println("RECOGNITION_OBJECTPOOL_SIZE: "
+ objectPoolSize);
RecognitionObjectFactory recognitionObjectFactory =
new RecognitionObjectFactory();
GenericObjectPoolConfig<Recognition> config =
new GenericObjectPoolConfig<>();
config.setMaxTotal(objectPoolSize);
config.setMaxIdle(objectPoolSize);
config.setMinIdle(objectPoolSize);
recognitionGenericObjectPool =
new GenericObjectPool<>(recognitionObjectFactory, config);
}
lock.unlock();
return recognitionGenericObjectPool;
}
}
class RealtimeRecognizeTask implements Runnable {
private static final Object lock = new Object();
private Path[] filePaths;
public RealtimeRecognizeTask(Path[] filePaths) {
this.filePaths = filePaths;
}
/**
* Set your DashScope API key. In
* fact, if you have set DASHSCOPE_API_KEY in your environment variable, you
* can ignore this, and the SDK will automatically get the api_key from the
* environment variable
* */
private static String getDashScopeApiKey() throws NoApiKeyException {
String dashScopeApiKey = null;
try {
ApiKey apiKey = new ApiKey();
dashScopeApiKey =
ApiKey.getApiKey(null); // Retrieve from environment variable.
} catch (NoApiKeyException e) {
System.out.println("No API key found in environment.");
}
if (dashScopeApiKey == null) {
// If you cannot set api_key in your environment variable,
// you can set it here by code
dashScopeApiKey = "your-dashscope-apikey";
}
return dashScopeApiKey;
}
public void runCallback() {
for (Path filePath : filePaths) {
// Create recognition params
// you can customize the recognition parameters, like model, format,
// sample_rate
RecognitionParam param = null;
try {
param =
RecognitionParam.builder()
.model("paraformer-realtime-v2")
.format(
"pcm") // 'pcm'、'wav'、'opus'、'speex'、'aac'、'amr', you
// can check the supported formats in the document
.sampleRate(16000) // supported 8000、16000
.apiKey(getDashScopeApiKey()) // use getDashScopeApiKey to get
// api key.
.build();
} catch (Exception e) {
throw new RuntimeException(e);
}
Recognition recognizer = null;
// if recv onError
final boolean[] hasError = {false};
try {
recognizer = RecognitionObjectPool.getInstance().borrowObject();
String threadName = Thread.currentThread().getName();
ResultCallback<RecognitionResult> callback =
new ResultCallback<RecognitionResult>() {
@Override
public void onEvent(RecognitionResult message) {
synchronized (lock) {
if (message.isSentenceEnd()) {
System.out.println("[process " + threadName
+ "] Fix:" + message.getSentence().getText());
} else {
System.out.println("[process " + threadName
+ "] Result: " + message.getSentence().getText());
}
}
}
@Override
public void onComplete() {
System.out.println("[" + threadName + "] Recognition complete");
}
@Override
public void onError(Exception e) {
System.out.println("[" + threadName
+ "] RecognitionCallback error: " + e.getMessage());
hasError[0] = true;
}
};
// Please replace the path with your audio file path
System.out.println(
"[" + threadName + "] Input file_path is: " + filePath);
FileInputStream fis = null;
// Read file and send audio by chunks
try {
fis = new FileInputStream(filePath.toFile());
} catch (Exception e) {
System.out.println("Error when loading file: " + filePath);
e.printStackTrace();
}
// set param & callback
recognizer.call(param, callback);
// chunk size set to 100 ms for 16KHz sample rate
byte[] buffer = new byte[3200];
int bytesRead;
// Loop to read chunks of the file
while ((bytesRead = fis.read(buffer)) != -1) {
ByteBuffer byteBuffer;
if (bytesRead < buffer.length) {
byteBuffer = ByteBuffer.wrap(buffer, 0, bytesRead);
} else {
byteBuffer = ByteBuffer.wrap(buffer);
}
// Send the ByteBuffer to the recognition instance
recognizer.sendAudioFrame(byteBuffer);
Thread.sleep(100);
buffer = new byte[3200];
}
System.out.println(
"[" + threadName + "] send audio done");
recognizer.stop();
System.out.println(
"[" + threadName + "] asr task finished");
} catch (Exception e) {
e.printStackTrace();
hasError[0] = true;
}
if (recognizer != null) {
try {
if (hasError[0] == true) {
// Invalid the recognition object error.
recognizer.getDuplexApi().close(1000, "bye");
RecognitionObjectPool.getInstance().invalidateObject(recognizer);
} else {
// Return the recognition object to the pool if no error or exception.
RecognitionObjectPool.getInstance().returnObject(recognizer);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
@Override
public void run() {
runCallback();
}
}推荐配置
以下配置基于在指定规格的阿里云服务器上仅运行 Paraformer 实时语音识别服务的测试结果。过高的并发数可能导致任务处理延迟。
其中单机并发数指的是同一时刻正在运行的Paraformer实时语音识别任务数,也可以理解为工作线程数。
机器配置(阿里云) | 单机最大并发数 | 对象池大小 | 连接池大小 |
4核8GiB | 100 | 500 | 2000 |
8核16GiB | 200 | 500 | 2000 |
16核32GiB | 400 | 500 | 2000 |
资源管理与异常处理
任务成功:当语音识别任务正常完成时,必须调用GenericObjectPool的returnObject方法将Recognition对象归还到池中,以便复用。
在当前代码中,对应
RecognitionObjectPool.getInstance().returnObject(recognizer)重要不要归还未完成任务或任务失败的Recognition对象。
任务失败:当 SDK 内部或业务逻辑抛出异常导致任务中断时,必须执行以下两个操作:
主动关闭底层的 WebSocket 连接
从对象池中废弃该对象,防止被再次使用
// 在当前代码中对应如下内容 // 关闭连接 recognizer.getDuplexApi().close(1000, "bye"); // 在对象池中废弃出现异常的recognizer RecognitionObjectPool.getInstance().invalidateObject(recognizer);在服务出现TaskFailed报错时,不需要额外处理。
调用预热与耗时统计说明
在对 DashScope Java SDK 进行并发调用延迟等性能评估时,建议在正式测试前执行充分的预热操作。预热能够确保测量结果准确反映服务在稳定状态下的真实性能,避免因初始连接耗时导致的数据偏差。
连接复用机制
DashScope Java SDK 通过全局单例的连接池高效管理和复用 WebSocket 连接,旨在减少频繁建连和断连的开销,提升高并发场景下的处理能力。
该机制的工作特点如下:
按需创建:SDK 不会在服务启动时预创建 WebSocket 连接,而是在首次调用时按需建立。
限时复用:请求完成后,连接将在池中保留最多 60 秒以备复用。
若 60 秒内有新请求,将复用现有连接,避免重复握手开销。
若连接空闲超过 60 秒,将被自动关闭以释放资源。
预热的重要性
在以下场景中,连接池中可能没有可复用的活跃连接,导致请求需要新建连接:
应用刚启动,尚未发起任何调用。
服务空闲时间超过 60 秒,池中连接已因超时而关闭。
在这些场景下,首次或初期请求会触发完整的 WebSocket 建连过程(包括 TCP 握手、TLS 加密协商和协议升级),其端到端延迟会显著高于后续复用连接的请求。这部分额外耗时源于网络连接初始化,并非服务本身的处理延迟。因此,若未进行预热,性能测试结果会因包含初始建连时间而产生偏差。
推荐做法
为获取可靠的性能数据,在正式进行性能压测或延迟统计前,请遵循以下预热步骤:
模拟正式测试的并发级别,提前发起一定数量的调用(例如,持续 1-2 分钟),以充分填充连接池。
确认连接池已建立并维持足够的活跃连接后,再开始正式的性能数据采集。
通过合理的预热,可使 SDK 连接池进入稳定复用状态,从而测量出更具代表性的延迟指标,真实反映服务在线上平稳运行时的性能。