高并发场景调用DashScope Java SDK最佳实践

本文介绍在高并发场景下,如何通过DashScope Java SDK高效调用Sambert语音合成服务。

Sambert语音合成服务使用WebSocket协议,在高并发场景下,频繁创建WebSocket连接会增加连接耗时并消耗大量资源。

在使用DashScope Java SDK时,您可以根据服务器的实际情况,通过合理设置连接池和对象池的大小来降低运行开销。

前提条件

开通模型服务并获取API Key,建议您API Key配置到环境变量

推荐配置

连接池和对象池不是越多越好,过少或过多都会导致程序运行变慢。建议您根据自己服务器的实际规格进行配置。

在服务器上只运行Sambert语音合成服务的情况下,进行测试后得到了如下推荐配置供您参考:

常见机器配置(阿里云)

单机最大并发数

对象池大小

连接池大小

48GiB

600

1200

2000

单机并发数指的是同一时刻正在运行的Sambert语音合成任务的数量,也可以理解为工作线程数。
重要

在高并发调用时,同一个对象会复用同一个WebSocket连接,因此WebSocket连接只会在服务启动时创建。

需要注意的是,同时创建过多的WebSocket连接会导致阻塞,因此在实际启动服务时应逐步提高单机并发数。

可配置参数

连接池

DashScope Java SDK使用了OkHttp3提供的连接池来复用WebSocket连接,从而减少频繁创建WebSocket连接的耗时和资源开销。

连接池是DashScope SDK默认开启的优化项,您需要根据使用场景配置连接池的大小。

请在运行Java服务前,通过环境变量的方式提前按需配置好连接池的相关参数。连接池配置参数如下:

DASHSCOPE_CONNECTION_POOL_SIZE

配置连接池大小。默认值为32。

推荐配置为您的峰值并发数的2倍以上。

DASHSCOPE_MAXIMUM_ASYNC_REQUESTS

配置最大异步请求数。默认值为32。

推荐配置为和连接池大小一致。

更多信息参见参考文档

DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST

配置单host最大异步请求数。默认值为32。

推荐配置为和连接池大小一致。

更多信息参见参考文档

对象池

推荐使用对象池的方式来复用SpeechSynthesizer对象,这样可以进一步降低反复创建和销毁对象带来的内存和时间开销。

请在运行 Java 服务前,通过环境变量或代码的方式提前按需配置好对象池的大小。对象池配置参数如下:

SAMBERT_OBJECTPOOL_SIZE

对象池大小。

推荐配置为您的峰值并发数的1.5~2倍。

对象池大小需要小于或等于连接池大小,否则会出现对象等待连接的情况,导致调用阻塞。

关于如何配置环境变量,可参考配置API Key到环境变量

示例代码

以下为使用资源池的示例代码。其中,对象池为全局单例对象。

  • 每个主账号默认每秒可提交3Sambert语音合成任务。

    如需开通更高QPS联系我们

  • 您需要在项目中引入DashScopeorg.apache.commons.pool2相关的包,DashScope要求版本号>=2.16.9。

    MavenGradle为例,配置如下:

    Maven

    1. 打开您的Maven项目的pom.xml文件。

    2. <dependencies>标签内添加以下依赖信息。

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>dashscope-sdk-java</artifactId>
        <!-- 请将 'the-latest-version' 替换为2.16.9及以上版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java -->
        <version>the-latest-version</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <!-- 请将 'the-latest-version' 替换为最新版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
        <version>the-latest-version</version>
    </dependency>
    1. 保存pom.xml文件。

    2. 使用Maven命令(如mvn clean installmvn compile)来更新项目依赖

    Gradle

    1. 打开您的Gradle项目的build.gradle文件。

    2. dependencies块内添加以下依赖信息。

      dependencies {
          // 请将 'the-latest-version' 替换为2.16.9及以上版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java
          implementation group: 'com.alibaba', name: 'dashscope-sdk-java', version: 'the-latest-version'
          
          // 请将 'the-latest-version' 替换为最新版本,可在如下链接查询相关版本号:https://mvnrepository.com/artifact/org.apache.commons/commons-pool2
          implementation group: 'org.apache.commons', name: 'commons-pool2', version: 'the-latest-version'
      }
    3. 保存build.gradle文件。

    4. 在命令行中,切换到您的项目根目录,执行以下Gradle命令来更新项目依赖。

      ./gradlew build --refresh-dependencies

      或者,如果您使用的是Windows系统,命令应为:

      gradlew build --refresh-dependencies
  • 示例代码中,不同的线程通过等待随机时间来避免同时创建过多的WebSocket连接。

import com.alibaba.dashscope.audio.tts.SpeechSynthesisAudioFormat;
import com.alibaba.dashscope.audio.tts.SpeechSynthesisParam;
import com.alibaba.dashscope.audio.tts.SpeechSynthesisResult;
import com.alibaba.dashscope.audio.tts.SpeechSynthesizer;
import com.alibaba.dashscope.common.ResultCallback;
import com.alibaba.dashscope.exception.NoApiKeyException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import java.time.LocalDateTime;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

