本文介绍如何通过WebSocket连接访问实时语音识别服务。
DashScope SDK目前仅支持Java和Python。若想使用其他编程语言开发Paraformer实时语音识别应用程序,可以通过WebSocket连接与服务进行通信。
WebSocket是一种支持全双工通信的网络协议。客户端和服务器通过一次握手建立持久连接,双方可以互相主动推送数据,因此在实时性和效率方面具有显著优势。
对于常用编程语言,有许多现成的WebSocket库和示例可供参考,例如:
Go:
gorilla/websocket
PHP:
Ratchet
建议您先了解WebSocket的基本原理和技术细节,再参照本文进行开发。
前提条件
流程简述
客户端和服务端交互流程如下:
建立连接。
发送并接收消息
客户端发送run-task指令。
服务端收到run-task指令后返回task-started事件。
客户端收到task-started事件后,持续发送音频数据至服务端。在此期间,客户端会持续收到服务端返回的result-generated事件,该事件包含语音识别结果。
客户端发送完音频数据后,发送finish-task指令。之后,客户端会继续收到服务端返回的result-generated事件,直到获取所有语音识别结果。
服务端在所有识别结果返回后,返回task-finished事件。
客户端收到task-finished事件,标志着语音识别任务结束。
关闭连接。
流程详情
对于不同的编程语言,尽管实现细节可能有所不同,但基本流程是一致的。
一、建立连接
通过调用工具库,将请求头和URL传入以建立WebSocket连接。
请求头中需添加如下鉴权信息:
{
"Authorization": "bearer <your-dashscope-api-key>", // 将<your-dashscope-api-key>替换成您自己的API-KEY
"user-agent": "your-platform-info", //可选
"X-DashScope-WorkSpace": workspace, // 可选
"X-DashScope-DataInspection": "enable"
}
WebSocket URL固定如下:
wss://dashscope.aliyuncs.com/api-ws/v1/inference
二、发送并接收消息
消息发送与接收流程
1、发送run-task指令:开启语音识别任务
2、接收task-started事件:语音识别任务已开启
3、发送待识别音频流
4、接收result-generated事件:获取语音识别的实时结果
5、发送finish-task指令:结束语音识别任务
6、接收task-finished事件:语音识别任务已结束
任务失败
若接收到task-failed事件,表示任务失败。
task-failed事件:任务失败
三、关闭连接
接收到task-finished事件后,可以关闭WebSocket连接。通常通过调用工具库中的close函数实现。
示例代码
Go
package main
import (
"encoding/json"
"fmt"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"io"
"log"
"net/http"
"os"
"time"
)
// Configuration for the example; edit these before running.
const (
apiKey = "your-api-key" // Replace with your API key.
wsURL = "wss://dashscope.aliyuncs.com/api-ws/v1/inference/" // Replace with your WebSocket URL.
audioFile = "asr_example.wav" // Replace with the path to your audio file.
)
// main runs the whole recognition flow: connect, send run-task, wait for the
// task-started event, stream the audio file, send finish-task, and finally
// wait for the receiver goroutine to finish.
func main() {
	// Connect to the WebSocket service.
	conn, err := connectWebSocket()
	if err != nil {
		log.Fatal("连接WebSocket失败:", err)
	}
	defer conn.Close()

	// Send the run-task command.
	taskID, err := sendRunTaskCmd(conn)
	if err != nil {
		log.Fatal("发送run-task指令失败:", err)
	}

	// Start the goroutine that receives results.
	done, taskStarted := startResultReceiver(conn)

	// Wait for the task-started event. The receiver goroutine may exit before
	// the task ever starts (read error, unexpected event); watching done keeps
	// this loop from polling forever in that case.
	// NOTE(review): *taskStarted is written by the receiver goroutine and read
	// here without synchronization; consider a channel or atomic flag.
	for !*taskStarted {
		select {
		case <-done:
			if !*taskStarted {
				log.Fatal("接收协程在收到task-started事件之前已退出")
			}
		case <-time.After(100 * time.Millisecond):
		}
	}

	// Stream the audio file to be recognized.
	if err := sendAudioData(conn); err != nil {
		log.Fatal("发送音频失败:", err)
	}

	// Send the finish-task command.
	if err := sendFinishTaskCmd(conn, taskID); err != nil {
		log.Fatal("发送finish-task指令失败:", err)
	}

	// Wait until the receiver goroutine has finished.
	<-done
}
// dialer is the WebSocket dialer used by connectWebSocket; it is the
// gorilla/websocket default dialer.
var dialer = websocket.DefaultDialer
// The structs below model the JSON messages exchanged with the service.

