通过WebSocket连接访问实时语音识别服务

本文介绍如何通过WebSocket连接访问实时语音识别服务。

DashScope SDK目前仅支持JavaPython。若想使用其他编程语言开发Paraformer实时语音识别应用程序,可以通过WebSocket连接与服务进行通信。

WebSocket是一种支持全双工通信的网络协议。客户端和服务器通过一次握手建立持久连接,双方可以互相主动推送数据,因此在实时性和效率方面具有显著优势。

对于常用编程语言,有许多现成的WebSocket库和示例可供参考,例如:

  • Go:gorilla/websocket

  • PHP:Ratchet

建议您先了解WebSocket的基本原理和技术细节,再参照本文进行开发。

前提条件

已开通服务并获取API Key

流程简述

客户端和服务端交互流程如下:

  1. 建立连接。

  2. 发送并接收消息

    1. 客户端发送run-task指令。

    2. 服务端收到run-task指令后返回task-started事件。

    3. 客户端收到task-started事件后,持续发送音频数据至服务端。在此期间,客户端会持续收到服务端返回的result-generated事件,该事件包含语音识别结果。

    4. 客户端发送完音频数据后,发送finish-task指令。之后,客户端会继续收到服务端返回的result-generated事件,直到获取所有语音识别结果。

    5. 服务端在所有识别结果返回后,返回task-finished事件。

    6. 客户端收到task-finished事件,标志着语音识别任务结束。

  3. 关闭连接。

image

流程详情

对于不同的编程语言,尽管实现细节可能有所不同,但基本流程是一致的。

一、建立连接

通过调用工具库,将请求头和URL传入以建立WebSocket连接。

请求头中需添加如下鉴权信息:

{
    "Authorization": "bearer <your-dashscope-api-key>", // 将<your-dashscope-api-key>替换成您自己的API-KEY
    "user-agent": "your-platform-info", //可选
    "X-DashScope-WorkSpace": workspace, // 可选
    "X-DashScope-DataInspection": "enable"
}

WebSocket URL固定如下:

wss://dashscope.aliyuncs.com/api-ws/v1/inference

二、发送并接收消息

什么是指令和事件?

客户端发送给服务端的消息叫做指令,指令以Text Frame方式发送,用于控制语音识别任务的起止和标识任务边界。

服务端返回给客户端的消息叫做事件,事件代表不同的处理阶段。

指令和事件都是JSON格式,由headerpayload这两部分组成。

  • header包含基础信息,格式较为统一。

    指令中的header

    所有指令的header格式统一。

    header示例:

    {
        "header": {
            "action": "run-task",
            "task_id": "2bf83b9a-baeb-4fda-8d9a-xxxxxxxxxxxx", // 随机uuid
            "streaming": "duplex"
        }
    }

    header参数:

    参数

    类型

    是否必选

    说明

    header

    请求头

    -

    -

    header.action

    String

    指令类型,可以选填

    • "run-task"

    • "finish-task"

    用法参见下文。

    header.task_id

    String

    当次任务ID,随机生成的32位唯一ID。

    32位通用唯一识别码(UUID),由32个随机生成的字母和数字组成。可以带横线(如 "2bf83b9a-baeb-4fda-8d9a-xxxxxxxxxxxx")或不带横线(如 "2bf83b9abaeb4fda8d9axxxxxxxxxxxx")。大多数编程语言都内置了生成UUIDAPI,例如Python:

    import uuid
    
    def generateTaskId(self):
        # 生成随机UUID
        return uuid.uuid4().hex

    header.streaming

    String

    固定字符串:"duplex"

    事件中的header

    task-failed外,所有事件的header格式统一。

    header示例:

    {
        "header": {
            "task_id": "2bf83b9a-baeb-4fda-8d9a-xxxxxxxxxxxx",
            "event": "task-started",
            "attributes": {}
        }
    }

    header参数:

    参数

    类型

    说明

    header

    请求头

    -

    header.event

    String

    事件类型

    • task-started

    • result-generated

    • task-finished

    • task-failed

    详细说明参见下文。

    header.task_id

    String

    客户端生成的task_id

  • payload包含基础信息外的其他信息。不同指令或者事件的payload格式可能不同。

消息发送与接收流程

1、发送run-task指令:开启语音识别任务

该指令用于开启语音识别任务。

示例:

