高并发场景下实时语音识别的性能优化

本文介绍在高并发场景下,如何通过DashScope Java SDK,高效调用Paraformer实时语音识别服务。

Paraformer实时语音识别内部通过WebSocket协议实现,在高并发场景下,WebSocket连接可能会被不断地创建,从而带来较大的资源消耗。

在使用DashScope Java SDK时,您可以根据服务器的实际情况,通过给连接池、对象池设定合理的大小来降低运行开销。

前提条件

已开通服务并获取API Key

推荐配置

连接池和对象池不是越多越好,过少或过多都会导致程序运行变慢。建议您根据自己服务器的实际规格进行配置。

我们在服务器上只运行Paraformer实时语音识别服务的情况下,进行测试后得到了如下推荐配置供您参考:

常见机器配置(阿里云)

单机最大并发数

对象池大小

连接池大小

48GiB

100

500

2000

816GiB

200

500

2000

1632GiB

400

500

2000

其中单机并发数指的是同一时刻正在运行的Paraformer实时语音识别任务数,也可以理解为工作线程数。

可配置参数

连接池

DashScope Java SDK使用了OkHttp3提供的连接池复用WebSocket连接,降低不断创建WebSocket连接的耗时和资源开销。

连接池为SDK默认开启的优化项,您需要根据使用场景配置连接池的大小。

请您在运行Java服务前通过环境变量的方式在机器中提前按需配置好连接池的相关参数。连接池配置参数如下:

DASHSCOPE_CONNECTION_POOL_SIZE

配置连接池大小。

推荐配置为您的峰值并发数的2倍以上。默认值为32。

DASHSCOPE_MAXIMUM_ASYNC_REQUESTS

配置最大异步请求数。

推荐配置为和连接池大小一致。默认值为32。

更多信息参见参考文档

DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST

配置单host最大异步请求数。

推荐配置为和连接池大小一致。默认值为32。

更多信息参见参考文档

对象池

推荐使用对象池的方式复用Recognition对象,可以进一步降低反复创建、销毁对象带来的内存和时间开销。

请您在运行Java服务前通过环境变量或代码的方式在机器中提前按需配置好对象池的大小。对象池配置参数如下:

RECOGNITION_OBJECTPOOL_SIZE

对象池大小。

推荐配置为您的峰值并发数的1.5~2倍。

对象池大小需要小于等于连接池大小,不然会出现对象等待连接的情况造成调用阻塞。

关于如何配置环境变量,可参考配置API Key到环境变量

示例代码

下面为使用资源池的示例代码。其中,对象池为全局单例对象。

  • 每个主账号默认每秒可提交20Paraformer实时语音识别任务。

    如需开通更高QPS联系我们

  • 在运行示例前,请提前下载好asr_example.wav示例音频,或替换为本地音频文件。

    asr_example.wav

  • 您需要在项目中引入DashScopeorg.apache.commons.pool2相关的包,DashScope要求版本号>=2.16.9。

    MavenGradle为例,配置如下:

    Maven

    1. 打开您的Maven项目的pom.xml文件。

    2. <dependencies>标签内添加以下依赖信息。

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>dashscope-sdk-java</artifactId>
        <!-- 请将 'the-latest-version' 替换为2.16.9及以上版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java -->
        <version>the-latest-version</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <!-- 请将 'the-latest-version' 替换为最新版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
        <version>the-latest-version</version>
    </dependency>
    1. 保存pom.xml文件。

    2. 使用Maven命令(如mvn clean installmvn compile)来更新项目依赖

    Gradle

    1. 打开您的Gradle项目的build.gradle文件。

    2. dependencies块内添加以下依赖信息。

      dependencies {
          // 请将 'the-latest-version' 替换为2.16.9及以上版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java
          implementation group: 'com.alibaba', name: 'dashscope-sdk-java', version: 'the-latest-version'
          
          // 请将 'the-latest-version' 替换为最新版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/org.apache.commons/commons-pool2
          implementation group: 'org.apache.commons', name: 'commons-pool2', version: 'the-latest-version'
      }
    3. 保存build.gradle文件。

    4. 在命令行中,切换到您的项目根目录,执行以下Gradle命令来更新项目依赖。

      ./gradlew build --refresh-dependencies

      或者,如果您使用的是Windows系统,命令应为:

      gradlew build --refresh-dependencies
