Go
package main
import (
"encoding/json"
"fmt"
"net/http"
"os"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
)
const (
wsURL = "wss://dashscope.aliyuncs.com/api-ws/v1/inference/" // WebSocket server endpoint
outputFile = "output.mp3" // path of the output audio file
)
// main runs one streaming text-to-speech task: it truncates the output file,
// opens the WebSocket connection, sends run-task, waits for task-started,
// streams the text with continue-task, sends finish-task and then waits for the
// receiver goroutine to finish.
func main() {
	// If the API key is not configured as an environment variable, the next line
	// can be replaced with: apiKey := "your_api_key". Hard-coding the key is not
	// recommended in production because it increases the risk of leaking it.
	apiKey := os.Getenv("DASHSCOPE_API_KEY")
	if apiKey == "" {
		fmt.Println("请设置 DASHSCOPE_API_KEY 环境变量")
		return
	}

	// Start from an empty output file; audio chunks are appended to it later.
	if err := clearOutputFile(outputFile); err != nil {
		fmt.Println("清空输出文件失败:", err)
		return
	}

	// Connect to the WebSocket service.
	conn, err := connectWebSocket(apiKey)
	if err != nil {
		fmt.Println("连接WebSocket失败:", err)
		return
	}
	defer closeConnection(conn)

	// Start the goroutine that receives events and audio data.
	done, taskStarted := startResultReceiver(conn)

	// Send the run-task command.
	taskID, err := sendRunTaskCmd(conn)
	if err != nil {
		fmt.Println("发送run-task指令失败:", err)
		return
	}

	// Wait for the task-started event. Also watch done: if the receiver exits
	// first (task-failed or a read error), task-started will never arrive and
	// polling the flag alone would block forever.
	// NOTE(review): taskStarted is a plain bool shared with the receiver
	// goroutine without synchronization; consider an atomic or a channel.
	for !*taskStarted {
		select {
		case <-done:
			fmt.Println("任务未能启动,接收已结束")
			return
		case <-time.After(100 * time.Millisecond):
		}
	}

	// Send the text to synthesize.
	if err := sendContinueTaskCmd(conn, taskID); err != nil {
		fmt.Println("发送待合成文本失败:", err)
		return
	}

	// Send the finish-task command.
	if err := sendFinishTaskCmd(conn, taskID); err != nil {
		fmt.Println("发送finish-task指令失败:", err)
		return
	}

	// Wait until the receiver goroutine has finished.
	<-done
}
// dialer is the WebSocket dialer that connectWebSocket uses to open the connection.
var dialer = websocket.DefaultDialer
// JSON message model shared by the commands sent to the service and the
// events received from it.
type (
	// Event is the envelope of every text message: a header plus a payload.
	Event struct {
		Header  Header  `json:"header"`
		Payload Payload `json:"payload"`
	}

	// Header carries the command name (Action) or event name (Event), the
	// task identifier and, for failures, the error code and message.
	Header struct {
		Action       string                 `json:"action"`
		TaskID       string                 `json:"task_id"`
		Streaming    string                 `json:"streaming"`
		Event        string                 `json:"event"`
		ErrorCode    string                 `json:"error_code,omitempty"`
		ErrorMessage string                 `json:"error_message,omitempty"`
		Attributes   map[string]interface{} `json:"attributes"`
	}

	// Payload is the body of a message: task selection, parameters and input.
	Payload struct {
		TaskGroup  string     `json:"task_group"`
		Task       string     `json:"task"`
		Function   string     `json:"function"`
		Model      string     `json:"model"`
		Parameters Params     `json:"parameters"`
		Resources  []Resource `json:"resources"`
		Input      Input      `json:"input"`
	}

	// Params holds the synthesis parameters sent with run-task.
	Params struct {
		TextType   string `json:"text_type"`
		Voice      string `json:"voice"`
		Format     string `json:"format"`
		SampleRate int    `json:"sample_rate"`
		Volume     int    `json:"volume"`
		Rate       int    `json:"rate"`
		Pitch      int    `json:"pitch"`
	}

	// Resource identifies an additional resource attached to a payload.
	Resource struct {
		ResourceID   string `json:"resource_id"`
		ResourceType string `json:"resource_type"`
	}

	// Input carries the text to synthesize.
	Input struct {
		Text string `json:"text"`
	}
)
// connectWebSocket dials wsURL with the DashScope authentication headers and
// returns the established connection.
//
// The error is returned without being printed here so that the caller reports
// it exactly once. When the handshake is rejected and an HTTP response is
// available, its status is appended to the error, which makes authentication
// failures easy to recognize.
func connectWebSocket(apiKey string) (*websocket.Conn, error) {
	header := make(http.Header)
	header.Add("X-DashScope-DataInspection", "enable")
	header.Add("Authorization", fmt.Sprintf("bearer %s", apiKey))

	conn, resp, err := dialer.Dial(wsURL, header)
	if err != nil {
		if resp != nil {
			return nil, fmt.Errorf("%w (HTTP %s)", err, resp.Status)
		}
		return nil, err
	}
	return conn, nil
}
// sendRunTaskCmd builds a run-task command, writes it to conn as a text
// message and returns the task ID generated for it together with the write
// error, if any.
func sendRunTaskCmd(conn *websocket.Conn) (string, error) {
	cmd, id, buildErr := generateRunTaskCmd()
	if buildErr != nil {
		return "", buildErr
	}
	return id, conn.WriteMessage(websocket.TextMessage, []byte(cmd))
}
// generateRunTaskCmd creates a fresh task ID and returns the JSON-encoded
// run-task command, the task ID, and any encoding error.
func generateRunTaskCmd() (string, string, error) {
	id := uuid.New().String()

	hdr := Header{
		Action:    "run-task",
		TaskID:    id,
		Streaming: "duplex",
	}
	synthesis := Params{
		TextType:   "PlainText",
		Voice:      "longxiaochun_v2",
		Format:     "mp3",
		SampleRate: 22050,
		Volume:     50,
		Rate:       1,
		Pitch:      1,
	}
	body := Payload{
		TaskGroup:  "audio",
		Task:       "tts",
		Function:   "SpeechSynthesizer",
		Model:      "cosyvoice-v2",
		Parameters: synthesis,
	}

	encoded, err := json.Marshal(Event{Header: hdr, Payload: body})
	return string(encoded), id, err
}
// sendContinueTaskCmd sends the sample text line by line, one continue-task
// command per line, and stops at the first encoding or write error.
func sendContinueTaskCmd(conn *websocket.Conn, taskID string) error {
	lines := [...]string{"床前明月光", "疑是地上霜", "举头望明月", "低头思故乡"}
	for i := range lines {
		cmd, err := generateContinueTaskCmd(lines[i], taskID)
		if err != nil {
			return err
		}
		if err := conn.WriteMessage(websocket.TextMessage, []byte(cmd)); err != nil {
			return err
		}
	}
	return nil
}
// generateContinueTaskCmd returns the JSON-encoded continue-task command that
// carries text for the task identified by taskID.
func generateContinueTaskCmd(text string, taskID string) (string, error) {
	var cmd Event
	cmd.Header.Action = "continue-task"
	cmd.Header.TaskID = taskID
	cmd.Header.Streaming = "duplex"
	cmd.Payload.Input.Text = text

	encoded, err := json.Marshal(cmd)
	return string(encoded), err
}
// startResultReceiver launches a goroutine that reads messages from conn until
// a read or file-write error occurs or handleEvent reports a terminal event.
// Binary messages are appended to outputFile; text messages are decoded as
// Event values and passed to handleEvent.
//
// It returns a channel that is closed when the goroutine exits and a pointer
// to a flag that handleEvent sets once task-started has been received.
func startResultReceiver(conn *websocket.Conn) (chan struct{}, *bool) {
	finished := make(chan struct{})
	started := new(bool) // zero value: not started yet

	go func() {
		defer close(finished)
		for {
			kind, payload, readErr := conn.ReadMessage()
			if readErr != nil {
				fmt.Println("解析服务器消息失败:", readErr)
				return
			}

			if kind != websocket.BinaryMessage {
				// Text message: decode the event; undecodable messages are skipped.
				var ev Event
				if decodeErr := json.Unmarshal(payload, &ev); decodeErr != nil {
					fmt.Println("解析事件失败:", decodeErr)
					continue
				}
				if handleEvent(conn, ev, started) {
					return
				}
				continue
			}

			// Binary message: a chunk of the audio stream.
			if writeErr := writeBinaryDataToFile(payload, outputFile); writeErr != nil {
				fmt.Println("写入二进制数据失败:", writeErr)
				return
			}
		}
	}()

	return finished, started
}
// handleEvent reacts to one server event and reports whether receiving should
// stop: true for task-finished and task-failed, false otherwise. On
// task-started it sets *taskStarted to true.
func handleEvent(conn *websocket.Conn, event Event, taskStarted *bool) bool {
	switch event.Header.Event {
	case "task-finished":
		fmt.Println("任务完成")
		return true
	case "task-failed":
		handleTaskFailed(event, conn)
		return true
	case "task-started":
		fmt.Println("收到task-started事件")
		*taskStarted = true
		return false
	case "result-generated":
		// result-generated events are ignored.
		return false
	}
	fmt.Printf("预料之外的事件:%v\n", event)
	return false
}
// handleTaskFailed prints the error message carried by a task-failed event, or
// a generic message when the event has none. conn is not used.
func handleTaskFailed(event Event, conn *websocket.Conn) {
	reason := event.Header.ErrorMessage
	if reason == "" {
		fmt.Println("未知原因导致任务失败")
		return
	}
	fmt.Printf("任务失败:%s\n", reason)
}
// closeConnection closes conn; a nil conn is ignored. The error returned by
// Close is discarded.
func closeConnection(conn *websocket.Conn) {
	if conn == nil {
		return
	}
	conn.Close()
}
// writeBinaryDataToFile appends data to the file at filePath, creating the
// file with mode 0644 if it does not exist.
//
// It returns the first error from opening, writing or closing the file. The
// close error is reported as well because a failed close can mean the data
// never reached the disk.
func writeBinaryDataToFile(data []byte, filePath string) (err error) {
	file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}
	defer func() {
		// Keep the write error if there is one; otherwise surface the close error.
		if closeErr := file.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	_, err = file.Write(data)
	return err
}
// sendFinishTaskCmd builds the finish-task command for taskID and writes it to
// conn as a text message.
func sendFinishTaskCmd(conn *websocket.Conn, taskID string) error {
	cmd, buildErr := generateFinishTaskCmd(taskID)
	if buildErr != nil {
		return buildErr
	}
	return conn.WriteMessage(websocket.TextMessage, []byte(cmd))
}
// generateFinishTaskCmd returns the JSON-encoded finish-task command for the
// task identified by taskID.
func generateFinishTaskCmd(taskID string) (string, error) {
	var cmd Event
	cmd.Header.Action = "finish-task"
	cmd.Header.TaskID = taskID
	cmd.Header.Streaming = "duplex"

	encoded, err := json.Marshal(cmd)
	return string(encoded), err
}
// clearOutputFile truncates the file at filePath to zero length, creating it
// with mode 0644 if it does not exist. It returns any error from opening or
// closing the file.
func clearOutputFile(filePath string) error {
	file, err := os.OpenFile(filePath, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}
	return file.Close()
}
C#
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
/// <summary>
/// Console sample that streams text to the DashScope CosyVoice WebSocket API
/// and appends the binary audio frames it receives to the output file.
/// </summary>
class Program {
    // If the API key is not configured as an environment variable, the next line can be replaced with:
    // private const string ApiKey="your_api_key". Hard-coding the key is not recommended in production
    // because it increases the risk of leaking it.
    private static readonly string ApiKey = Environment.GetEnvironmentVariable("DASHSCOPE_API_KEY") ?? throw new InvalidOperationException("DASHSCOPE_API_KEY environment variable is not set.");

