This topic describes how to call the Paraformer real-time speech recognition service efficiently in high-concurrency scenarios with the DashScope Java SDK.
Paraformer real-time speech recognition is implemented over the WebSocket protocol. Under high concurrency, WebSocket connections may be created over and over, which incurs significant resource overhead.
When using the DashScope Java SDK, you can reduce this runtime overhead by setting reasonable sizes for the connection pool and the object pool based on your server's actual specifications.
Prerequisites
Recommended configuration
Larger connection pools and object pools are not always better: sizes that are too small or too large can both slow the program down. Configure them according to your server's actual specifications.
The following recommended configurations, measured on servers running only the Paraformer real-time speech recognition service, are provided for reference:
| Common machine type (Alibaba Cloud) | Max concurrency per machine | Object pool size | Connection pool size |
| --- | --- | --- | --- |
| 4 cores, 8 GiB | 100 | 500 | 2000 |
| 8 cores, 16 GiB | 200 | 500 | 2000 |
| 16 cores, 32 GiB | 400 | 500 | 2000 |
Here, max concurrency per machine means the number of Paraformer real-time speech recognition tasks running at the same moment, which you can also think of as the number of worker threads.
Configurable parameters
Connection pool
The DashScope Java SDK uses the connection pool provided by OkHttp3 to reuse WebSocket connections, reducing the time and resource cost of repeatedly creating them.
The connection pool is an optimization that the SDK enables by default; you only need to size it for your use case.
Before starting your Java service, configure the connection pool parameters on the machine via environment variables as needed. The parameters are:
| Environment variable | Description |
| --- | --- |
| DASHSCOPE_CONNECTION_POOL_SIZE | Connection pool size. Recommended: at least 2 times your peak concurrency. Default: 32. |
| DASHSCOPE_MAXIMUM_ASYNC_REQUESTS | Maximum number of asynchronous requests. Recommended: same as the connection pool size. Default: 32. For more information, see the reference documentation. |
| DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST | Maximum number of asynchronous requests per host. Recommended: same as the connection pool size. Default: 32. For more information, see the reference documentation. |
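To catch misconfiguration early, you can read these variables at startup and print the effective values before creating any tasks. Below is a minimal, self-contained sketch; the variable names match the table above, while the `readEnvInt` fallback helper is our own illustration, not part of the SDK:

```java
public class PoolEnvCheck {
    // Read an integer environment variable, falling back to the SDK default
    // when the variable is unset or not a valid integer
    static int readEnvInt(String name, int defaultValue) {
        String value = System.getenv(name);
        if (value == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    public static void main(String[] args) {
        // The SDK default for each of these is 32
        String[] vars = {
            "DASHSCOPE_CONNECTION_POOL_SIZE",
            "DASHSCOPE_MAXIMUM_ASYNC_REQUESTS",
            "DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST",
        };
        for (String name : vars) {
            System.out.println(name + " = " + readEnvInt(name, 32));
        }
    }
}
```

Printing the effective values makes it obvious when a deployment forgot to export the variables and is silently running with the default of 32.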
Object pool
We recommend reusing Recognition objects through an object pool, which further reduces the memory and time overhead of repeatedly creating and destroying objects.
Before starting your Java service, configure the object pool size on the machine via an environment variable or in code as needed. The parameter is:

| Environment variable | Description |
| --- | --- |
| RECOGNITION_OBJECTPOOL_SIZE | Object pool size. Recommended: 1.5 to 2 times your peak concurrency. The object pool size must be less than or equal to the connection pool size; otherwise objects will wait for connections and calls will block. |
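The sample code later in this topic uses Apache Commons Pool, but the borrow/return pattern itself is simple. A minimal generic sketch using only the standard library (the `SimplePool` class is illustrative, not part of the SDK) shows why the pool size caps concurrency:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

// A minimal fixed-size object pool: borrow() blocks when the pool is
// exhausted, which is exactly why the pool must be sized at or above
// your peak concurrency.
class SimplePool<T> {
    private final BlockingQueue<T> idle;

    SimplePool(int size, Supplier<T> factory) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get()); // pre-create all objects up front
        }
    }

    T borrow() throws InterruptedException {
        return idle.take(); // blocks until an object is available
    }

    void giveBack(T obj) {
        idle.offer(obj); // put the object back for reuse
    }
}
```

With `Recognition` as `T`, a caller would `borrow()` before starting a task and `giveBack(...)` in a `finally` block; the full sample below does the same with `GenericObjectPool.borrowObject()` and `returnObject()`.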
For how to configure environment variables, see Configure the API key as an environment variable.
Sample code
The following sample code uses the resource pools. The object pool is a global singleton.
By default, each primary account can submit up to 20 Paraformer real-time speech recognition tasks per second.
To request a higher QPS, contact us.
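If your workload can burst above this quota, you can throttle task submission on the client side. The following is a minimal sketch of a fixed-window limiter using only the standard library; the `SubmitLimiter` class is illustrative, not part of the SDK, and the 20/sec figure is the default quota noted above:

```java
// A simple fixed-window rate limiter: allows at most `limit` submissions
// per one-second window, blocking until the next window otherwise.
class SubmitLimiter {
    private final int limit;
    private long windowStartMillis = System.currentTimeMillis();
    private int usedInWindow = 0;

    SubmitLimiter(int limit) {
        this.limit = limit;
    }

    synchronized void acquire() throws InterruptedException {
        long now = System.currentTimeMillis();
        if (now - windowStartMillis >= 1000) {
            // A new one-second window has started: reset the counter
            windowStartMillis = now;
            usedInWindow = 0;
        }
        if (usedInWindow >= limit) {
            // Quota exhausted: sleep until the window ends, then reset
            Thread.sleep(windowStartMillis + 1000 - now);
            windowStartMillis = System.currentTimeMillis();
            usedInWindow = 0;
        }
        usedInWindow++;
    }
}
```

A worker would construct one shared `new SubmitLimiter(20)` and call `limiter.acquire()` before each task submission.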
Before running the sample, download the asr_example.wav sample audio, or replace it with a local audio file.
```java
import com.alibaba.dashscope.audio.asr.recognition.Recognition;
import com.alibaba.dashscope.audio.asr.recognition.RecognitionParam;
import com.alibaba.dashscope.audio.asr.recognition.RecognitionResult;
import com.alibaba.dashscope.common.ResultCallback;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.alibaba.dashscope.utils.ApiKey;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Before making high-concurrency calls to the ASR service,
 * configure the connection pool size through the following environment
 * variables:
 *
 * DASHSCOPE_MAXIMUM_ASYNC_REQUESTS=2000
 * DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST=2000
 * DASHSCOPE_CONNECTION_POOL_SIZE=2000
 *
 * The default for each is 32; it is recommended to set them to at least
 * 2 times the maximum concurrency of a single server.
 */
public class RecognitionWithRealtimeApiConcurrently {
    public static void checkoutEnv(String envName, int defaultSize) {
        if (System.getenv(envName) != null) {
            System.out.println("[ENV CHECK]: " + envName + " "
                    + System.getenv(envName));
        } else {
            System.out.println("[ENV CHECK]: " + envName
                    + " Using Default which is " + defaultSize);
        }
    }

    public static void main(String[] args)
            throws NoApiKeyException, InterruptedException {
        // Check the connection pool environment variables
        checkoutEnv("DASHSCOPE_CONNECTION_POOL_SIZE", 32);
        checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS", 32);
        checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST", 32);
        checkoutEnv(RecognitionObjectPool.RECOGNITION_OBJECTPOOL_SIZE_ENV,
                RecognitionObjectPool.DEFAULT_OBJECT_POOL_SIZE);
        int threadNums = 3;
        String currentDir = System.getProperty("user.dir");
        // Replace the paths with your audio sources
        Path[] filePaths = {
            Paths.get(currentDir, "asr_example.wav"),
            Paths.get(currentDir, "asr_example.wav"),
            Paths.get(currentDir, "asr_example.wav"),
        };
        // Use a thread pool to run recognition tasks concurrently
        ExecutorService executorService = Executors.newFixedThreadPool(threadNums);
        for (int i = 0; i < threadNums; i++) {
            executorService.submit(new RealtimeRecognizeTask(filePaths));
        }
        executorService.shutdown();
        // Wait for all tasks to complete
        executorService.awaitTermination(1, TimeUnit.MINUTES);
        System.exit(0);
    }
}

class RecognitionObjectFactory extends BasePooledObjectFactory<Recognition> {
    @Override
    public Recognition create() {
        return new Recognition();
    }

    @Override
    public PooledObject<Recognition> wrap(Recognition obj) {
        return new DefaultPooledObject<>(obj);
    }
}

class RecognitionObjectPool {
    public static GenericObjectPool<Recognition> recognitionGenericObjectPool;
    public static String RECOGNITION_OBJECTPOOL_SIZE_ENV =
            "RECOGNITION_OBJECTPOOL_SIZE";
    public static int DEFAULT_OBJECT_POOL_SIZE = 500;
    private static final Lock lock = new ReentrantLock();

    public static int getObjectPoolSize() {
        try {
            return Integer.parseInt(System.getenv(RECOGNITION_OBJECTPOOL_SIZE_ENV));
        } catch (NumberFormatException e) {
            return DEFAULT_OBJECT_POOL_SIZE;
        }
    }

    public static GenericObjectPool<Recognition> getInstance() {
        lock.lock();
        try {
            if (recognitionGenericObjectPool == null) {
                // You can set the object pool size here or via the
                // RECOGNITION_OBJECTPOOL_SIZE environment variable. It is
                // recommended to set it to 1.5 to 2 times your server's
                // maximum concurrency.
                int objectPoolSize = getObjectPoolSize();
                System.out.println("RECOGNITION_OBJECTPOOL_SIZE: " + objectPoolSize);
                RecognitionObjectFactory recognitionObjectFactory =
                        new RecognitionObjectFactory();
                GenericObjectPoolConfig<Recognition> config =
                        new GenericObjectPoolConfig<>();
                config.setMaxTotal(objectPoolSize);
                config.setMaxIdle(objectPoolSize);
                config.setMinIdle(objectPoolSize);
                recognitionGenericObjectPool =
                        new GenericObjectPool<>(recognitionObjectFactory, config);
            }
        } finally {
            // Release the lock even if pool creation throws
            lock.unlock();
        }
        return recognitionGenericObjectPool;
    }
}

class RealtimeRecognizeTask implements Runnable {
    private static final Object lock = new Object();
    private final Path[] filePaths;

    public RealtimeRecognizeTask(Path[] filePaths) {
        this.filePaths = filePaths;
    }

    /**
     * Set your DashScope API key. More information:
     * https://help.aliyun.com/document_detail/2712195.html
     * If you have set DASHSCOPE_API_KEY in your environment, you can skip
     * this; the SDK reads the API key from the environment variable
     * automatically.
     */
    private static String getDashScopeApiKey() throws NoApiKeyException {
        String dashScopeApiKey = null;
        try {
            ApiKey apiKey = new ApiKey();
            dashScopeApiKey = apiKey.getApiKey(null); // Retrieve from environment variable
        } catch (NoApiKeyException e) {
            System.out.println("No API key found in environment.");
        }
        if (dashScopeApiKey == null) {
            // If you cannot set the API key in an environment variable,
            // you can set it here in code
            dashScopeApiKey = "your-dashscope-apikey";
        }
        return dashScopeApiKey;
    }

    public void runCallback() {
        for (Path filePath : filePaths) {
            // Create recognition params. You can customize model, format,
            // sample_rate and more; see
            // https://help.aliyun.com/document_detail/2712536.html
            RecognitionParam param;
            try {
                param = RecognitionParam.builder()
                        .model("paraformer-realtime-v2")
                        // 'pcm', 'wav', 'opus', 'speex', 'aac', 'amr'; check
                        // the supported formats in the documentation
                        .format("pcm")
                        .sampleRate(16000) // 8000 and 16000 are supported
                        .apiKey(getDashScopeApiKey())
                        .build();
            } catch (NoApiKeyException e) {
                throw new RuntimeException(e);
            }
            Recognition recognizer = null;
            try {
                // Borrow a Recognition object from the pool
                recognizer = RecognitionObjectPool.getInstance().borrowObject();
                CountDownLatch latch = new CountDownLatch(1);
                String threadName = Thread.currentThread().getName();
                ResultCallback<RecognitionResult> callback =
                        new ResultCallback<RecognitionResult>() {
                            @Override
                            public void onEvent(RecognitionResult message) {
                                synchronized (lock) {
                                    if (message.isSentenceEnd()) {
                                        System.out.println("[process " + threadName
                                                + "] Fix: " + message.getSentence().getText());
                                    } else {
                                        System.out.println("[process " + threadName
                                                + "] Result: " + message.getSentence().getText());
                                    }
                                }
                            }

                            @Override
                            public void onComplete() {
                                System.out.println("[" + threadName + "] Recognition complete");
                                latch.countDown();
                            }

                            @Override
                            public void onError(Exception e) {
                                System.out.println("[" + threadName
                                        + "] RecognitionCallback error: " + e.getMessage());
                            }
                        };
                // Set params and callback
                recognizer.call(param, callback);
                // Replace the path with your audio file path
                System.out.println("[" + threadName + "] Input file_path is: " + filePath);
                // Read the file and send audio in chunks
                try (FileInputStream fis = new FileInputStream(filePath.toFile())) {
                    // 3200 bytes = 100 ms of 16 kHz, 16-bit, mono PCM
                    byte[] buffer = new byte[3200];
                    int bytesRead;
                    // Loop to read chunks of the file
                    while ((bytesRead = fis.read(buffer)) != -1) {
                        ByteBuffer byteBuffer;
                        if (bytesRead < buffer.length) {
                            byteBuffer = ByteBuffer.wrap(buffer, 0, bytesRead);
                        } else {
                            byteBuffer = ByteBuffer.wrap(buffer);
                        }
                        // Send the ByteBuffer to the recognition instance
                        recognizer.sendAudioFrame(byteBuffer);
                        Thread.sleep(100);
                        buffer = new byte[3200];
                    }
                    System.out.println(LocalDateTime.now());
                } catch (Exception e) {
                    e.printStackTrace();
                    // Close the WebSocket connection if sending fails mid-task
                    recognizer.getDuplexApi().close(1000, "bye");
                }
                recognizer.stop();
                // Wait for the recognition to complete
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            } catch (Exception e) {
                e.printStackTrace();
                if (recognizer != null) {
                    // Close the WebSocket connection if the task did not finish
                    recognizer.getDuplexApi().close(1000, "bye");
                }
            } finally {
                if (recognizer != null) {
                    try {
                        // Return the Recognition object to the pool
                        RecognitionObjectPool.getInstance().returnObject(recognizer);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    @Override
    public void run() {
        runCallback();
    }
}
```
Error handling
When the service returns a TaskFailed error, no extra handling is needed.
When a client-side error (an SDK internal exception or a business logic exception) interrupts a recognition task before it completes, you must close the connection yourself.
Close the connection as follows:
```java
// Place this code inside a try-catch block
recognizer.getDuplexApi().close(1000, "bye");
```
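The shape of the error path is: catch the exception, close the connection, and always return the object to the pool. The following is a runnable sketch of that control flow only; `ErrorPathDemo` and its fields are stand-ins, where `closed = true` represents `recognizer.getDuplexApi().close(1000, "bye")` and the deque represents the `RecognitionObjectPool` from the sample above:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Demonstrates the control flow only: close on error, and return the
// borrowed object to the pool in all cases via finally.
class ErrorPathDemo {
    static Deque<String> pool = new ArrayDeque<>(); // stand-in object pool
    static boolean closed = false;                  // stand-in for close(1000, "bye")

    static void runTask(boolean fail) {
        String recognizer = "recognizer-1"; // "borrowed" from the pool
        try {
            if (fail) {
                throw new RuntimeException("client-side error mid-recognition");
            }
        } catch (RuntimeException e) {
            closed = true; // close the WebSocket connection on failure
        } finally {
            pool.push(recognizer); // always return the object to the pool
        }
    }
}
```

The key point mirrored from the sample: the close call lives in the catch block, while returning the object to the pool lives in finally, so a failed task neither leaks a connection nor starves the pool.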