本文介绍在高并发场景下,如何通过DashScope Java SDK,高效调用CosyVoice语音合成大模型服务。
CosyVoice大模型的服务协议为WebSocket,在高并发场景下,如果不断地创建WebSocket连接会增加连接耗时且会有较大资源消耗。您在通过DashScope Java SDK调用CosyVoice语音大模型服务时,可以使用连接池和对象池这两种资源池,降低高并发场景下的程序开销。
您在了解了资源池后,可参照文末的推荐配置,结合示例代码在您自己的服务器上进行试验。
SDK 2.16.6及后续版本针对高并发场景进行了优化,SDK 2.16.6之前的版本不推荐在高并发场景下使用。
前提条件
已开通服务并获取API Key。请配置API Key到环境变量,而非硬编码在代码中,防范因代码泄露导致的安全风险。
连接池
DashScope Java SDK使用了OkHttp3提供的连接池复用WebSocket连接,降低不断创建WebSocket连接的耗时和资源开销。
连接池为SDK默认开启的优化项,您需要根据使用场景配置连接池的大小。
请您在运行Java服务前通过环境变量的方式在机器中提前按需配置好连接池的相关参数。连接池配置参数如下:
DASHSCOPE_CONNECTION_POOL_SIZE  | 配置连接池大小。 推荐配置为您的峰值并发数的2倍以上。默认值为32。 说明  对象池大小需要小于等于连接池大小,不然会出现对象等待连接的情况造成调用阻塞。  | 
DASHSCOPE_MAXIMUM_ASYNC_REQUESTS  | 配置最大异步请求数。 推荐配置为和连接池大小一致。默认值为32。 更多信息参见参考文档。  | 
DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST  | 配置单host最大异步请求数。 推荐配置为和连接池大小一致。默认值为32。 更多信息参见参考文档。  | 
对象池
推荐使用对象池的方式复用SpeechSynthesizer对象,可以进一步降低反复创建、销毁对象带来的内存和时间开销。
请您在运行Java服务前通过环境变量的方式在机器中提前按需配置好对象池的大小。对象池配置参数如下:
COSYVOICE_OBJECTPOOL_SIZE  | 对象池大小。默认值500。 推荐配置为您的峰值并发数的1.5~2倍。  | 
示例代码
下面为使用资源池的示例代码。其中,对象池为全局单例对象。
每个主账号默认每秒可提交3个CosyVoice实时语音合成任务。
package org.alibaba.bailian.example.examples;
import com.alibaba.dashscope.audio.tts.SpeechSynthesisResult;
import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesisAudioFormat;
import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesisParam;
import com.alibaba.dashscope.audio.ttsv2.SpeechSynthesizer;
import com.alibaba.dashscope.common.ResultCallback;
import com.alibaba.dashscope.exception.NoApiKeyException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
/**
 * 您需要在项目中引入org.apache.commons.pool2和DashScope相关的包。
 *
 * DashScope SDK 2.16.6及后续版本针对高并发场景进行了优化,
 * DashScope SDK 2.16.6之前的版本不推荐在高并发场景下使用。
 *
 *
 * 在对TTS服务进行高并发调用之前,
 * 请通过以下环境变量配置连接池的相关参数。
 *
 * DASHSCOPE_MAXIMUM_ASYNC_REQUESTS
 * DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST
 * DASHSCOPE_CONNECTION_POOL_SIZE
 *
 */
