高并发场景

本文介绍在高并发场景下,如何通过DashScope Java SDK,高效调用CosyVoice语音合成大模型服务。

CosyVoice大模型的服务协议为WebSocket,在高并发场景下,如果不断地创建WebSocket连接会增加连接耗时且会有较大资源消耗。您在通过DashScope Java SDK调用CosyVoice语音大模型服务时,可以使用连接池对象池这两种资源池,降低高并发场景下的程序开销。

您在了解了资源池后,可参照文末的推荐配置,结合示例代码在您自己的服务器上进行试验。

说明

SDK 2.16.6及后续版本针对高并发场景进行了优化,SDK 2.16.6之前的版本不推荐在高并发场景下使用。

连接池

DashScope Java SDK使用了OkHttp3提供的连接池复用WebSocket连接,降低不断创建WebSocket连接的耗时和资源开销。

连接池为SDK默认开启的优化项,您需要根据使用场景配置连接池的大小。

请您在运行Java服务前通过环境变量的方式在机器中提前按需配置好连接池的相关参数。连接池配置参数如下:

DASHSCOPE_CONNECTION_POOL_SIZE

配置连接池大小。

推荐配置为您的峰值并发数的2倍以上。默认值为32。

说明

对象池大小需要小于等于连接池大小,不然会出现对象等待连接的情况造成调用阻塞。

DASHSCOPE_MAXIMUM_ASYNC_REQUESTS

配置最大异步请求数。

推荐配置为和连接池大小一致。默认值为32。

更多信息参见参考文档

DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST

配置单host最大异步请求数。

推荐配置为和连接池大小一致。默认值为32。

更多信息参见参考文档

对象池

推荐使用对象池的方式复用SpeechSynthesizer对象,可以进一步降低反复创建、销毁对象带来的内存和时间开销。

请您在运行Java服务前通过环境变量的方式在机器中提前按需配置好对象池的大小。对象池配置参数如下:

COSYVOICE_OBJECTPOOL_SIZE

对象池大小。

推荐配置为您的峰值并发数的1.5~2倍。

示例代码

下面为使用资源池的示例代码。其中,对象池为全局单例对象。

  • 每个主账号默认每秒可提交3CosyVoice实时语音合成任务。

    如需开通更高QPS联系我们

  • 您需要在项目中引入DashScopeorg.apache.commons.pool2相关的包。

    MavenGradle为例,配置如下:

    Maven

    1. 打开您的Maven项目的pom.xml文件。

    2. <dependencies>标签内添加以下依赖信息。

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>dashscope-sdk-java</artifactId>
        <!-- 请将 'the-latest-version' 替换为2.16.9及以上版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java -->
        <version>the-latest-version</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <!-- 请将 'the-latest-version' 替换为最新版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
        <version>the-latest-version</version>
    </dependency>
    1. 保存pom.xml文件。

    2. 使用Maven命令(如mvn clean installmvn compile)来更新项目依赖

    Gradle

    1. 打开您的Gradle项目的build.gradle文件。

    2. dependencies块内添加以下依赖信息。

      dependencies {
          // 请将 'the-latest-version' 替换为2.16.9及以上版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java
          implementation group: 'com.alibaba', name: 'dashscope-sdk-java', version: 'the-latest-version'
          
          // 请将 'the-latest-version' 替换为最新版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/org.apache.commons/commons-pool2
          implementation group: 'org.apache.commons', name: 'commons-pool2', version: 'the-latest-version'
      }
    3. 保存build.gradle文件。

    4. 在命令行中,切换到您的项目根目录,执行以下Gradle命令来更新项目依赖。

      ./gradlew build --refresh-dependencies

      或者,如果您使用的是Windows系统,命令应为:

      gradlew build --refresh-dependencies
package org.example.ttsv2.sampleCodes;

import com.alibaba.dashscope.audio.tts.SpeechSynthesisResult;
import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesisAudioFormat;
import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesisParam;
import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesizer;
import com.alibaba.dashscope.common.ResultCallback;
import com.alibaba.dashscope.exception.NoApiKeyException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

/**
 * 您需要在项目中引入org.apache.commons.pool2和DashScope相关的包。
 *
 * DashScope SDK 2.16.6及后续版本针对高并发场景进行了优化,
 * DashScope SDK 2.16.6之前的版本不推荐在高并发场景下使用。
 *
 *
 * 在对TTS服务进行高并发调用之前,
 * 请通过以下环境变量配置连接池的相关参数。
 *
 * DASHSCOPE_MAXIMUM_ASYNC_REQUESTS
 * DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST
 * DASHSCOPE_CONNECTION_POOL_SIZE
 *
 */