{
    "header": {
        "action": "run-task",
        "task_id": "2bf83b9a-baeb-4fda-8d9a-xxxxxxxxxxxx", // 随机uuid
        "streaming": "duplex"
    },
    "payload": {
        "task_group": "audio",
        "task": "asr",
        "function": "recognition",
        "model": "paraformer-realtime-v2",
        "parameters": {
            "format": "pcm", // 音频格式
            "sample_rate": 16000, // 采样率
            "vocabulary_id": "vocab-xxx-24ee19fa8cfb4d52902170a0xxxxxxxx", // paraformer-realtime-v2支持的热词ID
            "disfluency_removal_enabled": false, // 过滤语气词
            "language_hints": [
                "en"
            ] // 指定语言,仅支持paraformer-realtime-v2模型
        },
        "resources": [
            {
                "resource_id": "xxxxxxxxxxxx", // paraformer-realtime-v1支持的热词ID
                "resource_type": "asr_phrase"
            }
        ],
        "input": {}
    }
}

payload参数说明:

参数

类型

是否必选

说明

payload.task_group

String

固定字符串:"audio"。

payload.task

String

固定字符串:"asr"。

payload.function

String

固定字符串:"recognition"。

payload.model

String

模型名称:"paraformer-realtime-v2",支持的模型请参考实时语音识别API详情

payload.input

Object

固定格式:{}。

payload.parameters

format

String

音频编码格式,支持'pcm'、'wav'、'opus'、'speex'、'aac'、'amr'等多种音频格式。

sample_rate

Integer

识别音频采样率支持8000Hz以及16000Hz。

  • paraformer-realtime-v2 仅支持16000Hz采样率。

  • paraformer-realtime-v1 仅支持16000Hz采样。

  • paraformer-realtime-8k-v1 仅支持8000Hz采样率。

vocabulary_id

String

最新热词ID,支持最新v2系列模型并配置语种信息,此次语音识别中生效此热词ID对应的热词信息。默认不启用。使用方法请参考定制热词

disfluency_removal_enabled

Boolean

过滤语气词,默认关闭。

language_hints

List[String]

指定识别语音中语言的代码列表。支持的语言代码:

  • zh: 中文。

  • en: 英文。

  • ja: 日语。

  • yue: 粤语。

  • ko: 韩语。

仅对支持多语言的模型生效。如果不填写则模型会自动识别语种。

payload.resources(内容为列表)

resource_id

String

热词ID,此次语音识别中生效此热词ID对应的热词信息。默认不启用。

注:phrase_id为旧版本热词方案,不支持v2及后续系列模型。支持旧版本热词的模型列表请参考Paraformer语音识别热词定制与管理

resource_type

String

固定字符串"asr_phrase",和"resource_id"需同时使用。

2、接收task-started事件:语音合成任务已开启

客户端成功发送run-task指令后,服务端返回task-started事件。

task-started事件的payload没有内容。

示例:

{
    "header": {
        "task_id": "2bf83b9a-baeb-4fda-8d9a-xxxxxxxxxxxx",
        "event": "task-started",
        "attributes": {}
    },
    "payload": {}
}
3、发送待识别音频流

客户端需在收到task-started事件后,再发送待识别的音频流。

音频通过WebSocket的二进制通道上传。建议每次发送100ms的音频,并间隔100ms。

4、接收result-generated事件:获取语音识别的实时结果

客户端发送待识别音频时,服务端通过result-generated事件实时返回识别结果。

可以通过payload.sentence.endTime是否为空来判断该结果是中间结果还是最终结果。

示例:

{
  "header": {
    "task_id": "2bf83b9a-baeb-4fda-8d9a-xxxxxxxxxxxx",
    "event": "result-generated",
    "attributes": {}
  },
  "payload": {
    "output": {
      "sentence": {
        "begin_time": 170,
        "end_time": null,
        "text": "好,我们的一个",
        "words": [
          {
            "begin_time": 170,
            "end_time": 295,
            "text": "好",
            "punctuation": ","
          },
          {
            "begin_time": 295,
            "end_time": 503,
            "text": "我们",
            "punctuation": ""
          },
          {
            "begin_time": 503,
            "end_time": 711,
            "text": "的一",
            "punctuation": ""
          },
          {
            "begin_time": 711,
            "end_time": 920,
            "text": "个",
            "punctuation": ""
          }
        ]
      }
    },
    "usage": null
  }
}