    // WebSocket server address.
    private const string WebSocketUrl = "wss://dashscope.aliyuncs.com/api-ws/v1/inference/";
    // Output file path.
    private const string OutputFilePath = "output.mp3";

    // WebSocket client.
    private static ClientWebSocket _webSocket = new ClientWebSocket();
    // Cancellation source shared by all send/receive operations.
    private static CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
    // ID of the current task.
    private static string? _taskId;
    // Completed when task-started arrives; faulted or cancelled when the task can no longer start.
    private static TaskCompletionSource<bool> _taskStartedTcs = new TaskCompletionSource<bool>();

    /// <summary>
    /// Runs one synthesis task: connect, run-task, wait for task-started,
    /// send the text with continue-task, finish-task, then wait for the receiver.
    /// </summary>
    static async Task Main(string[] args) {
        try {
            // Clear the output file.
            ClearOutputFile(OutputFilePath);

            // Connect to the WebSocket service.
            await ConnectToWebSocketAsync(WebSocketUrl);

            // Start receiving messages.
            Task receiveTask = ReceiveMessagesAsync();

            // Send the run-task command.
            _taskId = GenerateTaskId();
            await SendRunTaskCommandAsync(_taskId);

            // Wait for task-started. The receiver faults or cancels this task when
            // the task fails or the connection ends first, so this cannot hang.
            await _taskStartedTcs.Task;

            // Send the continue-task commands.
            string[] texts = {
                "床前明月光",
                "疑是地上霜",
                "举头望明月",
                "低头思故乡"
            };
            foreach (string text in texts) {
                await SendContinueTaskCommandAsync(text);
            }

            // Send the finish-task command.
            await SendFinishTaskCommandAsync(_taskId);

            // Wait for the receiver to complete.
            await receiveTask;
            Console.WriteLine("任务完成,连接已关闭。");
        } catch (OperationCanceledException) {
            Console.WriteLine("任务被取消。");
        } catch (Exception ex) {
            Console.WriteLine($"发生错误:{ex.Message}");
        } finally {
            _cancellationTokenSource.Cancel();
            _webSocket.Dispose();
        }
    }