class SpeechSynthesizerObjectFactory
        extends BasePooledObjectFactory<SpeechSynthesizer> {
    public SpeechSynthesizerObjectFactory() {
        super();
    }
    @Override
    public SpeechSynthesizer create() throws Exception {
        return new SpeechSynthesizer();
    }
    @Override
    public PooledObject<SpeechSynthesizer> wrap(SpeechSynthesizer obj) {
        return new DefaultPooledObject<>(obj);
    }
}
class CosyvoiceObjectPool {
    public static GenericObjectPool<SpeechSynthesizer> synthesizerPool;
    public static String COSYVOICE_OBJECTPOOL_SIZE_ENV = "COSYVOICE_OBJECTPOOL_SIZE";
    public static int DEFAULT_OBJECT_POOL_SIZE = 500;
    private static Lock lock = new java.util.concurrent.locks.ReentrantLock();
    public static int getObjectivePoolSize() {
        try {
            Integer n = Integer.parseInt(System.getenv(COSYVOICE_OBJECTPOOL_SIZE_ENV));
            System.out.println("Using Object Pool Size In Env: "+ n);
            return n;
        } catch (NumberFormatException e) {
            System.out.println("Using Default Object Pool Size: "+ DEFAULT_OBJECT_POOL_SIZE);
            return DEFAULT_OBJECT_POOL_SIZE;
        }
    }
    public static GenericObjectPool<SpeechSynthesizer> getInstance() {
        lock.lock();
        if (synthesizerPool == null) {
            // 您可以在这里设置对象池的大小。或在环境变量COSYVOICE_OBJECTPOOL_SIZE中设置。
            // 建议设置为服务器最大并发连接数的1.5到2倍。
            int objectPoolSize = getObjectivePoolSize();
            SpeechSynthesizerObjectFactory speechSynthesizerObjectFactory =
                    new SpeechSynthesizerObjectFactory();
            GenericObjectPoolConfig<SpeechSynthesizer> config =
                    new GenericObjectPoolConfig<>();
            config.setMaxTotal(objectPoolSize);
            config.setMaxIdle(objectPoolSize);
            config.setMinIdle(objectPoolSize);
            synthesizerPool =
                    new GenericObjectPool<>(speechSynthesizerObjectFactory, config);
        }
        lock.unlock();
        return synthesizerPool;
    }
}
class SynthesizeTaskWithCallback implements Runnable {
    String[] textArray;
    String requestId;
    long timeCost;
    public SynthesizeTaskWithCallback(String[] textArray) {
        this.textArray = textArray;
    }
    @Override
    public void run() {
        SpeechSynthesizer synthesizer = null;
        long startTime = System.currentTimeMillis();
        // if recv onError
        final boolean[] hasError = {false};
        try {
            class ReactCallback extends ResultCallback<SpeechSynthesisResult> {
                ReactCallback() {}
                @Override
                public void onEvent(SpeechSynthesisResult message) {
                    if (message.getAudioFrame() != null) {
                        try {
                            byte[] bytesArray = message.getAudioFrame().array();
                            System.out.println("收到音频,音频文件流length为:" + bytesArray.length);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                }
                @Override
                public void onComplete() {}
                @Override
                public void onError(Exception e) {
                    System.out.println(e.getMessage());
                    e.printStackTrace();
                    hasError[0] = true;
                }
            }
            // 将your-dashscope-api-key替换成您自己的API-KEY
            String dashScopeApiKey = "your-dashscope-api-key";
            SpeechSynthesisParam param =
                    SpeechSynthesisParam.builder()
                            .model("cosyvoice-v1")
                            .voice("longxiaochun")
                            .format(SpeechSynthesisAudioFormat
                                    .MP3_22050HZ_MONO_256KBPS) // 流式合成使用PCM或者MP3
                            .apiKey(dashScopeApiKey)
                            .build();
            try {
                synthesizer = CosyvoiceObjectPool.getInstance().borrowObject();
                synthesizer.updateParamAndCallback(param, new ReactCallback());
                for (String text : textArray) {
                    synthesizer.streamingCall(text);
                }
                Thread.sleep(20);
                synthesizer.streamingComplete(60000);
                requestId = synthesizer.getLastRequestId();
            } catch (Exception e) {
                System.out.println("Exception e: " + e.toString());
                hasError[0] = true;
            }
        } catch (Exception e) {
            hasError[0] = true;
            throw new RuntimeException(e);
        }
        if (synthesizer != null) {
            try {
                if (hasError[0] == true) {
                    // 如果出现异常,则关闭连接并在对象池中禁用该对象。
                    synthesizer.getDuplexApi().close(1000, "bye");
                    CosyvoiceObjectPool.getInstance().invalidateObject(synthesizer);
                } else {
                    // 如果任务正常结束,则归还对象。
                    CosyvoiceObjectPool.getInstance().returnObject(synthesizer);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            long endTime = System.currentTimeMillis();
            timeCost = endTime - startTime;
            System.out.println("[线程 " + Thread.currentThread() + "] 语音合成任务结束。耗时 " + timeCost + " ms, RequestId " + requestId);
        }
    }
}
@Slf4j
public class SynthesizeTextToSpeechWithCallbackConcurrently {
    public static void checkoutEnv(String envName, int defaultSize) {
        if (System.getenv(envName) != null) {
            System.out.println("[ENV CHECK]: " + envName + " "
                    + System.getenv(envName));
        } else {
            System.out.println("[ENV CHECK]: " + envName
                    + " Using Default which is " + defaultSize);
        }
    }
    public static void main(String[] args)
            throws InterruptedException, NoApiKeyException {
        // Check for connection pool env
        checkoutEnv("DASHSCOPE_CONNECTION_POOL_SIZE", 32);
        checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS", 32);
        checkoutEnv("DASHSCOPE_MAXIMUM_ASYNC_REQUESTS_PER_HOST", 32);
        checkoutEnv(CosyvoiceObjectPool.COSYVOICE_OBJECTPOOL_SIZE_ENV, CosyvoiceObjectPool.DEFAULT_OBJECT_POOL_SIZE);
        int runTimes = 3;
        // Create the pool of SpeechSynthesis objects
        ExecutorService executorService = Executors.newFixedThreadPool(runTimes);
        for (int i = 0; i < runTimes; i++) {
            // Record the task submission time
            LocalDateTime submissionTime = LocalDateTime.now();
            executorService.submit(new SynthesizeTaskWithCallback(new String[] {
                    "床前明月光,", "疑似地上霜。", "举头望明月,", "低头思故乡。"}));
        }
        // Shut down the ExecutorService and wait for all tasks to complete
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
        System.exit(0);
    }
}推荐配置
建议您根据服务器的实际规格,参考如下推荐配置,在您的服务器上进行配置。请注意,过高的并发会导致语音合成任务变慢。
其中单机并发数指的是同一时刻正在运行的CosyVoice语音合成任务数,也可以理解为工作线程数。
推荐配置是在服务器上只运行CosyVoice语音合成服务的情况下进行测试后得到的。
常见机器配置(阿里云)  | 单机最大并发数  | 对象池大小  | 连接池大小  | 
4核8GiB  | 100  | 500  | 2000  | 
8核16GiB  | 150  | 500  | 2000  | 
16核32GiB  | 200  | 500  | 2000  | 
异常处理
在服务出现TaskFailed报错时,不需要额外处理。
在语音合成中途,客户端出现错误(SDK内部异常或业务逻辑异常)导致语音合成任务没有完成时,需要用户主动关闭连接,同时在对象池中禁用出现异常的
synthesizer。关闭连接方法如下:
synthesizer.getDuplexApi().close(1000, "bye"); // 在对象池中禁用出现异常的synthesizer CosyvoiceObjectPool.getInstance().invalidateObject(synthesizer);需要注意,任务结束后必须执行
invalidateObject和returnObject中的一个:invalidateObject方法:在出错时使用,禁用对象。returnObject方法:在未出错时使用,归还对象。