本文介绍如何通过WebSocket连接访问CosyVoice语音合成服务。
DashScope SDK目前仅支持Java和Python。若想使用其他编程语言开发CosyVoice语音合成应用程序,可以通过WebSocket连接与服务进行通信。
WebSocket是一种支持全双工通信的网络协议。客户端和服务器通过一次握手建立持久连接,双方可以互相主动推送数据,因此在实时性和效率方面具有显著优势。
对于常用编程语言,有许多现成的WebSocket库和示例可供参考,例如:
Go:
gorilla/websocket
PHP:
Ratchet
Node.js:
ws
建议您先了解WebSocket的基本原理和技术细节,再参照本文进行开发。
前提条件
客户端与服务端的交互流程
按时间顺序,客户端与服务端的交互流程如下:
建立连接:客户端与服务端建立WebSocket连接。
开启任务:
客户端发送
run-task
指令以开启任务。客户端收到服务端返回的
task-started
事件,标志着任务已成功开启,可以进行后续步骤。
发送待合成文本:
客户端按顺序向服务端发送一个或多个包含待合成文本的
continue-task
指令,并同时接收服务端持续返回的音频流。
通知服务端结束任务:
客户端发送
finish-task
指令通知服务端结束任务,并继续接收服务端返回的音频流。
任务结束:
客户端收到服务端返回的
task-finished
事件,标志着任务结束。
关闭连接:客户端关闭WebSocket连接。
WebSocket客户端编程与消息处理
在编写WebSocket客户端代码时,为了同时发送和接收消息,通常采用异步编程。您可以按照以下步骤来编写程序:
建立WebSocket连接:首先,初始化并建立与服务器的WebSocket连接。
异步监听服务器消息:启动一个单独的线程(具体实现方式因编程语言而异)来监听服务器返回的消息,根据消息内容进行相应的操作。
发送消息:在不同于监听服务器消息的线程中(例如主线程,具体实现方式因编程语言而异),向服务器发送消息。
关闭连接:在程序结束前,确保关闭WebSocket连接以释放资源。
当然,编程思路不止这一种,您或许有更好的想法。本文主要介绍通过WebSocket连接访问服务时的鉴权细节及客户端与服务端之间的消息交互。由于篇幅有限,其他思路将不再赘述。
接下来将按照上述思路,为您详细说明。
一、建立WebSocket连接
调用WebSocket库函数(具体实现方式因编程语言或库函数而异),将请求头和URL传入以建立WebSocket连接。
请求头中需添加如下鉴权信息:
{
"Authorization": "bearer <your_dashscope_api_key>", // 将<your_dashscope_api_key>替换成您自己的API Key
"user-agent": "your_platform_info", //可选
"X-DashScope-WorkSpace": workspace, // 可选
"X-DashScope-DataInspection": "enable"
}
WebSocket URL固定如下:
wss://dashscope.aliyuncs.com/api-ws/v1/inference
二、异步监听服务器返回的消息
如上所述,您可以启动一个线程(具体实现因编程语言而异)来监听服务器返回的消息。WebSocket库通常会提供回调函数(观察者模式)来处理这些消息。您可以在回调函数中根据不同的消息类型实现相应的功能。
服务端返回给客户端的消息有两种:
音频流
具体如下:
1、task-started事件:语音合成任务已开启
2、二进制音频流
3、result-generated事件(现阶段忽略该步骤)
4、task-finished事件:语音合成任务已结束
5、task-failed事件:任务失败
三、给服务器发送消息
在与监听服务器消息不同的线程中(比如主线程,具体实现因编程语言而异),向服务器发送消息。
客户端发送给服务端的消息叫做指令,为JSON格式,以Text Frame方式发送,用于控制任务的起止和标识任务边界,由header
和payload
这两部分组成:
payload
:包含基础信息外的其他信息。不同指令的payload
格式可能不同。
向服务器发送消息需要遵循如下时序,否则会导致任务失败:首先发送run-task
指令,待监听到服务器返回的task-started
事件后,再发送continue-task
指令。在continue-task
指令发送结束后,发送finish-task
指令。
1、发送run-task指令:开启语音合成任务
2、按顺序发送一个或多个包含待合成文本的continue-task指令
3、发送finish-task指令:结束语音合成任务
四、关闭WebSocket连接
在程序正常结束、运行中出现异常或接收到task-finished
、task-failed
事件时,关闭WebSocket连接。通常通过调用工具库中的close
函数来实现。
示例代码
Go
package main
import (
"encoding/json"
"fmt"
"net/http"
"os"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
)
const (
wsURL = "wss://dashscope.aliyuncs.com/api-ws/v1/inference/" // WebSocket服务器地址
outputFile = "output.mp3" // 输出文件路径
)
func main() {
// 若没有将API Key配置到环境变量,可将下行替换为:apiKey := "your_api_key"。不建议在生产环境中直接将API Key硬编码到代码中,以减少API Key泄露风险。
apiKey := os.Getenv("DASHSCOPE_API_KEY")
// 检查并清空输出文件
if err := clearOutputFile(outputFile); err != nil {
fmt.Println("清空输出文件失败:", err)
return
}
// 连接WebSocket服务
conn, err := connectWebSocket(apiKey)
if err != nil {
fmt.Println("连接WebSocket失败:", err)
return
}
defer closeConnection(conn)
// 启动一个goroutine来接收结果
done, taskStarted := startResultReceiver(conn)
// 发送run-task指令
taskID, err := sendRunTaskCmd(conn)
if err != nil {
fmt.Println("发送run-task指令失败:", err)
return
}
// 等待task-started事件
for !*taskStarted {
time.Sleep(100 * time.Millisecond)
}
// 发送待合成文本
if err := sendContinueTaskCmd(conn, taskID); err != nil {
fmt.Println("发送待合成文本失败:", err)
return
}
// 发送finish-task指令
if err := sendFinishTaskCmd(conn, taskID); err != nil {
fmt.Println("发送finish-task指令失败:", err)
return
}
// 等待接收结果的goroutine完成
<-done
}
var dialer = websocket.DefaultDialer
// 定义结构体来表示JSON数据
type Header struct {
Action string `json:"action"`
TaskID string `json:"task_id"`
Streaming string `json:"streaming"`
Event string `json:"event"`
ErrorCode string `json:"error_code,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
Attributes map[string]interface{} `json:"attributes"`
}
type Payload struct {
TaskGroup string `json:"task_group"`
Task string `json:"task"`
Function string `json:"function"`
Model string `json:"model"`
Parameters Params `json:"parameters"`
Resources []Resource `json:"resources"`
Input Input `json:"input"`
}
type Params struct {
TextType string `json:"text_type"`
Voice string `json:"voice"`
Format string `json:"format"`
SampleRate int `json:"sample_rate"`
Volume int `json:"volume"`
Rate int `json:"rate"`
Pitch int `json:"pitch"`
}
type Resource struct {
ResourceID string `json:"resource_id"`
ResourceType string `json:"resource_type"`
}
type Input struct {
Text string `json:"text"`
}
type Event struct {
Header Header `json:"header"`
Payload Payload `json:"payload"`
}
// 连接WebSocket服务
func connectWebSocket(apiKey string) (*websocket.Conn, error) {
header := make(http.Header)
header.Add("X-DashScope-DataInspection", "enable")
header.Add("Authorization", fmt.Sprintf("bearer %s", apiKey))
conn, _, err := dialer.Dial(wsURL, header)
if err != nil {
fmt.Println("连接WebSocket失败:", err)
return nil, err
}
return conn, nil
}
// 发送run-task指令
func sendRunTaskCmd(conn *websocket.Conn) (string, error) {
runTaskCmd, taskID, err := generateRunTaskCmd()
if err != nil {
return "", err
}
err = conn.WriteMessage(websocket.TextMessage, []byte(runTaskCmd))
return taskID, err
}
// 生成run-task指令
func generateRunTaskCmd() (string, string, error) {
taskID := uuid.New().String()
runTaskCmd := Event{
Header: Header{
Action: "run-task",
TaskID: taskID,
Streaming: "duplex",
},
Payload: Payload{
TaskGroup: "audio",
Task: "tts",
Function: "SpeechSynthesizer",
Model: "cosyvoice-v1",
Parameters: Params{
TextType: "PlainText",
Voice: "longxiaochun",
Format: "mp3",
SampleRate: 22050,
Volume: 50,
Rate: 1,
Pitch: 1,
},
Input: Input{},
},
}
runTaskCmdJSON, err := json.Marshal(runTaskCmd)
return string(runTaskCmdJSON), taskID, err
}
// 发送待合成文本
func sendContinueTaskCmd(conn *websocket.Conn, taskID string) error {
texts := []string{"床前明月光", "疑是地上霜", "举头望明月", "低头思故乡"}
for _, text := range texts {
runTaskCmd, err := generateContinueTaskCmd(text, taskID)
if err != nil {
return err
}
err = conn.WriteMessage(websocket.TextMessage, []byte(runTaskCmd))
if err != nil {
return err
}
}
return nil
}
// 生成continue-task指令
func generateContinueTaskCmd(text string, taskID string) (string, error) {
runTaskCmd := Event{
Header: Header{
Action: "continue-task",
TaskID: taskID,
Streaming: "duplex",
},
Payload: Payload{
Input: Input{
Text: text,
},
},
}
runTaskCmdJSON, err := json.Marshal(runTaskCmd)
return string(runTaskCmdJSON), err
}
// 启动一个goroutine来接收结果
func startResultReceiver(conn *websocket.Conn) (chan struct{}, *bool) {
done := make(chan struct{})
taskStarted := new(bool)
*taskStarted = false
go func() {
defer close(done)
for {
msgType, message, err := conn.ReadMessage()
if err != nil {
fmt.Println("解析服务器消息失败:", err)
return
}
if msgType == websocket.BinaryMessage {
// 处理二进制音频流
if err := writeBinaryDataToFile(message, outputFile); err != nil {
fmt.Println("写入二进制数据失败:", err)
return
}
} else {
// 处理文本消息
var event Event
err = json.Unmarshal(message, &event)
if err != nil {
fmt.Println("解析事件失败:", err)
continue
}
if handleEvent(conn, event, taskStarted) {
return
}
}
}
}()
return done, taskStarted
}
// 处理事件
func handleEvent(conn *websocket.Conn, event Event, taskStarted *bool) bool {
switch event.Header.Event {
case "task-started":
fmt.Println("收到task-started事件")
*taskStarted = true
case "result-generated":
// 忽略result-generated事件
return false
case "task-finished":
fmt.Println("任务完成")
return true
case "task-failed":
handleTaskFailed(event, conn)
return true
default:
fmt.Printf("预料之外的事件:%v\n", event)
}
return false
}
// 处理任务失败事件
func handleTaskFailed(event Event, conn *websocket.Conn) {
if event.Header.ErrorMessage != "" {
fmt.Printf("任务失败:%s\n", event.Header.ErrorMessage)
} else {
fmt.Println("未知原因导致任务失败")
}
}
// 关闭连接
func closeConnection(conn *websocket.Conn) {
if conn != nil {
conn.Close()
}
}
// 写入二进制数据到文件
func writeBinaryDataToFile(data []byte, filePath string) error {
file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
defer file.Close()
_, err = file.Write(data)
if err != nil {
return err
}
return nil
}
// 发送finish-task指令
func sendFinishTaskCmd(conn *websocket.Conn, taskID string) error {
finishTaskCmd, err := generateFinishTaskCmd(taskID)
if err != nil {
return err
}
err = conn.WriteMessage(websocket.TextMessage, []byte(finishTaskCmd))
return err
}
// 生成finish-task指令
func generateFinishTaskCmd(taskID string) (string, error) {
finishTaskCmd := Event{
Header: Header{
Action: "finish-task",
TaskID: taskID,
Streaming: "duplex",
},
Payload: Payload{
Input: Input{},
},
}
finishTaskCmdJSON, err := json.Marshal(finishTaskCmd)
return string(finishTaskCmdJSON), err
}
// 清空输出文件
func clearOutputFile(filePath string) error {
file, err := os.OpenFile(filePath, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
file.Close()
return nil
}
C#
需要在项目目录下运行如下命令添加依赖:
dotnet add package Newtonsoft.Json
示例代码如下:
using System.Net.WebSockets;
using System.Text;
using Newtonsoft.Json.Linq;
class Program {
// 若没有将API Key配置到环境变量,可将下行替换为:private const string ApiKey="your_api_key"。不建议在生产环境中直接将API Key硬编码到代码中,以减少API Key泄露风险。
private static readonly string ApiKey = Environment.GetEnvironmentVariable("DASHSCOPE_API_KEY") ?? throw new InvalidOperationException("DASHSCOPE_API_KEY environment variable is not set.");
// WebSocket服务器地址
private const string WebSocketUrl = "wss://dashscope.aliyuncs.com/api-ws/v1/inference/";
// 输出文件路径
private const string OutputFilePath = "output.mp3";
// WebSocket客户端
private static ClientWebSocket _webSocket = new ClientWebSocket();
// 取消令牌源
private static CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
// 任务ID
private static string? _taskId;
// 任务是否已启动
private static TaskCompletionSource<bool> _taskStartedTcs = new TaskCompletionSource<bool>();
static async Task Main(string[] args) {
try {
// 清空输出文件
ClearOutputFile(OutputFilePath);
// 连接WebSocket服务
await ConnectToWebSocketAsync(WebSocketUrl);
// 启动接收消息的任务
Task receiveTask = ReceiveMessagesAsync();
// 发送run-task指令
_taskId = GenerateTaskId();
await SendRunTaskCommandAsync(_taskId);
// 等待task-started事件
await _taskStartedTcs.Task;
// 持续发送continue-task指令
string[] texts = {
"床前明月光",
"疑是地上霜",
"举头望明月",
"低头思故乡"
};
foreach (string text in texts) {
await SendContinueTaskCommandAsync(text);
}
// 发送finish-task指令
await SendFinishTaskCommandAsync(_taskId);
// 等待接收任务完成
await receiveTask;
Console.WriteLine("任务完成,连接已关闭。");
} catch (OperationCanceledException) {
Console.WriteLine("任务被取消。");
} catch (Exception ex) {
Console.WriteLine($"发生错误:{ex.Message}");
} finally {
_cancellationTokenSource.Cancel();
_webSocket.Dispose();
}
}
private static void ClearOutputFile(string filePath) {
if (File.Exists(filePath)) {
File.WriteAllText(filePath, string.Empty);
Console.WriteLine("输出文件已清空。");
} else {
Console.WriteLine("输出文件不存在,无需清空。");
}
}
private static async Task ConnectToWebSocketAsync(string url) {
var uri = new Uri(url);
if (_webSocket.State == WebSocketState.Connecting || _webSocket.State == WebSocketState.Open) {
return;
}
// 设置WebSocket连接的头部信息
_webSocket.Options.SetRequestHeader("Authorization", $"bearer {ApiKey}");
_webSocket.Options.SetRequestHeader("X-DashScope-DataInspection", "enable");
try {
await _webSocket.ConnectAsync(uri, _cancellationTokenSource.Token);
Console.WriteLine("已成功连接到WebSocket服务。");
} catch (OperationCanceledException) {
Console.WriteLine("WebSocket连接被取消。");
} catch (Exception ex) {
Console.WriteLine($"WebSocket连接失败: {ex.Message}");
throw;
}
}
private static async Task SendRunTaskCommandAsync(string taskId) {
var command = CreateCommand("run-task", taskId, "duplex", new {
task_group = "audio",
task = "tts",
function = "SpeechSynthesizer",
model = "cosyvoice-v1",
parameters = new
{
text_type = "PlainText",
voice = "longxiaochun",
format = "mp3",
sample_rate = 22050,
volume = 50,
rate = 1,
pitch = 1
},
input = new { }
});
await SendJsonMessageAsync(command);
Console.WriteLine("已发送run-task指令。");
}
private static async Task SendContinueTaskCommandAsync(string text) {
if (_taskId == null) {
throw new InvalidOperationException("任务ID未初始化。");
}
var command = CreateCommand("continue-task", _taskId, "duplex", new {
input = new {
text
}
});
await SendJsonMessageAsync(command);
Console.WriteLine("已发送continue-task指令。");
}
private static async Task SendFinishTaskCommandAsync(string taskId) {
var command = CreateCommand("finish-task", taskId, "duplex", new {
input = new { }
});
await SendJsonMessageAsync(command);
Console.WriteLine("已发送finish-task指令。");
}
private static async Task SendJsonMessageAsync(string message) {
var buffer = Encoding.UTF8.GetBytes(message);
try {
await _webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Text, true, _cancellationTokenSource.Token);
} catch (OperationCanceledException) {
Console.WriteLine("消息发送被取消。");
}
}
private static async Task ReceiveMessagesAsync() {
while (_webSocket.State == WebSocketState.Open) {
var response = await ReceiveMessageAsync();
if (response != null) {
var eventStr = response["header"]?["event"]?.ToString();
switch (eventStr) {
case "task-started":
Console.WriteLine("任务已启动。");
_taskStartedTcs.TrySetResult(true);
break;
case "task-finished":
Console.WriteLine("任务已完成。");
_cancellationTokenSource.Cancel();
break;
case "task-failed":
Console.WriteLine("任务失败。");
_cancellationTokenSource.Cancel();
break;
default:
// result-generated可在此处理
break;
}
}
}
}
private static async Task<JObject?> ReceiveMessageAsync() {
var buffer = new byte[1024 * 4];
var segment = new ArraySegment<byte>(buffer);
try {
WebSocketReceiveResult result = await _webSocket.ReceiveAsync(segment, _cancellationTokenSource.Token);
if (result.MessageType == WebSocketMessageType.Close) {
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", _cancellationTokenSource.Token);
return null;
}
if (result.MessageType == WebSocketMessageType.Binary) {
// 处理二进制数据
Console.WriteLine("接收到二进制数据...");
// 将二进制数据保存到文件
using (var fileStream = new FileStream(OutputFilePath, FileMode.Append)) {
fileStream.Write(buffer, 0, result.Count);
}
return null;
}
string message = Encoding.UTF8.GetString(buffer, 0, result.Count);
return JObject.Parse(message);
} catch (OperationCanceledException) {
Console.WriteLine("消息接收被取消。");
return null;
}
}
private static string GenerateTaskId() {
return Guid.NewGuid().ToString("N").Substring(0, 32);
}
private static string CreateCommand(string action, string taskId, string streaming, object payload) {
var command = JObject.FromObject(new {
header = new {
action,
task_id = taskId,
streaming
},
payload
});
return command.ToString();
}
}
PHP
示例代码目录结构为:
my-php-project/
├── composer.json
├── vendor/
└── index.php
composer.json内容如下,相关依赖的版本号请根据实际情况自行决定:
{
"require": {
"react/event-loop": "^1.3",
"react/socket": "^1.11",
"react/stream": "^1.2",
"react/http": "^1.1",
"ratchet/pawl": "^0.4"
},
"autoload": {
"psr-4": {
"App\\": "src/"
}
}
}
index.php内容如下:
<?php
require __DIR__ . '/vendor/autoload.php';
use Ratchet\Client\Connector;
use React\EventLoop\Loop;
use React\Socket\Connector as SocketConnector;
# 若没有将API Key配置到环境变量,可将下行替换为:$api_key="your_api_key"。不建议在生产环境中直接将API Key硬编码到代码中,以减少API Key泄露风险。
$api_key = getenv("DASHSCOPE_API_KEY");
$websocket_url = 'wss://dashscope.aliyuncs.com/api-ws/v1/inference/'; // WebSocket服务器地址
$output_file = 'output.mp3'; // 输出文件路径
$loop = Loop::get();
if (file_exists($output_file)) {
// 清空文件内容
file_put_contents($output_file, '');
}
// 创建自定义的连接器
$socketConnector = new SocketConnector($loop, [
'tcp' => [
'bindto' => '0.0.0.0:0',
],
'tls' => [
'verify_peer' => false,
'verify_peer_name' => false,
],
]);
$connector = new Connector($loop, $socketConnector);
$headers = [
'Authorization' => 'bearer ' . $api_key,
'X-DashScope-DataInspection' => 'enable'
];
$connector($websocket_url, [], $headers)->then(function ($conn) use ($loop, $output_file) {
echo "连接到WebSocket服务器\n";
// 生成任务ID
$taskId = generateTaskId();
// 发送 run-task 指令
sendRunTaskMessage($conn, $taskId);
// 定义发送 continue-task 指令的函数
$sendContinueTask = function() use ($conn, $loop, $taskId) {
// 待发送的文本
$texts = ["床前明月光", "疑是地上霜", "举头望明月", "低头思故乡"];
foreach ($texts as $text) {
$continueTaskMessage = json_encode([
"header" => [
"action" => "continue-task",
"task_id" => $taskId,
"streaming" => "duplex"
],
"payload" => [
"input" => [
"text" => $text
]
]
]);
echo "准备发送continue-task指令: " . $continueTaskMessage . "\n";
$conn->send($continueTaskMessage);
}
echo "continue-task指令已发送\n";
// 发送 finish-task 指令
sendFinishTaskMessage($conn, $taskId);
};
// 标记是否收到 task-started 事件
$taskStarted = false;
// 监听消息
$conn->on('message', function($msg) use ($conn, $sendContinueTask, $loop, &$taskStarted, $taskId, $output_file) {
if ($msg->isBinary()) {
// 写入二进制数据到本地文件
file_put_contents($output_file, $msg->getPayload(), FILE_APPEND);
} else {
// 处理非二进制消息
$response = json_decode($msg, true);
if (isset($response['header']['event'])) {
handleEvent($conn, $response, $sendContinueTask, $loop, $taskId, $taskStarted);
} else {
echo "未知的消息格式\n";
}
}
});
// 监听连接关闭
$conn->on('close', function($code = null, $reason = null) {
echo "连接已关闭\n";
if ($code !== null) {
echo "关闭代码: " . $code . "\n";
}
if ($reason !== null) {
echo "关闭原因:" . $reason . "\n";
}
});
}, function ($e) {
echo "无法连接:{$e->getMessage()}\n";
});
$loop->run();
/**
* 生成任务ID
* @return string
*/
function generateTaskId(): string {
return bin2hex(random_bytes(16));
}
/**
* 发送 run-task 指令
* @param $conn
* @param $taskId
*/
function sendRunTaskMessage($conn, $taskId) {
$runTaskMessage = json_encode([
"header" => [
"action" => "run-task",
"task_id" => $taskId,
"streaming" => "duplex"
],
"payload" => [
"task_group" => "audio",
"task" => "tts",
"function" => "SpeechSynthesizer",
"model" => "cosyvoice-v1",
"parameters" => [
"text_type" => "PlainText",
"voice" => "longxiaochun",
"format" => "mp3",
"sample_rate" => 22050,
"volume" => 50,
"rate" => 1,
"pitch" => 1
],
"input" => ["text" => ""]
]
]);
echo "准备发送run-task指令: " . $runTaskMessage . "\n";
$conn->send($runTaskMessage);
echo "run-task指令已发送\n";
}
/**
* 读取音频文件
* @param string $filePath
* @return bool|string
*/
function readAudioFile(string $filePath) {
$voiceData = file_get_contents($filePath);
if ($voiceData === false) {
echo "无法读取音频文件\n";
}
return $voiceData;
}
/**
* 分割音频数据
* @param string $data
* @param int $chunkSize
* @return array
*/
function splitAudioData(string $data, int $chunkSize): array {
return str_split($data, $chunkSize);
}
/**
* 发送 finish-task 指令
* @param $conn
* @param $taskId
*/
function sendFinishTaskMessage($conn, $taskId) {
$finishTaskMessage = json_encode([
"header" => [
"action" => "finish-task",
"task_id" => $taskId,
"streaming" => "duplex"
],
"payload" => [
"input" => ["text" => ""]
]
]);
echo "准备发送finish-task指令: " . $finishTaskMessage . "\n";
$conn->send($finishTaskMessage);
echo "finish-task指令已发送\n";
}
/**
* 处理事件
* @param $conn
* @param $response
* @param $sendContinueTask
* @param $loop
* @param $taskId
* @param $taskStarted
*/
function handleEvent($conn, $response, $sendContinueTask, $loop, $taskId, &$taskStarted) {
switch ($response['header']['event']) {
case 'task-started':
echo "任务开始,发送continue-task指令...\n";
$taskStarted = true;
// 发送 continue-task 指令
$sendContinueTask();
break;
case 'result-generated':
// 忽略result-generated事件
break;
case 'task-finished':
echo "任务完成\n";
$conn->close();
break;
case 'task-failed':
echo "任务失败\n";
echo "错误代码:" . $response['header']['error_code'] . "\n";
echo "错误信息:" . $response['header']['error_message'] . "\n";
$conn->close();
break;
case 'error':
echo "错误:" . $response['payload']['message'] . "\n";
break;
default:
echo "未知事件:" . $response['header']['event'] . "\n";
break;
}
// 如果任务已完成,关闭连接
if ($response['header']['event'] == 'task-finished') {
// 等待1秒以确保所有数据都已传输完毕
$loop->addTimer(1, function() use ($conn) {
$conn->close();
echo "客户端关闭连接\n";
});
}
// 如果没有收到 task-started 事件,关闭连接
if (!$taskStarted && in_array($response['header']['event'], ['task-failed', 'error'])) {
$conn->close();
}
}
Node.js
需安装相关依赖:
npm install ws
npm install uuid
示例代码如下:
const WebSocket = require('ws');
const fs = require('fs');
const uuid = require('uuid').v4;
// 若没有将API Key配置到环境变量,可将下行替换为:apiKey = 'your_api_key'。不建议在生产环境中直接将API Key硬编码到代码中,以减少API Key泄露风险。
const apiKey = process.env.DASHSCOPE_API_KEY;
// WebSocket服务器地址
const url = 'wss://dashscope.aliyuncs.com/api-ws/v1/inference/';
// 输出文件路径
const outputFilePath = 'output.mp3';
// 清空输出文件
fs.writeFileSync(outputFilePath, '');
// 创建WebSocket客户端
const ws = new WebSocket(url, {
headers: {
Authorization: `bearer ${apiKey}`,
'X-DashScope-DataInspection': 'enable'
}
});
let taskStarted = false;
let taskId = uuid();
ws.on('open', () => {
console.log('已连接到WebSocket服务器');
// 发送run-task指令
const runTaskMessage = JSON.stringify({
header: {
action: 'run-task',
task_id: taskId,
streaming: 'duplex'
},
payload: {
task_group: 'audio',
task: 'tts',
function: 'SpeechSynthesizer',
model: 'cosyvoice-v1',
parameters: {
text_type: 'PlainText',
voice: 'longxiaochun', // 音色
format: 'mp3', // 音频格式
sample_rate: 22050, // 采样率
volume: 50, // 音量
rate: 1, // 语速
pitch: 1 // 音调
},
input: {}
}
});
ws.send(runTaskMessage);
console.log('已发送run-task消息');
});
ws.on('message', (data, isBinary) => {
if (isBinary) {
// 写入二进制数据到文件
fs.appendFile(outputFilePath, data, (err) => {
if (err) {
console.error('写入数据到文件时出错:', err);
} else {
console.log('数据已写入文件');
}
});
} else {
const message = JSON.parse(data);
switch (message.header.event) {
case 'task-started':
taskStarted = true;
console.log('任务已开始');
// 发送continue-task指令
sendContinueTasks(ws);
break;
case 'task-finished':
console.log('任务已完成');
ws.close();
break;
case 'task-failed':
console.error('任务失败:', message.header.error_message);
ws.close();
break;
default:
// 可以在这里处理result-generated
break;
}
}
});
function sendContinueTasks(ws) {
const texts = [
'床前明月光',
'疑是地上霜',
'举头望明月',
'低头思故乡'
];
texts.forEach((text, index) => {
setTimeout(() => {
if (taskStarted) {
const continueTaskMessage = JSON.stringify({
header: {
action: 'continue-task',
task_id: taskId,
streaming: 'duplex'
},
payload: {
input: {
text: text
}
}
});
ws.send(continueTaskMessage);
console.log(`已发送continue-task,文本:${text}`);
}
}, index * 1000); // 每隔1秒发送一次
});
// 发送finish-task指令
setTimeout(() => {
if (taskStarted) {
const finishTaskMessage = JSON.stringify({
header: {
action: 'finish-task',
task_id: taskId,
streaming: 'duplex'
},
payload: {
input: {}
}
});
ws.send(finishTaskMessage);
console.log('已发送finish-task');
}
}, texts.length * 1000 + 1000); // 在所有continue-task指令发送完毕后1秒发送
}
ws.on('close', () => {
console.log('已断开与WebSocket服务器的连接');
});