本文介绍如何通过WebSocket连接访问Sambert语音合成服务。
DashScope SDK目前仅支持Java和Python。若想使用其他编程语言开发Sambert语音合成应用程序,可以通过WebSocket连接与服务进行通信。
WebSocket是一种支持全双工通信的网络协议。客户端和服务器通过一次握手建立持久连接,双方可以互相主动推送数据,因此在实时性和效率方面具有显著优势。
对于常用编程语言,有许多现成的WebSocket库和示例可供参考,例如:
Go:
gorilla/websocket
PHP:
Ratchet
Node.js:
ws
建议您先了解WebSocket的基本原理和技术细节,再参照本文进行开发。
前提条件
客户端与服务端的交互流程
按时间顺序,客户端与服务端的交互流程如下:
建立连接:客户端与服务端建立WebSocket连接。
开启任务:
客户端发送
run-task
指令以开启任务。客户端收到服务端返回的
task-started
事件,标志着任务已成功开启。
客户端接收服务端持续返回的音频流和
result-generated
事件客户端收到服务端返回的
task-finished
事件,标志着任务结束。关闭连接:客户端关闭WebSocket连接。
WebSocket客户端编程与消息处理
在编写WebSocket客户端代码时,为了同时发送和接收消息,通常采用异步编程。您可以按照以下步骤来编写程序:
建立WebSocket连接:首先,初始化并建立与服务器的WebSocket连接。
异步监听服务器消息:启动一个单独的线程(具体实现方式因编程语言而异)来监听服务器返回的消息,根据消息内容进行相应的操作。
发送消息:在不同于监听服务器消息的线程中(例如主线程,具体实现方式因编程语言而异),向服务器发送消息。
关闭连接:在程序结束前,确保关闭WebSocket连接以释放资源。
当然,编程思路不止这一种,您或许有更好的想法。本文主要介绍通过WebSocket连接访问服务时的鉴权细节及客户端与服务端之间的消息交互。由于篇幅有限,其他思路将不再赘述。
接下来将按照上述思路,为您详细说明。
一、建立WebSocket连接
调用WebSocket库函数(具体实现方式因编程语言或库函数而异),将请求头和URL传入以建立WebSocket连接。
请求头中需添加如下鉴权信息:
{
"Authorization": "bearer <your_dashscope_api_key>", // 将<your_dashscope_api_key>替换成您自己的API Key
"user-agent": "your_platform_info", //可选
"X-DashScope-WorkSpace": workspace, // 可选
"X-DashScope-DataInspection": "enable"
}
WebSocket URL固定如下:
wss://dashscope.aliyuncs.com/api-ws/v1/inference
二、异步监听服务器返回的消息
如上所述,您可以启动一个线程(具体实现因编程语言而异)来监听服务器返回的消息。WebSocket库通常会提供回调函数(观察者模式)来处理这些消息。您可以在回调函数中根据不同的消息类型实现相应的功能。
服务端返回给客户端的消息有两种:
音频流
具体如下:
1、task-started事件:语音合成任务已开启
2、二进制音频流
3、result-generated事件:携带附加信息
4、task-finished事件:语音合成任务已结束
5、task-failed事件:任务失败
三、给服务器发送消息
在与监听服务器消息不同的线程中(比如主线程,具体实现因编程语言而异),向服务器发送消息。
客户端发送给服务端的消息叫做指令,为JSON格式,以Text Frame方式发送,用于控制任务的起止和标识任务边界,由header
和payload
这两部分组成:
payload
:包含基础信息外的其他信息。不同指令的payload
格式可能不同。
向服务器发送的消息只有一种:run-task
指令。
run-task指令:包含所有待合成文本,开启语音合成任务
四、关闭WebSocket连接
在程序正常结束、运行中出现异常或接收到task-finished
、task-failed
事件时,关闭WebSocket连接。通常通过调用工具库中的close
函数来实现。
示例代码
Go
package main
import (
"encoding/json"
"fmt"
"net/http"
"os"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
)
const (
wsURL = "wss://dashscope.aliyuncs.com/api-ws/v1/inference/" // WebSocket服务器地址
outputFile = "output.mp3" // 输出文件路径
)
func main() {
// 若没有将API Key配置到环境变量,可将下行替换为:apiKey := "your_api_key"。不建议在生产环境中直接将API Key硬编码到代码中,以减少API Key泄露风险。
apiKey := os.Getenv("DASHSCOPE_API_KEY")
// 检查并清空输出文件
if err := clearOutputFile(outputFile); err != nil {
fmt.Println("清空输出文件失败:", err)
return
}
// 连接WebSocket服务
conn, err := connectWebSocket(apiKey)
if err != nil {
fmt.Println("连接WebSocket失败:", err)
return
}
defer closeConnection(conn)
// 创建一个通道用于接收任务完成的通知
done := make(chan struct{})
// 启动异步接收消息的goroutine
go receiveMessage(conn, done)
// 发送run-task指令
if err := sendRunTaskMsg(conn); err != nil {
fmt.Println("发送run-task指令失败:", err)
return
}
// 等待任务完成或超时
select {
case <-done:
fmt.Println("任务结束")
case <-time.After(5 * time.Minute):
fmt.Println("任务超时")
}
}
// 定义消息结构体
type Message struct {
Header Header `json:"header"`
Payload Payload `json:"payload"`
}
// 定义头部信息
type Header struct {
Action string `json:"action,omitempty"`
TaskID string `json:"task_id"`
Streaming string `json:"streaming,omitempty"`
Event string `json:"event,omitempty"`
ErrorCode string `json:"error_code,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
Attributes map[string]interface{} `json:"attributes"`
}
// 定义负载信息
type Payload struct {
Model string `json:"model,omitempty"`
TaskGroup string `json:"task_group,omitempty"`
Task string `json:"task,omitempty"`
Function string `json:"function,omitempty"`
Input Input `json:"input,omitempty"`
Parameters Parameters `json:"parameters,omitempty"`
Output Output `json:"output,omitempty"`
Usage Usage `json:"usage,omitempty"`
}
// 定义输入信息
type Input struct {
Text string `json:"text"`
}
// 定义参数信息
type Parameters struct {
TextType string `json:"text_type"`
Format string `json:"format"`
SampleRate int `json:"sample_rate"`
Volume int `json:"volume"`
Rate float64 `json:"rate"`
Pitch float64 `json:"pitch"`
WordTimestampEnabled bool `json:"word_timestamp_enabled"`
PhonemeTimestampEnabled bool `json:"phoneme_timestamp_enabled"`
}
// 定义输出信息
type Output struct {
Sentence Sentence `json:"sentence"`
}
// 定义句子信息
type Sentence struct {
BeginTime int `json:"begin_time"`
EndTime int `json:"end_time"`
Words []Word `json:"words"`
}
// 定义单词信息
type Word struct {
Text string `json:"text"`
BeginTime int `json:"begin_time"`
EndTime int `json:"end_time"`
Phonemes []Phoneme `json:"phonemes"`
}
// 定义音素信息
type Phoneme struct {
BeginTime int `json:"begin_time"`
EndTime int `json:"end_time"`
Text string `json:"text"`
Tone int `json:"tone"`
}
// 定义使用信息
type Usage struct {
Characters int `json:"characters"`
}
func receiveMessage(conn *websocket.Conn, done chan struct{}) {
for {
msgType, message, err := conn.ReadMessage()
if err != nil {
fmt.Println("解析服务器消息失败:", err)
close(done)
break
}
if msgType == websocket.BinaryMessage {
// 处理二进制音频流
if err := writeBinaryDataToFile(message, outputFile); err != nil {
fmt.Println("写入二进制数据失败:", err)
close(done)
break
}
fmt.Println("音频片段已写入本地文件")
} else {
// 处理文本消息
var msg Message
if err := json.Unmarshal(message, &msg); err != nil {
fmt.Println("解析事件失败:", err)
continue
}
if handleMessage(conn, msg, done) {
break
}
}
}
}
func handleMessage(conn *websocket.Conn, msg Message, done chan struct{}) bool {
switch msg.Header.Event {
case "task-started":
fmt.Println("任务已启动")
case "result-generated":
// 如需获取附加消息,可在此处添加相应代码
case "task-finished":
fmt.Println("任务已完成")
close(done)
return true
case "task-failed":
if msg.Header.ErrorMessage != "" {
fmt.Printf("任务失败:%s\n", msg.Header.ErrorMessage)
} else {
fmt.Println("未知原因导致任务失败")
}
close(done)
return true
default:
fmt.Printf("预料之外的事件:%v\n", msg)
close(done)
}
return false
}
func sendRunTaskMsg(conn *websocket.Conn) error {
runTaskMsg, err := generateRunTaskMsg()
if err != nil {
return err
}
if err := conn.WriteMessage(websocket.TextMessage, []byte(runTaskMsg)); err != nil {
return err
}
return nil
}
func generateRunTaskMsg() (string, error) {
runTaskMessage := Message{
Header: Header{
Action: "run-task",
TaskID: uuid.New().String(),
Streaming: "out",
},
Payload: Payload{
Model: "sambert-zhichu-v1",
TaskGroup: "audio",
Task: "tts",
Function: "SpeechSynthesizer",
Input: Input{
Text: "白日依山尽,黄河入海流。欲穷千里目,更上一层楼。",
},
Parameters: Parameters{
TextType: "PlainText",
Format: "mp3",
SampleRate: 16000,
Volume: 50,
Rate: 1.0,
Pitch: 1.0,
WordTimestampEnabled: true,
PhonemeTimestampEnabled: true,
},
},
}
runTaskMsgJSON, err := json.Marshal(runTaskMessage)
return string(runTaskMsgJSON), err
}
func connectWebSocket(apiKey string) (*websocket.Conn, error) {
header := make(http.Header)
header.Add("X-DashScope-DataInspection", "enable")
header.Add("Authorization", fmt.Sprintf("bearer %s", apiKey))
conn, _, err := websocket.DefaultDialer.Dial(wsURL, header)
if err != nil {
fmt.Println("连接WebSocket失败:", err)
return nil, err
}
return conn, nil
}
func writeBinaryDataToFile(data []byte, filePath string) error {
file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
defer file.Close()
_, err = file.Write(data)
return err
}
func closeConnection(conn *websocket.Conn) {
if conn != nil {
conn.Close()
}
}
func clearOutputFile(filePath string) error {
file, err := os.OpenFile(filePath, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
file.Close()
return nil
}
C#
需要在项目目录下运行如下命令添加依赖:
dotnet add package Newtonsoft.Json
示例代码如下:
using System.Net.WebSockets;
using System.Text;
using Newtonsoft.Json.Linq;
class Program {
// 若没有将API Key配置到环境变量,可将下行替换为:private const string ApiKey="your_api_key"。不建议在生产环境中直接将API Key硬编码到代码中,以减少API Key泄露风险。
private static readonly string ApiKey = Environment.GetEnvironmentVariable("DASHSCOPE_API_KEY") ?? throw new InvalidOperationException("DASHSCOPE_API_KEY environment variable is not set.");
private const string WebSocketUrl = "wss://dashscope.aliyuncs.com/api-ws/v1/inference/"; // WebSocket服务器地址
private const string OutputFilePath = "output.mp3"; // 输出文件路径
static async Task Main(string[] args) {
var ws = new ClientWebSocket();
try {
// 1. 连接WebSocket服务,鉴权
await ConnectWithAuth(ws, WebSocketUrl);
// 2. 启动接收消息的线程
var receiveTask = ReceiveMessages(ws);
// 3. 发送run-task指令
string textToSynthesize = "白日依山尽,黄河入海流。欲穷千里目,更上一层楼。";
string taskId = GenerateTaskId();
await SendRunTaskCommand(ws, textToSynthesize, taskId);
// 4. 等待接收任务完成
await receiveTask;
} catch (Exception ex) {
Console.WriteLine($"错误:{ex.Message}");
} finally {
if (ws.State == WebSocketState.Open) {
await ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "关闭连接", CancellationToken.None);
}
}
}
private static async Task ConnectWithAuth(ClientWebSocket ws, string url) {
var uri = new Uri(url);
ws.Options.SetRequestHeader("Authorization", $"bearer {ApiKey}");
ws.Options.SetRequestHeader("X-DashScope-DataInspection", "enable");
await ws.ConnectAsync(uri, CancellationToken.None);
Console.WriteLine("已连接到WebSocket服务器。");
}
private static string GenerateTaskId() {
return Guid.NewGuid().ToString("N");
}
private static async Task SendRunTaskCommand(ClientWebSocket ws, string text, string taskId) {
var command = CreateRunTaskCommand(text, taskId);
var buffer = Encoding.UTF8.GetBytes(command.ToString());
await ws.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Text, true, CancellationToken.None);
Console.WriteLine("已发送run-task指令。");
}
private static JObject CreateRunTaskCommand(string text, string taskId) {
return JObject.FromObject(new {
header = new {
action = "run-task",
task_id = taskId,
streaming = "out"
},
payload = new {
model = "sambert-zhichu-v1",
task_group = "audio",
task = "tts",
function = "SpeechSynthesizer",
input = new {
text = text
},
parameters = new {
text_type = "PlainText",
format = "mp3",
sample_rate = 16000,
volume = 50,
rate = 1,
pitch = 1,
word_timestamp_enabled = true,
phoneme_timestamp_enabled = true
}
}
});
}
private static async Task ReceiveMessages(ClientWebSocket ws) {
var buffer = new byte[1024 * 4];
var fs = new FileStream(OutputFilePath, FileMode.Create, FileAccess.Write);
bool taskStarted = false;
bool taskFinished = false;
while (ws.State == WebSocketState.Open && !taskFinished) {
var result = await ws.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
switch (result.MessageType) {
case WebSocketMessageType.Text:
var message = Encoding.UTF8.GetString(buffer, 0, result.Count);
var jsonMessage = JObject.Parse(message);
ProcessTextMessage(jsonMessage, ref taskStarted, ref taskFinished);
break;
case WebSocketMessageType.Binary:
if (taskStarted) {
await fs.WriteAsync(buffer, 0, result.Count);
Console.WriteLine("收到音频数据。");
}
break;
case WebSocketMessageType.Close:
Console.WriteLine("服务器关闭了连接。");
taskFinished = true;
break;
}
}
fs.Close();
}
private static void ProcessTextMessage(JObject jsonMessage, ref bool taskStarted, ref bool taskFinished) {
if (jsonMessage["header"] is JObject header && header["event"] is JToken eventToken) {
var eventType = eventToken.Value<string>();
switch (eventType) {
case "task-started":
taskStarted = true;
Console.WriteLine("任务开始。");
break;
case "result-generated":
// 如需获取附加消息,可在此处添加相应代码
break;
case "task-finished":
taskFinished = true;
Console.WriteLine("任务完成。");
break;
case "task-failed":
taskFinished = true;
Console.WriteLine("任务失败。");
break;
}
}
}
}
PHP
示例代码目录结构为:
my-php-project/
├── composer.json
├── vendor/
└── index.php
composer.json内容如下,相关依赖的版本号请根据实际情况自行决定:
{
"require": {
"react/event-loop": "^1.3",
"react/socket": "^1.11",
"react/stream": "^1.2",
"react/http": "^1.1",
"ratchet/pawl": "^0.4"
},
"autoload": {
"psr-4": {
"App\\": "src/"
}
}
}
index.php内容如下:
<?php
require 'vendor/autoload.php';
use Ratchet\Client\Connector;
use React\EventLoop\Loop;
use React\Socket\Connector as SocketConnector;
# 若没有将API Key配置到环境变量,可将下行替换为:$api_key="your_api_key"。不建议在生产环境中直接将API Key硬编码到代码中,以减少API Key泄露风险。
$api_key = getenv("DASHSCOPE_API_KEY");
$websocket_url = 'wss://dashscope.aliyuncs.com/api-ws/v1/inference/'; // WebSocket服务器地址
$output_file = 'output.mp3'; // 输出文件路径
$loop = Loop::get();
if (file_exists($output_file)) {
// 清空文件内容
file_put_contents($output_file, '');
echo "文件已清空\n";
}
// 创建自定义的连接器
$socketConnector = new SocketConnector($loop, [
'tcp' => [
'bindto' => '0.0.0.0:0',
],
'tls' => [
'verify_peer' => false,
'verify_peer_name' => false,
],
]);
$connector = new Connector($loop, $socketConnector);
$headers = [
'Authorization' => 'bearer ' . $api_key,
'X-DashScope-DataInspection' => 'enable'
];
// 连接WebSocket服务
$connector($websocket_url, [], $headers)
->then(function ($conn) use ($output_file) {
echo "连接成功\n";
// 异步接收WebSocket消息
$conn->on('message', function ($msg) use ($conn, $output_file) {
if ($msg->isBinary()) {
// 写入二进制数据到本地文件
file_put_contents($output_file, $msg->getPayload(), FILE_APPEND);
echo "二进制数据写入文件\n";
} else {
$data = json_decode($msg, true);
switch ($data['header']['event']) {
case 'task-started':
echo "任务开始\n";
break;
case 'result-generated':
// 如需获取附加消息,可在此处添加相应代码
break;
case 'task-finished':
echo "任务完成\n";
$conn->close();
break;
case 'task-failed':
echo "任务失败:" . $data['header']['error_message'] . "\n";
$conn->close();
break;
default:
echo "未知事件:" . $msg . "\n";
}
}
});
// 监听连接关闭
$conn->on('close', function($code = null, $reason = null) {
echo "连接已关闭\n";
if ($code !== null) {
echo "关闭代码:" . $code . "\n";
}
if ($reason !== null) {
echo "关闭原因:" . $reason . "\n";
}
});
// 发送run-task指令
$conn->send(json_encode([
'header' => [
'action' => 'run-task',
'task_id' => bin2hex(random_bytes(16)),
'streaming' => 'out'
],
'payload' => [
'model' => 'sambert-zhichu-v1',
'task_group' => 'audio',
'task' => 'tts',
'function' => 'SpeechSynthesizer',
'input' => [
'text' => '床前明月光,疑是地上霜。举头望明月,低头思故乡。'
],
'parameters' => [
'text_type' => 'PlainText',
'format' => 'mp3',
'sample_rate' => 16000,
'volume' => 50,
'rate' => 1,
'pitch' => 1,
'word_timestamp_enabled' => true,
'phoneme_timestamp_enabled' => true
]
]
]));
echo "run-task指令已发送\n";
}, function (Exception $e) {
echo "连接失败:{$e->getMessage()}\n";
file_put_contents('error.log', $e->getMessage() . "\n", FILE_APPEND);
});
$loop->run();
Node.js
需安装相关依赖:
npm install ws
npm install uuid
示例代码如下:
const WebSocket = require('ws');
const fs = require('fs');
const { v4: uuidv4 } = require('uuid');
// 若没有将API Key配置到环境变量,可将下行替换为:apiKey = 'your_api_key'。不建议在生产环境中直接将API Key硬编码到代码中,以减少API Key泄露风险。
const apiKey = process.env.DASHSCOPE_API_KEY;
const wsUrl = 'wss://dashscope.aliyuncs.com/api-ws/v1/inference/'; // WebSocket服务器地址
const outputFilePath = 'output.mp3'; // 替换为您的音频文件路径
async function main() {
await checkAndClearOutputFile(outputFilePath);
const ws = createWebSocketConnection();
}
function createWebSocketConnection() {
const ws = new WebSocket(wsUrl, {
headers: {
Authorization: `bearer ${apiKey}`,
'X-DashScope-DataInspection': 'enable'
}
});
ws.on('open', () => {
console.log('已连接到WebSocket服务器');
sendRunTaskMessage(ws);
});
ws.on('message', (data, isBinary) => handleWebSocketMessage(data, isBinary, ws));
ws.on('error', (error) => console.error('WebSocket错误:', error));
ws.on('close', () => console.log('WebSocket连接已关闭'));
return ws;
}
function sendRunTaskMessage(ws) {
const taskId = uuidv4();
const runTaskMessage = {
header: {
action: 'run-task',
task_id: taskId,
streaming: 'out'
},
payload: {
model: 'sambert-zhichu-v1',
task_group: 'audio',
task: 'tts',
function: 'SpeechSynthesizer',
input: {
text: '白日依山尽,黄河入海流。欲穷千里目,更上一层楼。'
},
parameters: {
text_type: 'PlainText',
format: 'mp3',
sample_rate: 16000,
volume: 50,
rate: 1,
pitch: 1,
word_timestamp_enabled: true,
phoneme_timestamp_enabled: true
}
}
};
ws.send(JSON.stringify(runTaskMessage));
console.log('run-task指令已发送');
}
function handleWebSocketMessage(data, isBinary, ws) {
if (isBinary) {
fs.appendFile(outputFilePath, data, (err) => {
if (err) throw err;
console.log('音频数据已写入文件');
});
} else {
const message = JSON.parse(data);
handleWebSocketEvent(message, ws);
}
}
function handleWebSocketEvent(message, ws) {
switch (message.header.event) {
case 'task-started':
console.log('任务已启动');
break;
case 'result-generated':
console.log('结果已生成');
break;
case 'task-finished':
console.log('任务已完成');
ws.close();
break;
case 'task-failed':
console.error('任务失败:', message.header.error_message);
ws.close();
break;
default:
console.log('未知事件:', message.header.event);
}
}
function checkAndClearOutputFile(filePath) {
return new Promise((resolve, reject) => {
fs.access(filePath, fs.F_OK, (err) => {
if (!err) {
fs.truncate(filePath, 0, (truncateErr) => {
if (truncateErr) return reject(truncateErr);
console.log('文件已清空');
resolve();
});
} else {
fs.open(filePath, 'w', (openErr) => {
if (openErr) return reject(openErr);
console.log('文件已创建');
resolve();
});
}
});
});
}
main().catch(console.error);