// Header is the "header" object of a message. The same struct is used to
// build outgoing commands (Action is set) and to decode incoming events
// (Event is read).
type Header struct {
// Action is the command name; this file sends "run-task" and "finish-task".
Action string `json:"action"`
// TaskID identifies the task; the run-task and finish-task commands carry the same ID.
TaskID string `json:"task_id"`
// Streaming is set to "duplex" on every command built in this file.
Streaming string `json:"streaming"`
// Event is the event name of an incoming message, e.g. "task-started".
Event string `json:"event"`
// ErrorCode is omitted from the JSON when empty; this file never reads it.
ErrorCode string `json:"error_code,omitempty"`
// ErrorMessage is omitted from the JSON when empty; it is logged when a task-failed event arrives.
ErrorMessage string `json:"error_message,omitempty"`
// Attributes is not used by this file.
Attributes map[string]interface{} `json:"attributes"`
}
// Output is the "output" object of an incoming event. Only Sentence.Text is
// read by this file.
type Output struct {
// Sentence holds the recognized text together with timing information.
Sentence struct {
BeginTime int64 `json:"begin_time"`
// EndTime is a pointer, so a JSON null decodes to nil.
// NOTE(review): presumably null while the sentence is still in progress — confirm against the service documentation.
EndTime *int64 `json:"end_time"`
Text string `json:"text"`
// Words is the per-word breakdown of the sentence.
Words []struct {
BeginTime int64 `json:"begin_time"`
EndTime *int64 `json:"end_time"`
Text string `json:"text"`
Punctuation string `json:"punctuation"`
} `json:"words"`
} `json:"sentence"`
// Usage is decoded as an untyped value; this file does not inspect it.
Usage interface{} `json:"usage"`
}
// Payload is the "payload" object of a message. Outgoing commands fill the
// task description fields; incoming events are decoded into Output.
type Payload struct {
TaskGroup string `json:"task_group"`
Task string `json:"task"`
Function string `json:"function"`
Model string `json:"model"`
Parameters Params `json:"parameters"`
Resources []Resource `json:"resources"`
Input Input `json:"input"`
// NOTE(review): omitempty has no effect on a non-pointer struct value in
// encoding/json, so marshalled commands always include an "output" object.
Output Output `json:"output,omitempty"`
}
// Params is the "parameters" object of a run-task command. This file sets only
// Format and SampleRate; the remaining fields have no omitempty tag and are
// therefore marshalled with their zero values.
type Params struct {
// Format is the audio format name; this file sends "pcm".
Format string `json:"format"`
// SampleRate is the audio sample rate; this file sends 16000.
SampleRate int `json:"sample_rate"`
VocabularyID string `json:"vocabulary_id"`
DisfluencyRemovalEnabled bool `json:"disfluency_removal_enabled"`
LanguageHints []string `json:"language_hints"`
}
// Resource is one entry of the "resources" array of a payload. The Go example
// in this file never populates it.
type Resource struct {
ResourceID string `json:"resource_id"`
ResourceType string `json:"resource_type"`
}
// Input is the "input" object of a payload. It has no fields, so it marshals
// to an empty JSON object.
type Input struct {
}
// Event is the top-level JSON envelope, used both for commands sent to the
// service and for events received from it.
type Event struct {
Header Header `json:"header"`
Payload Payload `json:"payload"`
}
// connectWebSocket dials wsURL with the authentication headers and returns the
// resulting connection, or the dial error.
func connectWebSocket() (*websocket.Conn, error) {
	headers := http.Header{}
	headers.Add("X-DashScope-DataInspection", "enable")
	headers.Add("Authorization", "bearer "+apiKey)

	// The handshake response is not needed by this example.
	conn, _, err := dialer.Dial(wsURL, headers)
	return conn, err
}
// sendRunTaskCmd builds a run-task command and writes it to conn as a text
// message. It returns the generated task ID; the ID is empty only when the
// command could not be built.
func sendRunTaskCmd(conn *websocket.Conn) (string, error) {
	cmd, id, err := generateRunTaskCmd()
	if err != nil {
		return "", err
	}
	if err := conn.WriteMessage(websocket.TextMessage, []byte(cmd)); err != nil {
		return id, err
	}
	return id, nil
}
// generateRunTaskCmd creates a run-task command with a fresh UUID task ID.
// It returns the command encoded as JSON, the task ID, and any marshalling
// error.
func generateRunTaskCmd() (string, string, error) {
	id := uuid.New().String()

	var cmd Event
	cmd.Header.Action = "run-task"
	cmd.Header.TaskID = id
	cmd.Header.Streaming = "duplex"

	cmd.Payload.TaskGroup = "audio"
	cmd.Payload.Task = "asr"
	cmd.Payload.Function = "recognition"
	cmd.Payload.Model = "paraformer-realtime-v2"
	cmd.Payload.Parameters.Format = "pcm"
	cmd.Payload.Parameters.SampleRate = 16000

	encoded, err := json.Marshal(cmd)
	return string(encoded), id, err
}
// startResultReceiver starts a goroutine that reads messages from conn,
// decodes each one into an Event and passes it to handleEvent.
//
// It returns a channel that is closed when the goroutine exits, and a flag
// that handleEvent sets to true once the task-started event has arrived.
// NOTE(review): the flag is written by the goroutine and read by the caller
// without synchronization.
func startResultReceiver(conn *websocket.Conn) (chan struct{}, *bool) {
	done := make(chan struct{})
	taskStarted := new(bool)

	go func() {
		defer close(done)
		for {
			_, message, err := conn.ReadMessage()
			if err != nil {
				// This is a read failure, not a parse failure.
				log.Println("读取服务器消息失败:", err)
				closeConnection(conn)
				return
			}

			var event Event
			if err := json.Unmarshal(message, &event); err != nil {
				log.Println("解析事件失败:", err)
				closeConnection(conn)
				// The connection is closed now; looping again would only
				// produce a second error from reading a closed connection.
				return
			}

			if handleEvent(conn, event, taskStarted) {
				return
			}
		}
	}()

	return done, taskStarted
}
// handleEvent reacts to a single event received from the service.
//
// It sets *taskStarted on task-started, prints non-empty recognition text on
// result-generated, and closes the connection on task-finished or on an
// unexpected event. A task-failed event is delegated to handleTaskFailed.
//
// It returns true when the receiver loop should stop, false when it should
// keep reading.
func handleEvent(conn *websocket.Conn, event Event, taskStarted *bool) bool {
	switch event.Header.Event {
	case "task-started":
		fmt.Println("收到task-started事件")
		*taskStarted = true
		return false
	case "result-generated":
		if text := event.Payload.Output.Sentence.Text; text != "" {
			fmt.Println("识别结果:", text)
		}
		return false
	case "task-finished":
		fmt.Println("任务完成")
		closeConnection(conn)
		return true
	case "task-failed":
		handleTaskFailed(event, conn)
		return true
	default:
		log.Printf("预料之外的事件:%v", event)
		closeConnection(conn)
		// The connection has just been closed, so tell the caller to stop
		// instead of letting it read from a closed connection.
		return true
	}
}
// handleTaskFailed closes the connection and then terminates the program with
// the error message carried by a task-failed event, or with a generic message
// when the event carries none.
func handleTaskFailed(event Event, conn *websocket.Conn) {
	// log.Fatal exits the process, so the connection must be closed first;
	// anything placed after the Fatal call would never run.
	closeConnection(conn)

	if event.Header.ErrorMessage != "" {
		log.Fatalf("任务失败:%s", event.Header.ErrorMessage)
	}
	log.Fatal("未知原因导致任务失败")
}
// closeConnection closes conn when it is non-nil. The error returned by Close
// is ignored.
func closeConnection(conn *websocket.Conn) {
	if conn == nil {
		return
	}
	conn.Close()
}
// sendAudioData streams audioFile to conn as binary messages of at most 1024
// bytes, pausing 100ms after each message.
//
// It returns nil once the whole file has been sent, or the first error from
// opening the file, reading it, or writing to the connection.
func sendAudioData(conn *websocket.Conn) error {
	file, err := os.Open(audioFile)
	if err != nil {
		return err
	}
	defer file.Close()

	// Assumes roughly 1024 bytes per 100ms of audio.
	// NOTE(review): for 16 kHz, 16-bit mono PCM, 100ms is 3200 bytes — confirm
	// the chunk size against the actual audio format.
	buf := make([]byte, 1024)
	for {
		n, readErr := file.Read(buf)
		// Process the bytes that were read before looking at the error, as the
		// io.Reader contract requires.
		if n > 0 {
			if err := conn.WriteMessage(websocket.BinaryMessage, buf[:n]); err != nil {
				return err
			}
			time.Sleep(100 * time.Millisecond)
		}
		if readErr == io.EOF {
			return nil
		}
		if readErr != nil {
			// A read failure must not be reported as a successful send.
			return readErr
		}
	}
}
// sendFinishTaskCmd builds the finish-task command for taskID and writes it to
// conn as a text message.
func sendFinishTaskCmd(conn *websocket.Conn, taskID string) error {
	cmd, err := generateFinishTaskCmd(taskID)
	if err != nil {
		return err
	}
	return conn.WriteMessage(websocket.TextMessage, []byte(cmd))
}
// generateFinishTaskCmd returns the finish-task command for taskID encoded as
// JSON, together with any marshalling error.
func generateFinishTaskCmd(taskID string) (string, error) {
	var cmd Event
	cmd.Header.Action = "finish-task"
	cmd.Header.TaskID = taskID
	cmd.Header.Streaming = "duplex"

	encoded, err := json.Marshal(cmd)
	return string(encoded), err
}
PHP
示例代码目录结构为:
my-php-project/
├── composer.json
├── vendor/
└── index.php
composer.json内容如下:
{
"require": {
"react/event-loop": "^1.3",
"react/socket": "^1.11",
"react/stream": "^1.2",
"react/http": "^1.1",
"ratchet/pawl": "^0.4"
},
"autoload": {
"psr-4": {
"App\\": "src/"
}
}
}
index.php内容如下:
<?php
require __DIR__ . '/vendor/autoload.php';
use Ratchet\Client\Connector;
use React\EventLoop\Factory;
use React\Socket\Connector as SocketConnector;
use Ratchet\rfc6455\Messaging\Frame;
// Configuration constants.
define('API_KEY', 'your-api-key'); // Replace with your API key.
define('WEBSOCKET_URL', 'wss://dashscope.aliyuncs.com/api-ws/v1/inference/');
define('AUDIO_FILE_PATH', 'asr_example.wav'); // Replace with the path to your audio file.