payload参数说明:

参数

类型

说明

output

Object

output.sentence为识别结果,详细内容见下文。

usage

Object

固定为null。

payload.output.sentence格式如下:

参数

类型

说明

begin_time

Long

句子开始时间。

end_time

Long

句子结束时间,如果为中间识别结果则为null。

text

String

识别文本。

words

List<Word>

字时间戳信息。

payload.output.sentence.words为字时间戳列表,其中每一个word格式如下:

参数

类型

说明

begin_time

Long

字开始时间。

end_time

Long

字结束时间。

text

String

字。

punctuation

String

标点。

5、发送finish-task指令:结束语音识别任务

该指令用于结束语音识别任务。音频发送完毕后,客户端可以发送此指令以结束任务。

示例:

{
    "header": {
        "action": "finish-task",
        "task_id": "2bf83b9a-baeb-4fda-8d9a-xxxxxxxxxxxx",
        "streaming": "duplex"
    },
    "payload": {
        "input": {}
    }
}

payload参数说明:

参数

类型

是否必选

说明

payload.input

Object

固定格式:{}。

6、接收task-finished事件:语音识别任务已结束

客户端发送finish-task指令后,服务端在所有识别结果返回完毕后返回task-finished事件,标志着识别任务结束。

示例:

{
    "header": {
        "task_id": "2bf83b9a-baeb-4fda-8d9a-xxxxxxxxxxxx",
        "event": "task-finished",
        "attributes": {}
    },
    "payload": {
        "output": {},
        "usage": null
    }
}

任务失败

若接收到task-failed事件,表示任务失败。

task-failed事件:任务失败

如果识别任务失败,客户端会收到task-failed事件。收到该事件后结束连接并处理报错。

示例:

{
    "header": {
        "task_id": "2bf83b9a-baeb-4fda-8d9a-xxxxxxxxxxxx",
        "event": "task-failed",
        "error_code": "CLIENT_ERROR",
        "error_message": "request timeout after 23 seconds.",
        "attributes": {}
    },
    "payload": {}
}

header参数说明:

参数

类型

说明

header.error_code

String

报错类型描述。

header.error_message

String

具体报错原因。

三、关闭连接

接收到task-finished事件后,可以关闭WebSocket连接。通常通过调用工具库中的close函数实现。

示例代码

Go

package main

import (
	"encoding/json"
	"fmt"
	"github.com/google/uuid"
	"github.com/gorilla/websocket"
	"io"
	"log"
	"net/http"
	"os"
	"time"
)

const (
	apiKey    = "your-api-key"                                      // 替换为您的API Key
	wsURL     = "wss://dashscope.aliyuncs.com/api-ws/v1/inference/" // 替换为您的WebSocket URL
	audioFile = "asr_example.wav"                                   // 替换为您的音频文件路径
)

func main() {
	// 连接WebSocket服务
	conn, err := connectWebSocket()
	if err != nil {
		log.Fatal("连接WebSocket失败:", err)
	}
	defer conn.Close()

	// 发送run-task指令
	taskID, err := sendRunTaskCmd(conn)
	if err != nil {
		log.Fatal("发送run-task指令失败:", err)
	}

	// 启动一个goroutine来接收结果
	done, taskStarted := startResultReceiver(conn)

	// 等待task-started事件
	for !*taskStarted {
		time.Sleep(100 * time.Millisecond)
	}

	// 发送待识别音频文件流
	if err := sendAudioData(conn); err != nil {
		log.Fatal("发送音频失败:", err)
	}

	// 发送finish-task指令
	if err := sendFinishTaskCmd(conn, taskID); err != nil {
		log.Fatal("发送finish-task指令失败:", err)
	}

	// 等待接收结果的goroutine完成
	<-done
}

var dialer = websocket.DefaultDialer

// 定义结构体来表示JSON数据
type Header struct {
	Action       string                 `json:"action"`
	TaskID       string                 `json:"task_id"`
	Streaming    string                 `json:"streaming"`
	Event        string                 `json:"event"`
	ErrorCode    string                 `json:"error_code,omitempty"`
	ErrorMessage string                 `json:"error_message,omitempty"`
	Attributes   map[string]interface{} `json:"attributes"`
}