import com.alibaba.dashscope.audio.asr.recognition.Recognition;
import com.alibaba.dashscope.audio.asr.recognition.RecognitionParam;
import com.alibaba.dashscope.audio.asr.recognition.RecognitionResult;
import com.alibaba.dashscope.common.ResultCallback;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.alibaba.dashscope.utils.ApiKey;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

/**
 * Before making high-concurrency calls to the ASR service,
 * please configure the connection pool size through following environment
 * variables.
 *
 * DASHSCOPE_MAXIMUM_ASYNC_REQUESTS=2000
 * DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST=2000
 * DASHSCOPE_CONNECTION_POOL_SIZE=2000
 *
 * The default is 32, and it is recommended to set it to 2 times the maximum
 * concurrent connections of a single server.
 */
public class RecognitionWithRealtimeApiConcurrently {
    public static void checkoutEnv(String envName, int defaultSize) {
        if (System.getenv(envName) != null) {
            System.out.println("[ENV CHECK]: " + envName + " "
                    + System.getenv(envName));
        } else {
            System.out.println("[ENV CHECK]: " + envName
                    + " Using Default which is " + defaultSize);
        }
    }

    public static void main(String[] args)
            throws NoApiKeyException, InterruptedException {
        // Check for connection pool env
        checkoutEnv("DASHSCOPE_CONNECTION_POOL_SIZE", 32);
        checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS", 32);
        checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST", 32);
        checkoutEnv(RecognitionObjectPool.RECOGNITION_OBJECTPOOL_SIZE_ENV, RecognitionObjectPool.DEFAULT_OBJECT_POOL_SIZE);

        int threadNums = 3;
        String currentDir = System.getProperty("user.dir");
        // Please replace the path with your audio source
        Path[] filePaths = {
                Paths.get(currentDir, "asr_example.wav"),
                Paths.get(currentDir, "asr_example.wav"),
                Paths.get(currentDir, "asr_example.wav"),
        };
        // Use ThreadPool to run recognition tasks
        ExecutorService executorService = Executors.newFixedThreadPool(threadNums);
        for (int i = 0; i < threadNums; i++) {
            executorService.submit(new RealtimeRecognizeTask(filePaths));
        }
        executorService.shutdown();
        // wait for all tasks to complete
        executorService.awaitTermination(1, TimeUnit.MINUTES);
        System.exit(0);
    }
}

class RecognitionObjectFactory extends BasePooledObjectFactory<Recognition> {
    public RecognitionObjectFactory() {
        super();
    }

    @Override
    public Recognition create() throws Exception {
        return new Recognition();
    }

    @Override
    public PooledObject<Recognition> wrap(Recognition obj) {
        return new DefaultPooledObject<>(obj);
    }
}

class RecognitionObjectPool {
    public static GenericObjectPool<Recognition> recognitionGenericObjectPool;
    public static String RECOGNITION_OBJECTPOOL_SIZE_ENV =
            "RECOGNITION_OBJECTPOOL_SIZE";
    public static int DEFAULT_OBJECT_POOL_SIZE = 500;
    private static Lock lock = new java.util.concurrent.locks.ReentrantLock();

    public static int getObjectivePoolSize() {
        try {
            Integer n = Integer.parseInt(System.getenv(RECOGNITION_OBJECTPOOL_SIZE_ENV));
            return n;
        } catch (NumberFormatException e) {
            return DEFAULT_OBJECT_POOL_SIZE;
        }
    }