$loop = Factory::create();

// Custom socket connector. TLS certificate verification stays enabled so that
// the API key in the Authorization header is only ever sent to a verified
// server.
$socketConnector = new SocketConnector($loop, [
    'tcp' => [
        'bindto' => '0.0.0.0:0',
    ],
    'tls' => [
        'verify_peer' => true,
        'verify_peer_name' => true,
    ],
]);

$connector = new Connector($loop, $socketConnector);

$headers = [
    'Authorization' => 'bearer ' . API_KEY,
    'X-DashScope-DataInspection' => 'enable'
];

$connector(WEBSOCKET_URL, [], $headers)->then(function ($conn) use ($loop) {
    echo "连接到WebSocket服务器\n";

    // Read the audio first: if the file is unreadable, no task is started and
    // the connection is closed instead of being left open.
    $voiceData = readAudioFile(AUDIO_FILE_PATH);
    if ($voiceData === false) {
        echo "无法读取音频文件\n";
        $conn->close();
        return;
    }

    // Split the audio into chunks.
    $chunks = splitAudioData($voiceData, 1024);

    // Generate the task ID.
    $taskId = generateTaskId();

    // Set to true once every audio chunk has been sent.
    $allChunksSent = false;

    // Sends one chunk per call and re-schedules itself until no chunks are
    // left, then sends the finish-task command.
    $sendChunk = function() use ($conn, &$chunks, $loop, &$sendChunk, &$allChunksSent, $taskId) {
        if (!empty($chunks)) {
            $chunk = array_shift($chunks);
            $binaryMsg = new Frame($chunk, true, Frame::OP_BINARY);
            $conn->send($binaryMsg);
            // Send the next chunk after 100ms.
            $loop->addTimer(0.1, $sendChunk);
        } else {
            echo "所有数据块已发送\n";
            $allChunksSent = true;
            // Send the finish-task command.
            sendFinishTaskMessage($conn, $taskId);
        }
    };

    // Listen for messages. $chunks is deliberately not captured here: the
    // handler never uses it, and a by-value capture would copy the audio.
    $conn->on('message', function($msg) use ($conn, $sendChunk, $loop, &$allChunksSent, $taskId) {
        $response = json_decode((string) $msg, true);
        if (isset($response['header']['event'])) {
            handleEvent($conn, $response, $sendChunk, $loop, $allChunksSent, $taskId);
        } else {
            echo "未知的消息格式\n";
        }
    });

    // Listen for the connection being closed.
    $conn->on('close', function($code = null, $reason = null) use (&$allChunksSent) {
        echo "连接已关闭\n";
        if ($code !== null) {
            echo "关闭代码: " . $code . "\n";
        }
        if ($reason !== null) {
            echo "关闭原因: " . $reason . "\n";
        }
        if (!$allChunksSent) {
            echo "警告: 连接在所有数据块发送前关闭\n";
        }
    });

    // Send the run-task command once the handlers are in place.
    sendRunTaskMessage($conn, $taskId);
}, function ($e) {
    echo "无法连接: {$e->getMessage()}\n";
});