type Output struct {
	Sentence struct {
		BeginTime int64  `json:"begin_time"`
		EndTime   *int64 `json:"end_time"`
		Text      string `json:"text"`
		Words     []struct {
			BeginTime   int64  `json:"begin_time"`
			EndTime     *int64 `json:"end_time"`
			Text        string `json:"text"`
			Punctuation string `json:"punctuation"`
		} `json:"words"`
	} `json:"sentence"`
	Usage interface{} `json:"usage"`
}

type Payload struct {
	TaskGroup  string     `json:"task_group"`
	Task       string     `json:"task"`
	Function   string     `json:"function"`
	Model      string     `json:"model"`
	Parameters Params     `json:"parameters"`
	Resources  []Resource `json:"resources"`
	Input      Input      `json:"input"`
	Output     Output     `json:"output,omitempty"`
}

type Params struct {
	Format                   string   `json:"format"`
	SampleRate               int      `json:"sample_rate"`
	VocabularyID             string   `json:"vocabulary_id"`
	DisfluencyRemovalEnabled bool     `json:"disfluency_removal_enabled"`
	LanguageHints            []string `json:"language_hints"`
}

type Resource struct {
	ResourceID   string `json:"resource_id"`
	ResourceType string `json:"resource_type"`
}

type Input struct {
}

type Event struct {
	Header  Header  `json:"header"`
	Payload Payload `json:"payload"`
}

// 连接WebSocket服务
func connectWebSocket() (*websocket.Conn, error) {
	header := make(http.Header)
	header.Add("X-DashScope-DataInspection", "enable")
	header.Add("Authorization", fmt.Sprintf("bearer %s", apiKey))
	conn, _, err := dialer.Dial(wsURL, header)
	return conn, err
}

// 发送run-task指令
func sendRunTaskCmd(conn *websocket.Conn) (string, error) {
	runTaskCmd, taskID, err := generateRunTaskCmd()
	if err != nil {
		return "", err
	}
	err = conn.WriteMessage(websocket.TextMessage, []byte(runTaskCmd))
	return taskID, err
}

// 生成run-task指令
func generateRunTaskCmd() (string, string, error) {
	taskID := uuid.New().String()
	runTaskCmd := Event{
		Header: Header{
			Action:    "run-task",
			TaskID:    taskID,
			Streaming: "duplex",
		},
		Payload: Payload{
			TaskGroup: "audio",
			Task:      "asr",
			Function:  "recognition",
			Model:     "paraformer-realtime-v2",
			Parameters: Params{
				Format:     "pcm",
				SampleRate: 16000,
			},
			Input: Input{},
		},
	}
	runTaskCmdJSON, err := json.Marshal(runTaskCmd)
	return string(runTaskCmdJSON), taskID, err
}

// 启动一个goroutine来接收结果
func startResultReceiver(conn *websocket.Conn) (chan struct{}, *bool) {
	done := make(chan struct{})
	taskStarted := new(bool)
	*taskStarted = false

	go func() {
		defer close(done)
		for {
			_, message, err := conn.ReadMessage()
			if err != nil {
				log.Println("解析服务器消息失败:", err)
				closeConnection(conn)
				return
			}
			var event Event
			err = json.Unmarshal(message, &event)
			if err != nil {
				log.Println("解析事件失败:", err)
				closeConnection(conn)
				continue
			}
			if handleEvent(conn, event, taskStarted) {
				return
			}
		}
	}()

	return done, taskStarted
}

// 处理事件
func handleEvent(conn *websocket.Conn, event Event, taskStarted *bool) bool {
	switch event.Header.Event {
	case "task-started":
		fmt.Println("收到task-started事件")
		*taskStarted = true
	case "result-generated":
		if event.Payload.Output.Sentence.Text != "" {
			fmt.Println("识别结果:", event.Payload.Output.Sentence.Text)
		}
	case "task-finished":
		fmt.Println("任务完成")
		closeConnection(conn)
		return true
	case "task-failed":
		handleTaskFailed(event, conn)
		return true
	default:
		log.Printf("预料之外的事件:%v", event)
		closeConnection(conn)
	}
	return false
}

// 处理任务失败事件
func handleTaskFailed(event Event, conn *websocket.Conn) {
	if event.Header.ErrorMessage != "" {
		log.Fatalf("任务失败:%s", event.Header.ErrorMessage)
	} else {
		log.Fatal("未知原因导致任务失败")
	}
	closeConnection(conn)
}