class SpeechSynthesizerObjectFactory
        extends BasePooledObjectFactory<SpeechSynthesizer> {
    public SpeechSynthesizerObjectFactory() {
        super();
    }
    @Override
    public SpeechSynthesizer create() throws Exception {
        return new SpeechSynthesizer();
    }

    @Override
    public PooledObject<SpeechSynthesizer> wrap(SpeechSynthesizer obj) {
        return new DefaultPooledObject<>(obj);
    }
}

class CosyvoiceObjectPool {
    public static GenericObjectPool<SpeechSynthesizer> synthesizerPool;
    public static String COSYVOICE_OBJECTPOOL_SIZE_ENV = "COSYVOICE_OBJECTPOOL_SIZE";
    public static int DEFAULT_OBJECT_POOL_SIZE = 500;
    private static Lock lock = new java.util.concurrent.locks.ReentrantLock();
    public static int getObjectivePoolSize() {
        try {
            Integer n = Integer.parseInt(System.getenv(COSYVOICE_OBJECTPOOL_SIZE_ENV));
            System.out.println("Using Object Pool Size In Env: "+ DEFAULT_OBJECT_POOL_SIZE);
            return n;
        } catch (NumberFormatException e) {
            System.out.println("Using Default Object Pool Size: "+ DEFAULT_OBJECT_POOL_SIZE);
            return DEFAULT_OBJECT_POOL_SIZE;
        }
    }
    public static GenericObjectPool<SpeechSynthesizer> getInstance() {
        lock.lock();
        if (synthesizerPool == null) {
            // 您可以在这里设置对象池的大小。或在环境变量COSYVOICE_OBJECTPOOL_SIZE中设置。
            // 建议设置为服务器最大并发连接数的1.5到2倍。
            int objectPoolSize = getObjectivePoolSize();
            SpeechSynthesizerObjectFactory speechSynthesizerObjectFactory =
                    new SpeechSynthesizerObjectFactory();
            GenericObjectPoolConfig<SpeechSynthesizer> config =
                    new GenericObjectPoolConfig<>();
            config.setMaxTotal(objectPoolSize);
            config.setMaxIdle(objectPoolSize);
            config.setMinIdle(objectPoolSize);
            synthesizerPool =
                    new GenericObjectPool<>(speechSynthesizerObjectFactory, config);
        }
        lock.unlock();
        return synthesizerPool;
    }
}