$loop->run();
/**
 * Generate a task ID.
 *
 * @return string 32 hexadecimal characters encoding 16 random bytes
 */
function generateTaskId(): string {
    $randomBytes = random_bytes(16);
    $taskId = bin2hex($randomBytes);
    return $taskId;
}
/**
 * Send the run-task command, which starts a recognition task.
 *
 * The JSON-encoded command is echoed before it is sent.
 *
 * @param mixed  $conn   connection object exposing send()
 * @param string $taskId task ID to place in the message header
 */
function sendRunTaskMessage($conn, $taskId) {
    $runTaskMessage = json_encode([
        "header" => [
            "action" => "run-task",
            "task_id" => $taskId,
            "streaming" => "duplex"
        ],
        "payload" => [
            "task_group" => "audio",
            "task" => "asr",
            "function" => "recognition",
            "model" => "paraformer-realtime-v2",
            "parameters" => [
                "format" => "pcm",
                "sample_rate" => 16000
            ],
            // NOTE(review): "xxxxxxxxxxxx" is a placeholder — replace it with a
            // real resource ID or drop the "resources" entry.
            "resources" => [
                [
                    "resource_id" => "xxxxxxxxxxxx",
                    "resource_type" => "asr_phrase"
                ]
            ],
            // An empty PHP array encodes as the JSON array "[]"; stdClass
            // yields the JSON object "{}".
            "input" => new stdClass()
        ]
    ]);
    echo "准备发送run-task指令: " . $runTaskMessage . "\n";
    $conn->send($runTaskMessage);
    echo "run-task指令已发送\n";
}
/**
 * Read the whole audio file into memory.
 *
 * Prints a message when the file cannot be read.
 *
 * @param string $filePath path of the file to read
 * @return bool|string the file contents, or false on failure
 */