    /// <summary>Empties the file at <paramref name="filePath"/> if it exists.</summary>
    private static void ClearOutputFile(string filePath) {
        if (File.Exists(filePath)) {
            File.WriteAllText(filePath, string.Empty);
            Console.WriteLine("输出文件已清空。");
        } else {
            Console.WriteLine("输出文件不存在,无需清空。");
        }
    }

    /// <summary>
    /// Connects the shared client to <paramref name="url"/> with the authentication
    /// headers. Does nothing when the client is already connecting or open.
    /// </summary>
    private static async Task ConnectToWebSocketAsync(string url) {
        var uri = new Uri(url);
        if (_webSocket.State == WebSocketState.Connecting || _webSocket.State == WebSocketState.Open) {
            return;
        }

        // Set the request headers for the WebSocket handshake.
        _webSocket.Options.SetRequestHeader("Authorization", $"bearer {ApiKey}");
        _webSocket.Options.SetRequestHeader("X-DashScope-DataInspection", "enable");

        try {
            await _webSocket.ConnectAsync(uri, _cancellationTokenSource.Token);
            Console.WriteLine("已成功连接到WebSocket服务。");
        } catch (OperationCanceledException) {
            Console.WriteLine("WebSocket连接被取消。");
        } catch (Exception ex) {
            Console.WriteLine($"WebSocket连接失败: {ex.Message}");
            throw;
        }
    }

    /// <summary>Sends the run-task command with the synthesis parameters.</summary>
    private static async Task SendRunTaskCommandAsync(string taskId) {
        var command = CreateCommand("run-task", taskId, "duplex", new {
            task_group = "audio",
            task = "tts",
            function = "SpeechSynthesizer",
            model = "cosyvoice-v2",
            parameters = new {
                text_type = "PlainText",
                voice = "longxiaochun_v2",
                format = "mp3",
                sample_rate = 22050,
                volume = 50,
                rate = 1,
                pitch = 1
            },
            input = new { }
        });

        await SendJsonMessageAsync(command);
        Console.WriteLine("已发送run-task指令。");
    }

    /// <summary>Sends one continue-task command carrying <paramref name="text"/>.</summary>
    /// <exception cref="InvalidOperationException">The task ID has not been initialized.</exception>
    private static async Task SendContinueTaskCommandAsync(string text) {
        if (_taskId == null) {
            throw new InvalidOperationException("任务ID未初始化。");
        }

        var command = CreateCommand("continue-task", _taskId, "duplex", new {
            input = new {
                text
            }
        });

        await SendJsonMessageAsync(command);
        Console.WriteLine("已发送continue-task指令。");
    }

    /// <summary>Sends the finish-task command.</summary>
    private static async Task SendFinishTaskCommandAsync(string taskId) {
        var command = CreateCommand("finish-task", taskId, "duplex", new {
            input = new { }
        });

        await SendJsonMessageAsync(command);
        Console.WriteLine("已发送finish-task指令。");
    }

    /// <summary>Sends <paramref name="message"/> as a single UTF-8 text frame.</summary>
    private static async Task SendJsonMessageAsync(string message) {
        var buffer = Encoding.UTF8.GetBytes(message);
        try {
            await _webSocket.SendAsync(new ArraySegment<byte>(buffer), WebSocketMessageType.Text, true, _cancellationTokenSource.Token);
        } catch (OperationCanceledException) {
            Console.WriteLine("消息发送被取消。");
        }
    }