class SynthesizeTaskWithCallback implements Runnable {
    String[] textArray;
    String requestId;
    long timeCost;
    public SynthesizeTaskWithCallback(String[] textArray) {
        this.textArray = textArray;
    }
    @Override
    public void run() {
        SpeechSynthesizer synthesizer = null;
        long startTime = System.currentTimeMillis();

        try {
            class ReactCallback extends ResultCallback<SpeechSynthesisResult> {
                ReactCallback() {}

                @Override
                public void onEvent(SpeechSynthesisResult message) {
                    if (message.getAudioFrame() != null) {
                        try {
                            byte[] bytesArray = message.getAudioFrame().array();
                            System.out.println("收到音频,音频文件流length为:" + bytesArray.length);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                }

                @Override
                public void onComplete() {}

                @Override
                public void onError(Exception e) {
                    System.out.println(e.getMessage());
                    e.printStackTrace();
                }
            }

            // 将your-dashscope-api-key替换成您自己的API-KEY
            String dashScopeApiKey = "your-dashscope-api-key";

            SpeechSynthesisParam param =
                    SpeechSynthesisParam.builder()
                            .model("cosyvoice-v1")
                            .voice("longxiaochun")
                            .format(SpeechSynthesisAudioFormat
                                    .MP3_22050HZ_MONO_256KBPS) // 流式合成使用PCM或者MP3
                            .apiKey(dashScopeApiKey)
                            .build();

            try {
                synthesizer = CosyvoiceObjectPool.getInstance().borrowObject();
                synthesizer.updateParamAndCallback(param, new ReactCallback());
                for (String text : textArray) {
                    synthesizer.streamingCall(text);
                }
                Thread.sleep(20);
                synthesizer.streamingComplete(60000);
                requestId = synthesizer.getLastRequestId();
            } catch (Exception e) {
                System.out.println("Exception e: " + e.toString());
                synthesizer.getDuplexApi().close(1000, "bye");
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if (synthesizer != null) {
                try {
                    // Return the SpeechSynthesizer object to the pool
                    CosyvoiceObjectPool.getInstance().returnObject(synthesizer);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        long endTime = System.currentTimeMillis();
        timeCost = endTime - startTime;
        System.out.println("[线程 " + Thread.currentThread() + "] 语音合成任务结束。耗时 " + timeCost + " ms, RequestId " + requestId);
    }
}

@Slf4j
public class SynthesizeTextToSpeechWithCallbackConcurrently {
    public static void checkoutEnv(String envName, int defaultSize) {
        if (System.getenv(envName) != null) {
            System.out.println("[ENV CHECK]: " + envName + " "
                    + System.getenv(envName));
        } else {
            System.out.println("[ENV CHECK]: " + envName
                    + " Using Default which is " + defaultSize);
        }
    }

    public static void main(String[] args)
            throws InterruptedException, NoApiKeyException {
        // Check for connection pool env
        checkoutEnv("DASHSCOPE_CONNECTION_POOL_SIZE", 32);
        checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS", 32);
        checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST", 32);
        checkoutEnv(CosyvoiceObjectPool.COSYVOICE_OBJECTPOOL_SIZE_ENV, CosyvoiceObjectPool.DEFAULT_OBJECT_POOL_SIZE);

        int runTimes = 3;
        // Create the pool of SpeechSynthesis objects
        ExecutorService executorService = Executors.newFixedThreadPool(runTimes);

        for (int i = 0; i < runTimes; i++) {
            // Record the task submission time
            LocalDateTime submissionTime = LocalDateTime.now();
            executorService.submit(new SynthesizeTaskWithCallback(new String[] {
                    "床前明月光,", "疑似地上霜。", "举头望明月,", "低头思故乡。"}));
        }

        // Shut down the ExecutorService and wait for all tasks to complete
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
        System.exit(0);
    }
}

推荐配置

建议您根据服务器的实际规格,参考如下推荐配置,在您的服务器上进行配置。请注意,过高的并发会导致语音合成任务变慢。

其中单机并发数指的是同一时刻正在运行的CosyVoice语音合成任务数,也可以理解为工作线程数。

说明

推荐配置是在服务器上只运行CosyVoice语音合成服务的情况下进行测试后得到的。

常见机器配置(阿里云)

单机最大并发数

对象池大小

连接池大小

48GiB

100

500

2000

816GiB

150

500

2000

1632GiB

200

500

2000

异常处理

  • 在服务出现TaskFailed报错时,不需要额外处理。

  • 在语音合成中途,客户端出现错误(SDK内部异常或业务逻辑异常)导致语音合成任务没有完成时,需要用户主动关闭连接。

    关闭连接方法如下:

    synthesizer.getDuplexApi().close(1000, "bye");

常见异常

异常 1、 业务流量平稳,但是服务器 TCP 连接数持续上升

出错原因:

每一个 SDK 对象创建时都会申请一个连接。如果没有使用对象池,每一次任务结束后对象都被析构。此时这一个连接将进入无引用状态,需要等待 61s 秒后服务端报错连接超时才会真正断开,这会导致这个连接在 61 秒内不可复用。

在高并发场景下,新的任务在发现没有可复用连接时会创建新连接,会造成如下后果:

  1. 连接数持续上升。

  2. 由于连接数过多,服务器资源不足,服务器卡顿。

  3. 连接池被打满、新任务由于启动时需要等待可用连接而阻塞。

解决方法

常见的原因是没有使用对象池复用对象。可以通过使用对象池解决。

异常 2、任务耗时比正常调用多 60 秒

同“异常 1”,连接池已经达到最大连接限制,新的任务需要等待无引用状态的连接 61 秒触发超时后才可以获得连接。

异常 3、服务启动时任务慢,之后慢慢恢复正常

出错原因

在高并发调用时,同一个对象会复用同一个WebSocket连接,因此WebSocket连接只会在服务启动时创建。需要注意的是,任务启动阶段如果立刻开始较高并发调用,同时创建过多的WebSocket连接会导致阻塞。

解决方法

启动服务后逐步提升并发量,或增加预热任务。

异常 4、服务端报错 Invalid action('run-task')! Please follow the protocol!

出错原因

这是由于出现了客户端报错后,服务端不知道客户端出错,连接处于任务中状态。此时连接和对象被复用并开启下一个任务,导致流程错误,下一个任务失败。

解决方法

在抛出异常后主动关闭 WebSocket 连接后归还对象池。

异常 5、业务流量平稳,调用量出现异常尖刺

出错原因

同时创建过多 WebSocket 连接导致阻塞,但业务流量持续打进来,导致任务短时间积压,并且在阻塞后所有积压任务立刻调用。这会造成调用量尖刺,并且有可能造成瞬时超过账号的并发数限制导致部分任务失败、服务器卡顿等。

这种瞬间创建过多 WebSocket 的情况多发生于:

  • 服务启动阶段

  • 网络出现异常,大量 WebSocket 连接同时中断重连

  • 某一时刻出现大量服务端报错,导致大量 WebSocket 重连。常见报错如并发数超过账号限制(“Requests rate limit exceeded, please try again later.”)。

解决方法

  1. 检查网络情况。

  2. 排查尖刺前是否出现大量其他服务端报错。

  3. 提高账号并发限制。

  4. 调小对象池和连接池大小,通过对象池上限限制最大并发数。

  5. 提升服务器配置或扩充机器数。

异常 6、随着并发数提升,所有任务都变慢

解决方法

  1. 检查是否已经达到网络带宽上限。

  2. 检查实际并发数是否已经过高。