// 关闭连接
func closeConnection(conn *websocket.Conn) {
	if conn != nil {
		conn.Close()
	}
}

// 发送音频数据
func sendAudioData(conn *websocket.Conn) error {
	file, err := os.Open(audioFile)
	if err != nil {
		return err
	}
	defer file.Close()

	buf := make([]byte, 1024) // 假设100ms的音频数据大约为1024字节
	for {
		n, err := file.Read(buf)
		if n == 0 {
			break
		}
		if err != nil && err != io.EOF {
			return err
		}
		err = conn.WriteMessage(websocket.BinaryMessage, buf[:n])
		if err != nil {
			return err
		}
		time.Sleep(100 * time.Millisecond)
	}
	return nil
}

// 发送finish-task指令
func sendFinishTaskCmd(conn *websocket.Conn, taskID string) error {
	finishTaskCmd, err := generateFinishTaskCmd(taskID)
	if err != nil {
		return err
	}
	err = conn.WriteMessage(websocket.TextMessage, []byte(finishTaskCmd))
	return err
}

// 生成finish-task指令
func generateFinishTaskCmd(taskID string) (string, error) {
	finishTaskCmd := Event{
		Header: Header{
			Action:    "finish-task",
			TaskID:    taskID,
			Streaming: "duplex",
		},
		Payload: Payload{
			Input: Input{},
		},
	}
	finishTaskCmdJSON, err := json.Marshal(finishTaskCmd)
	return string(finishTaskCmdJSON), err
}

PHP

示例代码目录结构为:

my-php-project/

├── composer.json

├── vendor/

└── index.php

composer.json内容如下:

{
    "require": {
        "react/event-loop": "^1.3",
        "react/socket": "^1.11",
        "react/stream": "^1.2",
        "react/http": "^1.1",
        "ratchet/pawl": "^0.4"
    },
    "autoload": {
        "psr-4": {
            "App\\": "src/"
        }
    }
}

index.php内容如下:

<?php

require __DIR__ . '/vendor/autoload.php';

use Ratchet\Client\Connector;
use React\EventLoop\Factory;
use React\Socket\Connector as SocketConnector;
use Ratchet\rfc6455\Messaging\Frame;

// 配置常量
define('API_KEY', 'your-api-key'); // 替换为您的API-KEY
define('WEBSOCKET_URL', 'wss://dashscope.aliyuncs.com/api-ws/v1/inference/');
define('AUDIO_FILE_PATH', 'asr_example.wav'); // 替换为您的音频文件路径

$loop = Factory::create();

// 创建自定义的连接器
$socketConnector = new SocketConnector($loop, [
    'tcp' => [
        'bindto' => '0.0.0.0:0',
    ],
    'tls' => [
        'verify_peer' => false,
        'verify_peer_name' => false,
    ],
]);

$connector = new Connector($loop, $socketConnector);

$headers = [
    'Authorization' => 'bearer ' . API_KEY,
    'X-DashScope-DataInspection' => 'enable'
];

$connector(WEBSOCKET_URL, [], $headers)->then(function ($conn) use ($loop) {
    echo "连接到WebSocket服务器\n";

    // 生成任务ID
    $taskId = generateTaskId();

    // 发送 run-task 指令
    sendRunTaskMessage($conn, $taskId);

    // 读取音频文件
    $voiceData = readAudioFile(AUDIO_FILE_PATH);
    if ($voiceData === false) {
        echo "无法读取音频文件\n";
        return;
    }

    // 分割音频数据
    $chunks = splitAudioData($voiceData, 1024);

    // 标记是否所有音频数据已发送
    $allChunksSent = false;

    // 定义发送函数
    $sendChunk = function() use ($conn, &$chunks, $loop, &$sendChunk, &$allChunksSent, $taskId) {
        if (!empty($chunks)) {
            $chunk = array_shift($chunks);
            $binaryMsg = new Frame($chunk, true, Frame::OP_BINARY);
            $conn->send($binaryMsg);
            // 100ms后发送下一个片段
            $loop->addTimer(0.1, $sendChunk);
        } else {
            echo "所有数据块已发送\n";
            $allChunksSent = true;

            // 发送 finish-task 指令
            sendFinishTaskMessage($conn, $taskId);
        }
    };

    // 监听消息
    $conn->on('message', function($msg) use ($conn, $chunks, $sendChunk, $loop, &$allChunksSent, $taskId) {
        $response = json_decode($msg, true);

        if (isset($response['header']['event'])) {
            handleEvent($conn, $response, $sendChunk, $loop, $allChunksSent, $taskId);
        } else {
            echo "未知的消息格式\n";
        }
    });

    // 监听连接关闭
    $conn->on('close', function($code = null, $reason = null) use (&$allChunksSent) {
        echo "连接已关闭\n";
        if ($code !== null) {
            echo "关闭代码: " . $code . "\n";
        }
        if ($reason !== null) {
            echo "关闭原因: " . $reason . "\n";
        }
        if (!$allChunksSent) {
            echo "警告: 连接在所有数据块发送前关闭\n";
        }
    });
}, function ($e) {
    echo "无法连接: {$e->getMessage()}\n";
});