    /// <summary>
    /// Receives messages until the socket is no longer open, cancellation is
    /// requested, or a terminal event (task-finished / task-failed) arrives.
    /// </summary>
    private static async Task ReceiveMessagesAsync() {
        try {
            while (_webSocket.State == WebSocketState.Open && !_cancellationTokenSource.IsCancellationRequested) {
                using JsonDocument? response = await ReceiveMessageAsync();
                if (response == null) {
                    continue;
                }

                // Skip messages that do not carry header.event instead of throwing.
                JsonElement root = response.RootElement;
                if (root.ValueKind != JsonValueKind.Object ||
                    !root.TryGetProperty("header", out JsonElement header) ||
                    header.ValueKind != JsonValueKind.Object ||
                    !header.TryGetProperty("event", out JsonElement eventElement) ||
                    eventElement.ValueKind != JsonValueKind.String) {
                    continue;
                }

                switch (eventElement.GetString()) {
                    case "task-started":
                        Console.WriteLine("任务已启动。");
                        _taskStartedTcs.TrySetResult(true);
                        break;
                    case "task-finished":
                        Console.WriteLine("任务已完成。");
                        _cancellationTokenSource.Cancel();
                        return;
                    case "task-failed":
                        Console.WriteLine("任务失败。");
                        // Unblock Main if it is still waiting for task-started.
                        _taskStartedTcs.TrySetException(new InvalidOperationException("任务失败。"));
                        _cancellationTokenSource.Cancel();
                        return;
                    default:
                        // result-generated can be handled here.
                        break;
                }
            }
        } finally {
            // If receiving ends for any reason before task-started, release the waiter.
            _taskStartedTcs.TrySetCanceled();
        }
    }

    /// <summary>
    /// Receives the next frame. Binary data is appended to the output file and
    /// null is returned; a text message is read until its final fragment and
    /// returned as a parsed document. Returns null on close or cancellation.
    /// </summary>
    private static async Task<JsonDocument?> ReceiveMessageAsync() {
        var buffer = new byte[1024 * 4];
        var segment = new ArraySegment<byte>(buffer);
        using var textBuffer = new MemoryStream();

        try {
            while (true) {
                WebSocketReceiveResult result = await _webSocket.ReceiveAsync(segment, _cancellationTokenSource.Token);

                if (result.MessageType == WebSocketMessageType.Close) {
                    await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", _cancellationTokenSource.Token);
                    return null;
                }

                if (result.MessageType == WebSocketMessageType.Binary) {
                    // Binary data: append each fragment to the output file as it arrives.
                    Console.WriteLine("接收到二进制数据...");
                    using (var fileStream = new FileStream(OutputFilePath, FileMode.Append)) {
                        fileStream.Write(buffer, 0, result.Count);
                    }
                    return null;
                }

                // A text message may span several fragments; parse only once it is complete.
                textBuffer.Write(buffer, 0, result.Count);
                if (result.EndOfMessage) {
                    string message = Encoding.UTF8.GetString(textBuffer.GetBuffer(), 0, (int)textBuffer.Length);
                    return JsonDocument.Parse(message);
                }
            }
        } catch (OperationCanceledException) {
            Console.WriteLine("消息接收被取消。");
            return null;
        }
    }

    /// <summary>Returns a new 32-character hexadecimal task ID.</summary>
    private static string GenerateTaskId() {
        return Guid.NewGuid().ToString("N");
    }

    /// <summary>Serializes a command with the given header fields and payload to JSON.</summary>
    private static string CreateCommand(string action, string taskId, string streaming, object payload) {
        var command = new {
            header = new {
                action,
                task_id = taskId,
                streaming
            },
            payload
        };

        return JsonSerializer.Serialize(command);
    }
}
PHP
示例代码目录结构为:
my-php-project/
├── composer.json
├── vendor/
└── index.php
composer.json内容如下,相关依赖的版本号请根据实际情况自行决定:
{
"require": {
"react/event-loop": "^1.3",
"react/socket": "^1.11",
"react/stream": "^1.2",
"react/http": "^1.1",
"ratchet/pawl": "^0.4"
},
"autoload": {
"psr-4": {
"App\\": "src/"
}
}
}
index.php内容如下:
<?php
// Entry script: connects to the DashScope WebSocket service, runs one
// text-to-speech task and appends the received audio to the output file.
require __DIR__ . '/vendor/autoload.php';

use Ratchet\Client\Connector;
use React\EventLoop\Loop;
use React\Socket\Connector as SocketConnector;

# If the API key is not configured as an environment variable, the next line can be replaced with:
# $api_key="your_api_key". Hard-coding the key is not recommended in production because it
# increases the risk of leaking it.
$api_key = getenv("DASHSCOPE_API_KEY");
if ($api_key === false || $api_key === '') {
    fwrite(STDERR, "请设置 DASHSCOPE_API_KEY 环境变量\n");
    exit(1);
}
$websocket_url = 'wss://dashscope.aliyuncs.com/api-ws/v1/inference/'; // WebSocket server address
$output_file = 'output.mp3'; // output file path

$loop = Loop::get();

if (file_exists($output_file)) {
    // Empty the file; audio data is appended to it later.
    file_put_contents($output_file, '');
}

// Create a custom connector. TLS peer verification is left at its secure
// default: the API key is sent over this connection, so the server
// certificate must be verified.
$socketConnector = new SocketConnector($loop, [
    'tcp' => [
        'bindto' => '0.0.0.0:0',
    ],
]);

$connector = new Connector($loop, $socketConnector);

$headers = [
    'Authorization' => 'bearer ' . $api_key,
    'X-DashScope-DataInspection' => 'enable'
];