    public static GenericObjectPool<Recognition> getInstance() {
        lock.lock();
        if (recognitionGenericObjectPool == null) {
            // You can set the object pool size here. or in environment variable
            // RECOGNITION_OBJECTPOOL_SIZE It is recommended to set it to 1.5 to 2
            // times your server's maximum concurrent connections.
            int objectPoolSize = getObjectivePoolSize();
            System.out.println("RECOGNITION_OBJECTPOOL_SIZE: "
                    + objectPoolSize);
            RecognitionObjectFactory recognitionObjectFactory =
                    new RecognitionObjectFactory();
            GenericObjectPoolConfig<Recognition> config =
                    new GenericObjectPoolConfig<>();
            config.setMaxTotal(objectPoolSize);
            config.setMaxIdle(objectPoolSize);
            config.setMinIdle(objectPoolSize);
            recognitionGenericObjectPool =
                    new GenericObjectPool<>(recognitionObjectFactory, config);
        }
        lock.unlock();
        return recognitionGenericObjectPool;
    }
}

class RealtimeRecognizeTask implements Runnable {
    private static final Object lock = new Object();
    private Path[] filePaths;

    public RealtimeRecognizeTask(Path[] filePaths) {
        this.filePaths = filePaths;
    }

    /**
     * Set your DashScope API key. More information: <a
     * href="https://help.aliyun.com/document_detail/2712195.html">...</a> In
     * fact, if you have set DASHSCOPE_API_KEY in your environment variable, you
     * can ignore this, and the SDK will automatically get the api_key from the
     * environment variable
     * */
    private static String getDashScopeApiKey() throws NoApiKeyException {
        String dashScopeApiKey = null;
        try {
            ApiKey apiKey = new ApiKey();
            dashScopeApiKey =
                    apiKey.getApiKey(null); // Retrieve from environment variable.
        } catch (NoApiKeyException e) {
            System.out.println("No API key found in environment.");
        }
        if (dashScopeApiKey == null) {
            // If you cannot set api_key in your environment variable,
            // you can set it here by code
            dashScopeApiKey = "your-dashscope-apikey";
        }
        return dashScopeApiKey;
    }