/**
 * Before making high-concurrency calls to the TTS service,
 * please configure the connection pool size through following environment
 * variables.
 *
 * DASHSCOPE_MAXIMUM_ASYNC_REQUESTS=2000
 * DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST=2000
 * DASHSCOPE_CONNECTION_POOL_SIZE=2000
 *
 * The default is 32, and it is recommended to set it to 2 times the maximum
 * concurrent connections of a single server.
 */

@Slf4j
public class SynthesizeTextToSpeechUsingSambertConcurrently {
    public static void checkoutEnv(String envName, int defaultSize) {
        if (System.getenv(envName) != null) {
            System.out.println("[ENV CHECK]: " + envName + " "
                    + System.getenv(envName));
        } else {
            System.out.println("[ENV CHECK]: " + envName
                    + " Using Default which is " + defaultSize);
        }
    }

    public static void main(String[] args)
            throws InterruptedException, NoApiKeyException {

        // Check for connection pool env
        checkoutEnv("DASHSCOPE_CONNECTION_POOL_SIZE", 32);
        checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS", 32);
        checkoutEnv(SambertObjectPool.SAMBERT_OBJECTPOOL_SIZE_ENV, SambertObjectPool.DEFAULT_CONNECTION_POOL_SIZE);
        checkoutEnv(SambertObjectPool.SAMBERT_OBJECTPOOL_SIZE_ENV, 500);

        // Record task start time
        int runTimes = 1;

        // Create the pool of SpeechSynthesis objects
        ExecutorService executorService = Executors.newFixedThreadPool(runTimes);

        for (int i = 0; i < runTimes; i++) {
            // Record the task submission time
            LocalDateTime submissionTime = LocalDateTime.now();
            executorService.submit(new SynthesizeTask(new String[]{
                    "床前明月光,",
                    "疑似地上霜。",
                    "举头望明月,",
                    "低头思故乡。"
            }));
        }

        // Shut down the ExecutorService and wait for all tasks to complete
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
        System.exit(0);
    }
}

class SpeechSynthesizerObjectFactory
        extends BasePooledObjectFactory<SpeechSynthesizer> {
    public SpeechSynthesizerObjectFactory() {
        super();
    }
    @Override
    public SpeechSynthesizer create() throws Exception {
        return new SpeechSynthesizer();
    }

    @Override
    public PooledObject<SpeechSynthesizer> wrap(SpeechSynthesizer obj) {
        return new DefaultPooledObject<>(obj);
    }
}

class SambertObjectPool {
    public static GenericObjectPool<SpeechSynthesizer> synthesizerPool;
    public static String SAMBERT_OBJECTPOOL_SIZE_ENV = "SAMBERT_OBJECTPOOL_SIZE";
    public static int DEFAULT_CONNECTION_POOL_SIZE = 500;
    private static Lock lock = new java.util.concurrent.locks.ReentrantLock();
    public static int getObjectivePoolSize() {
        try {
            Integer n = Integer.parseInt(System.getenv(SAMBERT_OBJECTPOOL_SIZE_ENV));
            return n;
        } catch (NumberFormatException e) {
            return DEFAULT_CONNECTION_POOL_SIZE;
        }
    }
    public static GenericObjectPool<SpeechSynthesizer> getInstance() {
        lock.lock();
        if (synthesizerPool == null) {
            // You can set the object pool size here. or in environment variable
            // SAMBERT_OBJECTPOOL_SIZE It is recommended to set it to 1.5 to 2 times
            // your server's maximum concurrent connections.
            int objectPoolSize = getObjectivePoolSize();
            SpeechSynthesizerObjectFactory speechSynthesizerObjectFactory =
                    new SpeechSynthesizerObjectFactory();
            GenericObjectPoolConfig<SpeechSynthesizer> config =
                    new GenericObjectPoolConfig<>();
            config.setMaxTotal(objectPoolSize);
            config.setMinIdle(objectPoolSize);
            config.setMinIdle(objectPoolSize);
            synthesizerPool =
                    new GenericObjectPool<>(speechSynthesizerObjectFactory, config);
        }
        lock.unlock();
        return synthesizerPool;
    }
}