$connector($websocket_url, [], $headers)->then(function ($conn) use ($loop, $output_file) {
    echo "连接到WebSocket服务器\n";

    // Generate the task ID.
    $taskId = generateTaskId();

    // Send the run-task command.
    sendRunTaskMessage($conn, $taskId);

    // Sends all continue-task commands followed by finish-task.
    $sendContinueTask = function() use ($conn, $loop, $taskId) {
        // Text to send.
        $texts = ["床前明月光", "疑是地上霜", "举头望明月", "低头思故乡"];
        $continueTaskCount = 0;
        foreach ($texts as $text) {
            $continueTaskMessage = json_encode([
                "header" => [
                    "action" => "continue-task",
                    "task_id" => $taskId,
                    "streaming" => "duplex"
                ],
                "payload" => [
                    "input" => [
                        "text" => $text
                    ]
                ]
            ]);
            echo "准备发送continue-task指令: " . $continueTaskMessage . "\n";
            $conn->send($continueTaskMessage);
            $continueTaskCount++;
        }
        echo "发送的continue-task指令个数为:" . $continueTaskCount . "\n";

        // Send the finish-task command.
        sendFinishTaskMessage($conn, $taskId);
    };

    // Whether the task-started event has been received.
    $taskStarted = false;

    // Listen for messages.
    $conn->on('message', function($msg) use ($conn, $sendContinueTask, $loop, &$taskStarted, $taskId, $output_file) {
        if ($msg->isBinary()) {
            // Append the binary audio data to the local file.
            file_put_contents($output_file, $msg->getPayload(), FILE_APPEND);
        } else {
            // Text message: decode the payload explicitly rather than relying
            // on implicit object-to-string conversion.
            $response = json_decode($msg->getPayload(), true);

            if (isset($response['header']['event'])) {
                handleEvent($conn, $response, $sendContinueTask, $loop, $taskId, $taskStarted);
            } else {
                echo "未知的消息格式\n";
            }
        }
    });

    // Listen for the connection closing.
    $conn->on('close', function($code = null, $reason = null) {
        echo "连接已关闭\n";
        if ($code !== null) {
            echo "关闭代码: " . $code . "\n";
        }
        if ($reason !== null) {
            echo "关闭原因:" . $reason . "\n";
        }
    });
}, function ($e) {
    echo "无法连接:{$e->getMessage()}\n";
});

$loop->run();
/**
 * Generate a task ID: 16 random bytes rendered as 32 hexadecimal characters.
 *
 * @return string
 */
function generateTaskId(): string {
    $randomBytes = random_bytes(16);
    return bin2hex($randomBytes);
}
/**
 * Send the run-task command that starts a synthesis task.
 *
 * @param $conn   connection object; its send() method is called with the JSON command
 * @param $taskId task ID placed in the command header
 */
function sendRunTaskMessage($conn, $taskId) {
    $header = [
        "action" => "run-task",
        "task_id" => $taskId,
        "streaming" => "duplex"
    ];
    $parameters = [
        "text_type" => "PlainText",
        "voice" => "longxiaochun_v2",
        "format" => "mp3",
        "sample_rate" => 22050,
        "volume" => 50,
        "rate" => 1,
        "pitch" => 1
    ];
    $payload = [
        "task_group" => "audio",
        "task" => "tts",
        "function" => "SpeechSynthesizer",
        "model" => "cosyvoice-v2",
        "parameters" => $parameters,
        // An empty object, so that it is encoded as {} rather than [].
        "input" => new stdClass()
    ];

    $runTaskMessage = json_encode(["header" => $header, "payload" => $payload]);

    echo "准备发送run-task指令: " . $runTaskMessage . "\n";
    $conn->send($runTaskMessage);
    echo "run-task指令已发送\n";
}
/**
 * Read a whole audio file into memory. Prints a message when the file cannot
 * be read and still returns the failed result.
 *
 * @param string $filePath
 * @return bool|string file contents, or false on failure
 */
function readAudioFile(string $filePath) {
    $contents = file_get_contents($filePath);
    if (false === $contents) {
        echo "无法读取音频文件\n";
    }
    return $contents;
}
/**
 * Split audio data into consecutive chunks of at most $chunkSize bytes.
 *
 * @param string $data
 * @param int $chunkSize
 * @return array
 */
function splitAudioData(string $data, int $chunkSize): array {
    $chunks = str_split($data, $chunkSize);
    return $chunks;
}
/**
 * Send the finish-task command that ends the synthesis task.
 *
 * @param $conn   connection object; its send() method is called with the JSON command
 * @param $taskId task ID placed in the command header
 */
function sendFinishTaskMessage($conn, $taskId) {
    $header = [
        "action" => "finish-task",
        "task_id" => $taskId,
        "streaming" => "duplex"
    ];
    // An empty object, so that input is encoded as {} rather than [].
    $payload = ["input" => new stdClass()];

    $finishTaskMessage = json_encode(["header" => $header, "payload" => $payload]);

    echo "准备发送finish-task指令: " . $finishTaskMessage . "\n";
    $conn->send($finishTaskMessage);
    echo "finish-task指令已发送\n";
}
/**
 * Handle one server event.
 *
 * The connection is closed exactly once per terminal event: on task-finished,
 * on task-failed, and on an error event received before the task has started.
 * Optional fields (error_code, error_message, payload message) are read with
 * a default so that a missing key does not raise a warning.
 *
 * @param $conn             connection object (close() is called on terminal events)
 * @param $response         decoded message; $response['header']['event'] must be set
 * @param $sendContinueTask callable invoked once task-started is received
 * @param $loop             event loop (unused; kept for interface compatibility)
 * @param $taskId           task ID (unused; kept for interface compatibility)
 * @param $taskStarted      set to true by reference when task-started is received
 */
function handleEvent($conn, $response, $sendContinueTask, $loop, $taskId, &$taskStarted) {
    $event = $response['header']['event'];

    switch ($event) {
        case 'task-started':
            echo "任务开始,发送continue-task指令...\n";
            $taskStarted = true;
            // Send the continue-task commands.
            $sendContinueTask();
            break;
        case 'result-generated':
            // result-generated events are ignored.
            break;
        case 'task-finished':
            echo "任务完成\n";
            $conn->close();
            break;
        case 'task-failed':
            echo "任务失败\n";
            echo "错误代码:" . ($response['header']['error_code'] ?? '') . "\n";
            echo "错误信息:" . ($response['header']['error_message'] ?? '') . "\n";
            $conn->close();
            break;
        case 'error':
            echo "错误:" . ($response['payload']['message'] ?? '') . "\n";
            // An error before task-started means the task cannot proceed.
            if (!$taskStarted) {
                $conn->close();
            }
            break;
        default:
            echo "未知事件:" . $event . "\n";
            break;
    }
}
Node.js
需安装相关依赖:
npm install ws
npm install uuid
示例代码如下:
const fs = require('fs');
const WebSocket = require('ws');
const { v4: uuid } = require('uuid');