function readAudioFile(string $filePath) {
    $contents = file_get_contents($filePath);
    if ($contents !== false) {
        return $contents;
    }
    echo "无法读取音频文件\n";
    return false;
}
/**
 * Split audio data into chunks of at most $chunkSize bytes.
 *
 * @param string $data      raw audio bytes
 * @param int    $chunkSize maximum size of each chunk, in bytes
 * @return array list of chunks; empty when $data is empty
 */
function splitAudioData(string $data, int $chunkSize): array {
    // str_split('') returns [""] before PHP 8.2, which would cause one empty
    // frame to be sent; return no chunks at all for empty input.
    if ($data === '') {
        return [];
    }
    return str_split($data, $chunkSize);
}
/**
 * Send the finish-task command, which ends the recognition task.
 *
 * The JSON-encoded command is echoed before it is sent.
 *
 * @param mixed  $conn   connection object exposing send()
 * @param string $taskId task ID to place in the message header
 */
function sendFinishTaskMessage($conn, $taskId) {
    $finishTaskMessage = json_encode([
        "header" => [
            "action" => "finish-task",
            "task_id" => $taskId,
            "streaming" => "duplex"
        ],
        "payload" => [
            // An empty PHP array encodes as the JSON array "[]"; stdClass
            // yields the JSON object "{}".
            "input" => new stdClass()
        ]
    ]);
    echo "准备发送finish-task指令: " . $finishTaskMessage . "\n";
    $conn->send($finishTaskMessage);
    echo "finish-task指令已发送\n";
}
/**
 * Handle one event received from the service.
 *
 * Starts sending audio on task-started, prints the recognized text on
 * result-generated, and closes the connection exactly once on task-finished
 * or task-failed.
 *
 * @param mixed    $conn          connection object exposing close()
 * @param array    $response      decoded message; $response['header']['event'] must be set
 * @param callable $sendChunk     callback that starts sending audio chunks
 * @param mixed    $loop          event loop (unused; kept for interface compatibility)
 * @param bool     $allChunksSent whether all audio has been sent (unused; kept for interface compatibility)
 * @param string   $taskId        task ID (unused; kept for interface compatibility)
 */
function handleEvent($conn, $response, $sendChunk, $loop, &$allChunksSent, $taskId) {
    $event = $response['header']['event'];
    switch ($event) {
        case 'task-started':
            echo "任务开始,发送音频数据...\n";
            // Start sending the audio data.
            $sendChunk();
            break;
        case 'result-generated':
            // Guard against events that carry no sentence text.
            $text = $response['payload']['output']['sentence']['text'] ?? null;
            if ($text !== null) {
                echo "识别结果: " . $text . "\n";
            }
            break;
        case 'task-finished':
            echo "任务完成\n";
            // Close once, right away; no delayed second close is scheduled.
            $conn->close();
            echo "客户端关闭连接\n";
            break;
        case 'task-failed':
            echo "任务失败\n";
            echo "错误代码: " . ($response['header']['error_code'] ?? '') . "\n";
            echo "错误信息: " . ($response['header']['error_message'] ?? '') . "\n";
            $conn->close();
            break;
        case 'error':
            echo "错误: " . ($response['payload']['message'] ?? '') . "\n";
            break;
        default:
            echo "未知事件: " . $event . "\n";
            break;
    }
}