class SynthesizeTask implements Runnable {
    String[] textList;
    String requestId;
    long timeCost;
    public SynthesizeTask(String[] textList) {
        this.textList = textList;
    }
    @Override
    public void run() {
        // sleep random time before start task, avoid creating too much websocket at the same time.
        Random random = new Random();
        try {
            Thread.sleep(random.nextInt(30*1000));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        for (String text:textList) {
            SpeechSynthesizer synthesizer = null;
            long startTime = System.currentTimeMillis();

            try {
                CountDownLatch latch = new CountDownLatch(1);
                class ReactCallback extends ResultCallback<SpeechSynthesisResult> {
                    ReactCallback() {}

                    @Override
                    public void onEvent(SpeechSynthesisResult message) {
                        if (message.getAudioFrame() != null) {
                            try {
                                byte[] bytesArray = message.getAudioFrame().array();
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }

                    @Override
                    public void onComplete() {
                        latch.countDown();
                    }

                    @Override
                    public void onError(Exception e) {
                        System.out.println(e.getMessage());
                        e.printStackTrace();
                        latch.countDown();
                    }
                }

                // you can set your dashscope apikey here by code or in environment
                // variable DASHSCOPE_API_KEY
                String dashScopeApiKey = "your-dashscope-apikey";

                SpeechSynthesisParam param =
                        SpeechSynthesisParam.builder()
                                .model("sambert-zhichu-v1")
                                .format(SpeechSynthesisAudioFormat.MP3) // 使用PCM或者MP3
                                .text(text)
                                .enablePhonemeTimestamp(true)
                                .enableWordTimestamp(true)
                                .apiKey(dashScopeApiKey)
                                .build();

                try {
                    synthesizer = SambertObjectPool.getInstance().borrowObject();
                    synthesizer.call(param, new ReactCallback());
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    requestId = synthesizer.getLastRequestId();
                } catch (Exception e) {
                    System.out.println("Exception e: " + e.toString());
                    synthesizer.getSyncApi().close(1000, "bye");
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                if (synthesizer != null) {
                    try {
                        // Return the SpeechSynthesizer object to the pool
                        SambertObjectPool.getInstance().returnObject(synthesizer);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
            long endTime = System.currentTimeMillis();
            timeCost = endTime - startTime;
            System.out.println("[线程" + Thread.currentThread() + "] 语音合成任务:(" + text + ")结束。耗时" + timeCost + "ms, RequestId" + requestId);
        }
    }
}

异常处理

  • 在服务出现TaskFailed报错时,不需要额外处理。

  • 如果在语音合成中途,客户端出现错误(如SDK内部异常或业务逻辑异常)导致语音合成任务未完成,则需要您主动关闭连接。

    关闭连接方法如下:

    // 将下面这段代码放在try-catch块中
    synthesizer.getSyncApi().close(1000, "bye");

常见异常

异常 1、 业务流量平稳,但是服务器 TCP 连接数持续上升

出错原因:

每一个 SDK 对象创建时都会申请一个连接。如果没有使用对象池,每一次任务结束后对象都被析构。此时这一个连接将进入无引用状态,需要等待 61s 秒后服务端报错连接超时才会真正断开,这会导致这个连接在 61 秒内不可复用。

在高并发场景下,新的任务在发现没有可复用连接时会创建新连接,会造成如下后果:

  1. 连接数持续上升。

  2. 由于连接数过多,服务器资源不足,服务器卡顿。

  3. 连接池被打满、新任务由于启动时需要等待可用连接而阻塞。

解决方法

常见的原因是没有使用对象池复用对象。可以通过使用对象池解决。

异常 2、任务耗时比正常调用多 60 秒

同“异常 1”,连接池已经达到最大连接限制,新的任务需要等待无引用状态的连接 61 秒触发超时后才可以获得连接。

异常 3、服务启动时任务慢,之后慢慢恢复正常

出错原因

在高并发调用时,同一个对象会复用同一个WebSocket连接,因此WebSocket连接只会在服务启动时创建。需要注意的是,任务启动阶段如果立刻开始较高并发调用,同时创建过多的WebSocket连接会导致阻塞。

解决方法

启动服务后逐步提升并发量,或增加预热任务。

异常 4、服务端报错 Invalid action('run-task')! Please follow the protocol!

出错原因

这是由于出现了客户端报错后,服务端不知道客户端出错,连接处于任务中状态。此时连接和对象被复用并开启下一个任务,导致流程错误,下一个任务失败。

解决方法

在抛出异常后主动关闭 WebSocket 连接后归还对象池。

异常 5、业务流量平稳,调用量出现异常尖刺

出错原因

同时创建过多 WebSocket 连接导致阻塞,但业务流量持续打进来,导致任务短时间积压,并且在阻塞后所有积压任务立刻调用。这会造成调用量尖刺,并且有可能造成瞬时超过账号的并发数限制导致部分任务失败、服务器卡顿等。

这种瞬间创建过多 WebSocket 的情况多发生于:

  • 服务启动阶段

  • 网络出现异常,大量 WebSocket 连接同时中断重连

  • 某一时刻出现大量服务端报错,导致大量 WebSocket 重连。常见报错如并发数超过账号限制(“Requests rate limit exceeded, please try again later.”)。

解决方法

  1. 检查网络情况。

  2. 排查尖刺前是否出现大量其他服务端报错。

  3. 提高账号并发限制。

  4. 调小对象池和连接池大小,通过对象池上限限制最大并发数。

  5. 提升服务器配置或扩充机器数。

异常 6、随着并发数提升,所有任务都变慢

解决方法

  1. 检查是否已经达到网络带宽上限。

  2. 检查实际并发数是否已经过高。