// If the API key is not configured as an environment variable, the next line can be replaced with:
// apiKey = 'your_api_key'. Hard-coding the key is not recommended in production because it
// increases the risk of leaking it.
const apiKey = process.env.DASHSCOPE_API_KEY;
// WebSocket server address
const url = 'wss://dashscope.aliyuncs.com/api-ws/v1/inference/';
// Output file path
const outputFilePath = 'output.mp3';

// Empty the output file
fs.writeFileSync(outputFilePath, '');

// Create the WebSocket client with the authentication headers
const requestHeaders = {
  Authorization: `bearer ${apiKey}`,
  'X-DashScope-DataInspection': 'enable'
};
const ws = new WebSocket(url, { headers: requestHeaders });

// Set to true once the task-started event has been received
let taskStarted = false;
// ID shared by every command of this task
let taskId = uuid();
// Once the connection is open, send the run-task command that starts the task.
ws.on('open', () => {
  console.log('已连接到WebSocket服务器');

  const header = {
    action: 'run-task',
    task_id: taskId,
    streaming: 'duplex'
  };
  const parameters = {
    text_type: 'PlainText',
    voice: 'longxiaochun_v2', // voice
    format: 'mp3', // audio format
    sample_rate: 22050, // sample rate
    volume: 50, // volume
    rate: 1, // speech rate
    pitch: 1 // pitch
  };
  const payload = {
    task_group: 'audio',
    task: 'tts',
    function: 'SpeechSynthesizer',
    model: 'cosyvoice-v2',
    parameters,
    input: {}
  };

  // Send the run-task command
  ws.send(JSON.stringify({ header, payload }));
  console.log('已发送run-task消息');
});
// Append-mode stream that receives the synthesized audio.
const fileStream = fs.createWriteStream(outputFilePath, { flags: 'a' });

// Handle incoming messages: binary frames are audio data, text frames are
// JSON events. A malformed or unexpected text frame is logged and skipped
// instead of crashing the process.
ws.on('message', (data, isBinary) => {
  if (isBinary) {
    // Write the binary audio data to the file
    fileStream.write(data);
    return;
  }

  let message;
  try {
    message = JSON.parse(data);
  } catch (err) {
    console.error('无法解析服务端消息:', err.message);
    return;
  }

  const event = message && message.header ? message.header.event : undefined;
  switch (event) {
    case 'task-started':
      taskStarted = true;
      console.log('任务已开始');
      // Send the continue-task commands
      sendContinueTasks(ws);
      break;
    case 'task-finished':
      console.log('任务已完成');
      // The task is over: stop any pending sends that check this flag
      taskStarted = false;
      ws.close();
      fileStream.end(() => {
        console.log('文件流已关闭');
      });
      break;
    case 'task-failed':
      console.error('任务失败:', message.header.error_message);
      // The task is over: stop any pending sends that check this flag
      taskStarted = false;
      ws.close();
      fileStream.end(() => {
        console.log('文件流已关闭');
      });
      break;
    default:
      // result-generated can be handled here
      break;
  }
});
/**
 * Schedule the continue-task commands, one per second, followed by finish-task.
 *
 * Each timer sends only while the task is still marked as started and the
 * socket is open; sending on a socket that has already been closed (for
 * example after task-failed) would throw.
 *
 * @param {WebSocket} ws - the connected WebSocket client
 */
function sendContinueTasks(ws) {
  const texts = [
    '床前明月光,',
    '疑是地上霜。',
    '举头望明月,',
    '低头思故乡。'
  ];

  // True while it is still valid to send commands for this task.
  const canSend = () => taskStarted && ws.readyState === WebSocket.OPEN;

  texts.forEach((text, index) => {
    setTimeout(() => {
      if (canSend()) {
        const continueTaskMessage = JSON.stringify({
          header: {
            action: 'continue-task',
            task_id: taskId,
            streaming: 'duplex'
          },
          payload: {
            input: {
              text: text
            }
          }
        });
        ws.send(continueTaskMessage);
        console.log(`已发送continue-task,文本:${text}`);
      }
    }, index * 1000); // one command per second
  });

  // Send the finish-task command
  setTimeout(() => {
    if (canSend()) {
      const finishTaskMessage = JSON.stringify({
        header: {
          action: 'finish-task',
          task_id: taskId,
          streaming: 'duplex'
        },
        payload: {
          input: {}
        }
      });
      ws.send(finishTaskMessage);
      console.log('已发送finish-task');
    }
  }, texts.length * 1000 + 1000); // two seconds after the last continue-task is scheduled
}
// Without an 'error' listener, a connection or handshake failure is emitted as
// an unhandled 'error' event and terminates the process with an exception.
ws.on('error', (err) => {
  console.error('WebSocket错误:', err.message);
});

// Report when the connection has been closed.
ws.on('close', () => {
  console.log('已断开与WebSocket服务器的连接');
});
Java
如您使用Java编程语言,建议采用Java DashScope SDK进行开发,详情请参见Java SDK。
以下是Java WebSocket的调用示例。在运行示例前,请确保已导入以下依赖:
Java-WebSocket
jackson-databind
推荐您使用Maven或Gradle管理依赖包,其配置如下:
pom.xml
<dependencies>
<!-- WebSocket Client -->
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.3</version>
</dependency>
<!-- JSON Processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.0</version>
</dependency>
</dependencies>
build.gradle
// 省略其它代码
dependencies {
// WebSocket Client
implementation 'org.java-websocket:Java-WebSocket:1.5.3'
// JSON Processing
implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.0'
}
// 省略其它代码
Java代码如下:
import com.fasterxml.jackson.databind.ObjectMapper;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.*;
/**
 * WebSocket client that runs one CosyVoice text-to-speech task: it sends
 * run-task when the connection opens, streams the text after task-started,
 * and appends the binary audio it receives to a timestamped MP3 file.
 */