$loop->run();

/**
 * 生成任务ID
 * @return string
 */
function generateTaskId(): string {
    return bin2hex(random_bytes(16));
}

/**
 * 发送 run-task 指令
 * @param $conn
 * @param $taskId
 */
function sendRunTaskMessage($conn, $taskId) {
    $runTaskMessage = json_encode([
        "header" => [
            "action" => "run-task",
            "task_id" => $taskId,
            "streaming" => "duplex"
        ],
        "payload" => [
            "task_group" => "audio",
            "task" => "asr",
            "function" => "recognition",
            "model" => "paraformer-realtime-v2",
            "parameters" => [
                "format" => "pcm",
                "sample_rate" => 16000
            ],
            "resources" => [
                [
                    "resource_id" => "xxxxxxxxxxxx",
                    "resource_type" => "asr_phrase"
                ]
            ],
            "input" => []
        ]
    ]);
    echo "准备发送run-task指令: " . $runTaskMessage . "\n";
    $conn->send($runTaskMessage);
    echo "run-task指令已发送\n";
}

/**
 * 读取音频文件
 * @param string $filePath
 * @return bool|string
 */
function readAudioFile(string $filePath) {
    $voiceData = file_get_contents($filePath);
    if ($voiceData === false) {
        echo "无法读取音频文件\n";
    }
    return $voiceData;
}

/**
 * 分割音频数据
 * @param string $data
 * @param int $chunkSize
 * @return array
 */
function splitAudioData(string $data, int $chunkSize): array {
    return str_split($data, $chunkSize);
}

/**
 * 发送 finish-task 指令
 * @param $conn
 * @param $taskId
 */
function sendFinishTaskMessage($conn, $taskId) {
    $finishTaskMessage = json_encode([
        "header" => [
            "action" => "finish-task",
            "task_id" => $taskId,
            "streaming" => "duplex"
        ],
        "payload" => [
            "input" => []
        ]
    ]);
    echo "准备发送finish-task指令: " . $finishTaskMessage . "\n";
    $conn->send($finishTaskMessage);
    echo "finish-task指令已发送\n";
}

/**
 * 处理事件
 * @param $conn
 * @param $response
 * @param $sendChunk
 * @param $loop
 * @param $allChunksSent
 * @param $taskId
 */
function handleEvent($conn, $response, $sendChunk, $loop, &$allChunksSent, $taskId) {
    switch ($response['header']['event']) {
        case 'task-started':
            echo "任务开始,发送音频数据...\n";
            // 开始发送音频数据
            $sendChunk();
            break;
        case 'result-generated':
            $result = $response['payload']['output']['sentence'];
            echo "识别结果: " . $result['text'] . "\n";
            break;
        case 'task-finished':
            echo "任务完成\n";
            $conn->close();
            break;
        case 'task-failed':
            echo "任务失败\n";
            echo "错误代码: " . $response['header']['error_code'] . "\n";
            echo "错误信息: " . $response['header']['error_message'] . "\n";
            $conn->close();
            break;
        case 'error':
            echo "错误: " . $response['payload']['message'] . "\n";
            break;
        default:
            echo "未知事件: " . $response['header']['event'] . "\n";
            break;
    }

    // 如果所有数据已发送且任务已完成,关闭连接
    if ($allChunksSent && $response['header']['event'] == 'task-finished') {
        // 等待1秒以确保所有数据都已传输完毕
        $loop->addTimer(1, function() use ($conn) {
            $conn->close();
            echo "客户端关闭连接\n";
        });
    }
}