    public void runCallback() {
        for (Path filePath : filePaths) {
            // Create recognition params
            // you can customize the recognition parameters, like model, format,
            // sample_rate for more information, please refer to
            // https://help.aliyun.com/document_detail/2712536.html
            RecognitionParam param = null;
            try {
                param =
                        RecognitionParam.builder()
                                .model("paraformer-realtime-v2")
                                .format(
                                        "pcm") // 'pcm'、'wav'、'opus'、'speex'、'aac'、'amr', you
                                // can check the supported formats in the document
                                .sampleRate(16000) // supported 8000、16000
                                .apiKey(getDashScopeApiKey()) // use getDashScopeApiKey to get
                                // api key.
                                .build();
            } catch (NoApiKeyException e) {
                throw new RuntimeException(e);
            }

            Recognition recognizer = null;

            try {
                recognizer = RecognitionObjectPool.getInstance().borrowObject();
                CountDownLatch latch = new CountDownLatch(1);

                String threadName = Thread.currentThread().getName();

                ResultCallback<RecognitionResult> callback =
                        new ResultCallback<RecognitionResult>() {
                            @Override
                            public void onEvent(RecognitionResult message) {
                                synchronized (lock) {
                                    if (message.isSentenceEnd()) {
                                        System.out.println("[process " + threadName
                                                + "] Fix:" + message.getSentence().getText());
                                    } else {
                                        System.out.println("[process " + threadName
                                                + "] Result: " + message.getSentence().getText());
                                    }
                                }
                            }

                            @Override
                            public void onComplete() {
                                System.out.println("[" + threadName + "] Recognition complete");
                                latch.countDown();
                            }

                            @Override
                            public void onError(Exception e) {
                                System.out.println("[" + threadName
                                        + "] RecognitionCallback error: " + e.getMessage());
                            }
                        };
                // set param & callback
                recognizer.call(param, callback);
                // Please replace the path with your audio file path
                System.out.println(
                        "[" + threadName + "] Input file_path is: " + filePath);
                // Read file and send audio by chunks
                try (FileInputStream fis = new FileInputStream(filePath.toFile())) {
                    // chunk size set to 100 ms for 16KHz sample rate
                    byte[] buffer = new byte[3200];
                    int bytesRead;
                    // Loop to read chunks of the file
                    while ((bytesRead = fis.read(buffer)) != -1) {
                        ByteBuffer byteBuffer;
                        if (bytesRead < buffer.length) {
                            byteBuffer = ByteBuffer.wrap(buffer, 0, bytesRead);
                        } else {
                            byteBuffer = ByteBuffer.wrap(buffer);
                        }
                        // Send the ByteBuffer to the recognition instance
                        recognizer.sendAudioFrame(byteBuffer);
                        Thread.sleep(100);
                        buffer = new byte[3200];
                    }
                    System.out.println(LocalDateTime.now());
                } catch (Exception e) {
                    e.printStackTrace();
                    recognizer.getDuplexApi().close(1000, "bye");
                }

                recognizer.stop();
                // wait for the recognition to complete
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            } catch (Exception e) {
                e.printStackTrace();
                recognizer.getDuplexApi().close(1000, "bye");
            } finally {
                if (recognizer != null) {
                    try {
                        // Return the recognition object to the pool
                        RecognitionObjectPool.getInstance().returnObject(recognizer);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    @Override
    public void run() {
        runCallback();
    }
}

异常处理

  • 在服务出现TaskFailed报错时,不需要额外处理。

  • 在语音识别中途,客户端出现错误(SDK内部异常或业务逻辑异常)导致语音识别任务没有完成时,需要您主动关闭连接。

    关闭连接方法如下:

    // 将下面这段代码放在try-catch块中
    recognizer.getDuplexApi().close(1000, "bye");

常见异常

异常 1、 业务流量平稳,但是服务器 TCP 连接数持续上升

出错原因:

每一个 SDK 对象创建时都会申请一个连接。如果没有使用对象池,每一次任务结束后对象都被析构。此时这一个连接将进入无引用状态,需要等待 61s 秒后服务端报错连接超时才会真正断开,这会导致这个连接在 61 秒内不可复用。

在高并发场景下,新的任务在发现没有可复用连接时会创建新连接,会造成如下后果:

  1. 连接数持续上升。

  2. 由于连接数过多,服务器资源不足,服务器卡顿。

  3. 连接池被打满、新任务由于启动时需要等待可用连接而阻塞。

解决方法

常见的原因是没有使用对象池复用对象。可以通过使用对象池解决。

异常 2、任务耗时比正常调用多 60 秒

同“异常 1”,连接池已经达到最大连接限制,新的任务需要等待无引用状态的连接 61 秒触发超时后才可以获得连接。

异常 3、服务启动时任务慢,之后慢慢恢复正常

出错原因

在高并发调用时,同一个对象会复用同一个WebSocket连接,因此WebSocket连接只会在服务启动时创建。需要注意的是,任务启动阶段如果立刻开始较高并发调用,同时创建过多的WebSocket连接会导致阻塞。

解决方法

启动服务后逐步提升并发量,或增加预热任务。

异常 4、服务端报错 Invalid action('run-task')! Please follow the protocol!

出错原因

这是由于出现了客户端报错后,服务端不知道客户端出错,连接处于任务中状态。此时连接和对象被复用并开启下一个任务,导致流程错误,下一个任务失败。

解决方法

在抛出异常后主动关闭 WebSocket 连接后归还对象池。

异常 5、业务流量平稳,调用量出现异常尖刺

出错原因

同时创建过多 WebSocket 连接导致阻塞,但业务流量持续打进来,导致任务短时间积压,并且在阻塞后所有积压任务立刻调用。这会造成调用量尖刺,并且有可能造成瞬时超过账号的并发数限制导致部分任务失败、服务器卡顿等。

这种瞬间创建过多 WebSocket 的情况多发生于:

  • 服务启动阶段

  • 网络出现异常,大量 WebSocket 连接同时中断重连

  • 某一时刻出现大量服务端报错,导致大量 WebSocket 重连。常见报错如并发数超过账号限制(“Requests rate limit exceeded, please try again later.”)。

解决方法

  1. 检查网络情况。

  2. 排查尖刺前是否出现大量其他服务端报错。

  3. 提高账号并发限制。

  4. 调小对象池和连接池大小,通过对象池上限限制最大并发数。

  5. 提升服务器配置或扩充机器数。

异常 6、随着并发数提升,所有任务都变慢

解决方法

  1. 检查是否已经达到网络带宽上限。

  2. 检查实际并发数是否已经过高。