public class TTSWebSocketClient extends WebSocketClient {
    /** Shared JSON mapper; created once instead of once per message. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final String taskId = UUID.randomUUID().toString();
    private final String outputFile = "output_" + System.currentTimeMillis() + ".mp3";
    /** Written in the message callback and polled by main, hence volatile. */
    private volatile boolean taskFinished = false;

    /**
     * @param serverUri WebSocket endpoint
     * @param headers   HTTP headers sent with the handshake
     */
    public TTSWebSocketClient(URI serverUri, Map<String, String> headers) {
        super(serverUri, headers);
    }

    /** Sends the run-task command as soon as the connection is open. */
    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        System.out.println("连接成功");

        // Send the run-task command
        String runTaskCommand = "{ \"header\": { \"action\": \"run-task\", \"task_id\": \"" + taskId + "\", \"streaming\": \"duplex\" }, \"payload\": { \"task_group\": \"audio\", \"task\": \"tts\", \"function\": \"SpeechSynthesizer\", \"model\": \"cosyvoice-v2\", \"parameters\": { \"text_type\": \"PlainText\", \"voice\": \"longxiaochun_v2\", \"format\": \"mp3\", \"sample_rate\": 22050, \"volume\": 50, \"rate\": 1, \"pitch\": 1 }, \"input\": {} }}";
        send(runTaskCommand);
    }

    /**
     * Handles a text message: on task-started the text and finish-task are
     * sent; on task-finished or task-failed the task is marked finished and
     * the connection is closed. Messages without header.event are ignored.
     */
    @Override
    @SuppressWarnings("unchecked")
    public void onMessage(String message) {
        System.out.println("收到服务端返回的消息:" + message);
        try {
            // Parse the JSON message
            Map<String, Object> messageMap = MAPPER.readValue(message, Map.class);

            Object headerValue = messageMap.get("header");
            if (!(headerValue instanceof Map)) {
                return;
            }
            Object eventValue = ((Map<String, Object>) headerValue).get("event");
            if (!(eventValue instanceof String)) {
                return;
            }
            String event = (String) eventValue;

            if ("task-started".equals(event)) {
                System.out.println("收到服务端返回的task-started事件");

                List<String> texts = Arrays.asList(
                        "床前明月光,疑是地上霜",
                        "举头望明月,低头思故乡"
                );

                for (String text : texts) {
                    // Send a continue-task command
                    sendContinueTask(text);
                }

                // Send the finish-task command
                sendFinishTask();
            } else if ("task-finished".equals(event)) {
                System.out.println("收到服务端返回的task-finished事件");
                taskFinished = true;
                closeConnection();
            } else if ("task-failed".equals(event)) {
                System.out.println("任务失败:" + message);
                // A failed task is also over; let main stop waiting.
                taskFinished = true;
                closeConnection();
            }
        } catch (Exception e) {
            System.err.println("出现异常:" + e.getMessage());
        }
    }

    /** Appends a binary audio chunk to the output file. */
    @Override
    public void onMessage(ByteBuffer message) {
        System.out.println("收到的二进制音频数据大小为:" + message.remaining());

        try (FileOutputStream fos = new FileOutputStream(outputFile, true)) {
            byte[] buffer = new byte[message.remaining()];
            message.get(buffer);
            fos.write(buffer);
            System.out.println("音频数据已写入本地文件" + outputFile + "中");
        } catch (IOException e) {
            System.err.println("音频数据写入本地文件失败:" + e.getMessage());
        }
    }

    @Override
    public void onClose(int code, String reason, boolean remote) {
        System.out.println("连接关闭:" + reason + " (" + code + ")");
    }

    @Override
    public void onError(Exception ex) {
        System.err.println("报错:" + ex.getMessage());
        ex.printStackTrace();
    }

    /**
     * Sends a continue-task command carrying {@code text}. The text is encoded
     * as a JSON string by the mapper so that quotes, backslashes and control
     * characters cannot break the command.
     */
    private void sendContinueTask(String text) {
        String jsonText;
        try {
            jsonText = MAPPER.writeValueAsString(text);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        String command = "{ \"header\": { \"action\": \"continue-task\", \"task_id\": \"" + taskId + "\", \"streaming\": \"duplex\" }, \"payload\": { \"input\": { \"text\": " + jsonText + " } }}";
        send(command);
    }

    /** Sends the finish-task command. */
    private void sendFinishTask() {
        String command = "{ \"header\": { \"action\": \"finish-task\", \"task_id\": \"" + taskId + "\", \"streaming\": \"duplex\" }, \"payload\": { \"input\": {} }}";
        send(command);
    }

    /** Closes the connection unless it is already closed. */
    private void closeConnection() {
        if (!isClosed()) {
            close();
        }
    }

    /**
     * Connects with the API key from the DASHSCOPE_API_KEY environment
     * variable and waits until the task has finished or the connection closed.
     */
    public static void main(String[] args) {
        try {
            String apiKey = System.getenv("DASHSCOPE_API_KEY");
            if (apiKey == null || apiKey.isEmpty()) {
                System.err.println("请设置 DASHSCOPE_API_KEY 环境变量");
                return;
            }

            Map<String, String> headers = new HashMap<>();
            headers.put("Authorization", "bearer " + apiKey);
            TTSWebSocketClient client = new TTSWebSocketClient(new URI("wss://dashscope.aliyuncs.com/api-ws/v1/inference/"), headers);

            client.connect();

            while (!client.isClosed() && !client.taskFinished) {
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            System.err.println("连接WebSocket服务失败:" + e.getMessage());
            e.printStackTrace();
        }
    }
}
Python
如您使用Python编程语言,建议采用Python DashScope SDK进行开发,详情请参见Python SDK。
以下是Python WebSocket的调用示例。在运行示例前,请确保通过如下方式安装依赖:
pip uninstall websocket-client
pip uninstall websocket
pip install websocket-client
重要:请不要将运行示例代码的Python文件命名为“websocket.py”,否则会报错(AttributeError: module 'websocket' has no attribute 'WebSocketApp'. Did you mean: 'WebSocket'?)。
import websocket
import json
import uuid
import os
import time
class TTSClient:
    """WebSocket client that runs one CosyVoice text-to-speech task.

    It sends run-task when the connection opens, streams the text after
    task-started, and appends the binary audio it receives to a
    timestamped MP3 file.
    """

    def __init__(self, api_key, uri):
        """
        Initialize a TTSClient instance.

        Args:
            api_key (str): API key used for authentication.
            uri (str): WebSocket service address.
        """
        self.api_key = api_key
        self.uri = uri
        self.task_id = str(uuid.uuid4())  # unique task ID
        self.output_file = f"output_{int(time.time())}.mp3"  # output audio file path
        self.ws = None  # WebSocketApp instance, created in run()
        self.task_started = False  # whether task-started has been received
        self.task_finished = False  # whether task-finished / task-failed has been received

    def on_open(self, ws):
        """
        Callback invoked when the WebSocket connection is established.

        Sends the run-task command that starts the synthesis task.
        """
        print("WebSocket 已连接")

        # Build the run-task command
        run_task_cmd = {
            "header": {
                "action": "run-task",
                "task_id": self.task_id,
                "streaming": "duplex"
            },
            "payload": {
                "task_group": "audio",
                "task": "tts",
                "function": "SpeechSynthesizer",
                "model": "cosyvoice-v2",
                "parameters": {
                    "text_type": "PlainText",
                    "voice": "longxiaochun_v2",
                    "format": "mp3",
                    "sample_rate": 22050,
                    "volume": 50,
                    "rate": 1,
                    "pitch": 1
                },
                "input": {}
            }
        }

        # Send the run-task command
        ws.send(json.dumps(run_task_cmd))
        print("已发送 run-task 指令")

    def on_message(self, ws, message):
        """
        Callback invoked for every received message.

        Text messages are parsed as JSON events; anything else is treated as
        binary audio data and appended to the output file.
        """
        if isinstance(message, str):
            # JSON text message
            try:
                msg_json = json.loads(message)
                print(f"收到 JSON 消息: {msg_json}")

                if "header" in msg_json:
                    header = msg_json["header"]

                    if "event" in header:
                        event = header["event"]

                        if event == "task-started":
                            print("任务已启动")
                            self.task_started = True

                            # Send the continue-task commands
                            texts = [
                                "床前明月光,疑是地上霜",
                                "举头望明月,低头思故乡"
                            ]

                            for text in texts:
                                self.send_continue_task(text)

                            # After all continue-task commands, send finish-task
                            self.send_finish_task()

                        elif event == "task-finished":
                            print("任务已完成")
                            self.task_finished = True
                            self.close(ws)

                        elif event == "task-failed":
                            # The error message is carried in the header, not at
                            # the top level of the message.
                            error_msg = header.get("error_message", "未知错误")
                            print(f"任务失败: {error_msg}")
                            self.task_finished = True
                            self.close(ws)

            except json.JSONDecodeError as e:
                print(f"JSON 解析失败: {e}")
        else:
            # Binary message (audio data)
            print(f"收到二进制消息,大小: {len(message)} 字节")
            with open(self.output_file, "ab") as f:
                f.write(message)
            print(f"已将音频数据写入本地文件{self.output_file}中")

    def on_error(self, ws, error):
        """Callback invoked when an error occurs."""
        print(f"WebSocket 出错: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        """Callback invoked when the connection is closed."""
        print(f"WebSocket 已关闭: {close_msg} ({close_status_code})")

    def send_continue_task(self, text):
        """Send a continue-task command carrying the text to synthesize."""
        cmd = {
            "header": {
                "action": "continue-task",
                "task_id": self.task_id,
                "streaming": "duplex"
            },
            "payload": {
                "input": {
                    "text": text
                }
            }
        }

        self.ws.send(json.dumps(cmd))
        print(f"已发送 continue-task 指令,文本内容: {text}")

    def send_finish_task(self):
        """Send the finish-task command that ends the synthesis task."""
        cmd = {
            "header": {
                "action": "finish-task",
                "task_id": self.task_id,
                "streaming": "duplex"
            },
            "payload": {
                "input": {}
            }
        }

        self.ws.send(json.dumps(cmd))
        print("已发送 finish-task 指令")

    def close(self, ws):
        """Close the connection if ws is still connected."""
        if ws and ws.sock and ws.sock.connected:
            ws.close()
            print("已主动关闭连接")

    def run(self):
        """Create the WebSocketApp and block while it processes messages."""
        # Request headers (authentication)
        header = {
            "Authorization": f"bearer {self.api_key}",
            "X-DashScope-DataInspection": "enable"
        }

        # Create the WebSocketApp instance
        self.ws = websocket.WebSocketApp(
            self.uri,
            header=header,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )

        print("正在监听 WebSocket 消息...")
        self.ws.run_forever()  # run the long-lived connection loop
# Example usage
if __name__ == "__main__":
    # If the API key is not configured as an environment variable, set API_KEY to your API key.
    API_KEY = os.environ.get("DASHSCOPE_API_KEY")
    if not API_KEY:
        # Fail early with a clear message instead of sending "bearer None".
        raise SystemExit("请设置 DASHSCOPE_API_KEY 环境变量")
    SERVER_URI = "wss://dashscope.aliyuncs.com/api-ws/v1/inference/"  # WebSocket service address

    client = TTSClient(API_KEY, SERVER_URI)
    client.run()