实时语音合成-通义千问

更新时间: 2025-08-14 19:52:30

Qwen-TTS Realtime API 具有低延迟语音合成能力,支持文本流式输入与音频流式输出。

相比于Qwen-TTS 模型 API,Realtime API 可以:

  • 流式输入文本

    可无缝对接大模型流式输出,边生成边合成,提升交互式语音应用的实时性。

  • 双向通信

    通过 WebSocket 协议实现文本流式输入与音频流式输出,避免多次建立连接的开销,大幅降低延迟。

模型支持的音色包括Chelsie、Serena、Ethan和Cherry。

Chelsie(女)

Cherry(女)

Ethan(男)

Serena(女)

支持的模型

模型名称

版本

上下文长度

最大输入

最大输出

输入成本

输出成本

免费额度

(注)

(Token数)

(每千Token)

qwen-tts-realtime

当前能力等同 qwen-tts-realtime-2025-07-15

稳定版

8,192

512

7,680

0.0024元

0.012元

各100万Token

有效期:百炼开通后180天内

qwen-tts-realtime-latest

能力始终等同最新快照版

最新版

qwen-tts-realtime-2025-07-15

快照版

访问方式

Qwen-TTS Realtime API 基于WebSocket协议,您可通过不同编程语言的 WebSocket 库连接。

  • 调用地址

    wss://dashscope.aliyuncs.com/api-ws/v1/realtime

  • 查询参数

    查询参数为model,需指定为要访问的model名称,请参见支持的模型

  • 消息头

    使用 Bearer Token 鉴权:Authorization: Bearer DASHSCOPE_API_KEY

    DASHSCOPE_API_KEY 是您在阿里云百炼上申请的API-KEY。

通过以下代码与 Qwen-TTS Realtime API 建立 WebSocket 连接。

建立WebSocket连接

# pip install websocket-client
import json
import websocket
import os

# 若没有配置环境变量,请用百炼API Key将下行替换为:API_KEY="sk-xxx",
API_KEY=os.getenv("DASHSCOPE_API_KEY")
API_URL = "wss://dashscope.aliyuncs.com/api-ws/v1/realtime?model=qwen-tts-realtime"

headers = [
    "Authorization: Bearer " + API_KEY
]

def on_open(ws):
    print(f"Connected to server: {API_URL}")
def on_message(ws, message):
    data = json.loads(message)
    print("Received event:", json.dumps(data, indent=2))
def on_error(ws, error):
    print("Error:", error)

ws = websocket.WebSocketApp(
    API_URL,
    header=headers,
    on_open=on_open,
    on_message=on_message,
    on_error=on_error
)

ws.run_forever()

连接后可以接收到以下回调信息:

{
    "event_id": "event_xxx",
    "type": "session.created",
    "session": {
        "object": "realtime.session",
        "mode": "server_commit",
        "model": "qwen-tts-realtime",
        "voice": "Cherry",
        "response_format": "pcm",
        "sample_rate": 24000,
        "id": "sess_xxx"
    }
}

快速开始

Qwen-TTS Realtime API 仅支持 OpenAI 兼容方式调用。您需要获取API Key配置API Key到环境变量

您的 Python 版本需要不低于 3.10。

通过以下步骤快速体验 Realtime API 实时合成音频的功能。

  1. 准备运行环境

    根据您的操作系统安装 pyaudio。

    macOS

    brew install portaudio && pip install pyaudio

    Debian/Ubuntu

    sudo apt-get install python3-pyaudio
    
    或者
    
    pip install pyaudio

    CentOS

    sudo yum install -y portaudio portaudio-devel && pip install pyaudio

    Windows

    pip install pyaudio

    安装完成后,通过 pip 安装 websocket 相关的依赖:

    pip install websocket-client==1.8.0 websockets
  2. 创建客户端

    在本地新建 python 文件,命名为tts_realtime_client.py并复制以下代码到文件中:

    tts_realtime_client.py

    # -- coding: utf-8 --
    
    import asyncio
    import websockets
    import json
    import base64
    import time
    from typing import Optional, Callable, Dict, Any
    from enum import Enum
    
    class SessionMode(Enum):
        SERVER_COMMIT = "server_commit"
        COMMIT = "commit"
    
    class TTSRealtimeClient:
        """
        与 TTS Realtime API 交互的客户端。
    
        该类提供了连接 TTS Realtime API、发送文本数据、获取音频输出以及管理 WebSocket 连接的相关方法。
    
        属性说明:
            base_url (str):
                Realtime API 的基础地址。
            api_key (str):
                用于身份验证的 API Key。
            voice (str):
                服务器合成语音所使用的声音。
            mode (SessionMode):
                会话模式,可选 server_commit 或 commit。
            audio_callback (Callable[[bytes], None]):
                接收音频数据的回调函数。
        """
        def __init__(
            self,
            base_url: str,
            api_key: str,
            voice: str = "Cherry",
            mode: SessionMode = SessionMode.SERVER_COMMIT,
            audio_callback: Optional[Callable[[bytes], None]] = None
        ):
            self.base_url = base_url
            self.api_key = api_key
            self.voice = voice
            self.mode = mode
            self.ws = None
            self.audio_callback = audio_callback
    
            # 当前回复状态
            self._current_response_id = None
            self._current_item_id = None
            self._is_responding = False
    
        async def connect(self) -> None:
            """与 TTS Realtime API 建立 WebSocket 连接。"""
            headers = {
                "Authorization": f"Bearer {self.api_key}"
            }
    
            self.ws = await websockets.connect(self.base_url, additional_headers=headers)
    
            # 设置默认会话配置
            await self.update_session({
                "mode": self.mode.value,
                "voice": self.voice,
                "response_format": "pcm",
                "sample_rate": 24000
            })
    
        async def send_event(self, event) -> None:
            """发送事件到服务器。"""
            event['event_id'] = "event_" + str(int(time.time() * 1000))
            print(f"发送事件: type={event['type']}, event_id={event['event_id']}")
            await self.ws.send(json.dumps(event))
    
        async def update_session(self, config: Dict[str, Any]) -> None:
            """更新会话配置。"""
            event = {
                "type": "session.update",
                "session": config
            }
            print("更新会话配置: ", event)
            await self.send_event(event)
    
        async def append_text(self, text: str) -> None:
            """向 API 发送文本数据。"""
            event = {
                "type": "input_text_buffer.append",
                "text": text
            }
            await self.send_event(event)
    
        async def commit_text_buffer(self) -> None:
            """提交文本缓冲区以触发处理。"""
            event = {
                "type": "input_text_buffer.commit"
            }
            await self.send_event(event)
    
        async def clear_text_buffer(self) -> None:
            """清除文本缓冲区。"""
            event = {
                "type": "input_text_buffer.clear"
            }
            await self.send_event(event)
    
        async def finish_session(self) -> None:
            """结束会话。"""
            event = {
                "type": "session.finish"
            }
            await self.send_event(event)
    
        async def handle_messages(self) -> None:
            """处理来自服务器的消息。"""
            try:
                async for message in self.ws:
                    event = json.loads(message)
                    event_type = event.get("type")
                    
                    if event_type != "response.audio.delta":
                        print(f"收到事件: {event_type}")
                    
                    if event_type == "error":
                        print("错误: ", event.get('error', {}))
                        continue
                    elif event_type == "session.created":
                        print("会话创建,ID: ", event.get('session', {}).get('id'))
                    elif event_type == "session.updated":
                        print("会话更新,ID: ", event.get('session', {}).get('id'))
                    elif event_type == "input_text_buffer.committed":
                        print("文本缓冲区已提交,项目ID: ", event.get('item_id'))
                    elif event_type == "input_text_buffer.cleared":
                        print("文本缓冲区已清除")
                    elif event_type == "response.created":
                        self._current_response_id = event.get("response", {}).get("id")
                        self._is_responding = True
                        print("响应已创建,ID: ", self._current_response_id)
                    elif event_type == "response.output_item.added":
                        self._current_item_id = event.get("item", {}).get("id")
                        print("输出项已添加,ID: ", self._current_item_id)
                    # 处理音频增量
                    elif event_type == "response.audio.delta" and self.audio_callback:
                        audio_bytes = base64.b64decode(event.get("delta", ""))
                        self.audio_callback(audio_bytes)
                    elif event_type == "response.audio.done":
                        print("音频生成完成")
                    elif event_type == "response.done":
                        self._is_responding = False
                        self._current_response_id = None
                        self._current_item_id = None
                        print("响应完成")
                    elif event_type == "session.finished":
                        print("会话已结束")
    
            except websockets.exceptions.ConnectionClosed:
                print("连接已关闭")
            except Exception as e:
                print("消息处理出错: ", str(e))
    
        async def close(self) -> None:
            """关闭 WebSocket 连接。"""
            if self.ws:
                await self.ws.close()
  3. 选择语音合成模式

    Realtime API 支持以下两种模式:

    • server_commit 模式

      客户端仅发送文本。服务端会智能判断文本分段方式与合成时机。适合低延迟且无需手动控制合成节奏的场景,例如 GPS 导航。

    • commit 模式

      客户端先将文本添加至缓冲区,再主动触发服务端合成指定文本。适合需精细控制断句和停顿的场景,例如新闻播报。

    server_commit 模式

    tts_realtime_client.py的同级目录下新建另一个 python 文件,命名为server_commit.py,并将以下代码复制进文件中:

    server_commit.py

    import os
    import asyncio
    import logging
    import wave
    from tts_realtime_client import TTSRealtimeClient, SessionMode
    import pyaudio
    
    # QwenTTS 服务配置
    URL = "wss://dashscope.aliyuncs.com/api-ws/v1/realtime?model=qwen-tts-realtime"
    # 若没有配置环境变量,请用百炼API Key将下行替换为:API_KEY="sk-xxx",
    API_KEY = os.getenv("DASHSCOPE_API_KEY")
    
    if not API_KEY:
        raise ValueError("Please set DASHSCOPE_API_KEY environment variable")
    
    # 收集音频数据
    _audio_chunks = []
    # 实时播放相关
    _AUDIO_SAMPLE_RATE = 24000
    _audio_pyaudio = pyaudio.PyAudio()
    _audio_stream = None  # 将在运行时打开
    
    def _audio_callback(audio_bytes: bytes):
        """TTSRealtimeClient 音频回调: 实时播放并缓存"""
        global _audio_stream
        if _audio_stream is not None:
            try:
                _audio_stream.write(audio_bytes)
            except Exception as exc:
                logging.error(f"PyAudio playback error: {exc}")
        _audio_chunks.append(audio_bytes)
        logging.info(f"Received audio chunk: {len(audio_bytes)} bytes")
    
    def _save_audio_to_file(filename: str = "output.wav", sample_rate: int = 24000) -> bool:
        """将收集到的音频数据保存为 WAV 文件"""
        if not _audio_chunks:
            logging.warning("No audio data to save")
            return False
    
        try:
            audio_data = b"".join(_audio_chunks)
            with wave.open(filename, 'wb') as wav_file:
                wav_file.setnchannels(1)  # 单声道
                wav_file.setsampwidth(2)  # 16-bit
                wav_file.setframerate(sample_rate)
                wav_file.writeframes(audio_data)
            logging.info(f"Audio saved to: {filename}")
            return True
        except Exception as exc:
            logging.error(f"Failed to save audio: {exc}")
            return False
    
    async def _produce_text(client: TTSRealtimeClient):
        """向服务器发送文本片段"""
        text_fragments = [
            "阿里云的大模型服务平台百炼是一站式的大模型开发及应用构建平台。",
            "不论是开发者还是业务人员,都能深入参与大模型应用的设计和构建。", 
            "您可以通过简单的界面操作,在5分钟内开发出一款大模型应用,",
            "或在几小时内训练出一个专属模型,从而将更多精力专注于应用创新。",
        ]
    
        logging.info("Sending text fragments…")
        for text in text_fragments:
            logging.info(f"Sending fragment: {text}")
            await client.append_text(text)
            await asyncio.sleep(0.1)  # 片段间稍作延时
    
        # 等待服务器完成内部处理后结束会话
        await asyncio.sleep(1.0)
        await client.finish_session()
    
    async def _run_demo():
        """运行完整 Demo"""
        global _audio_stream
        # 打开 PyAudio 输出流
        _audio_stream = _audio_pyaudio.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=_AUDIO_SAMPLE_RATE,
            output=True,
            frames_per_buffer=1024
        )
    
        client = TTSRealtimeClient(
            base_url=URL,
            api_key=API_KEY,
            voice="Cherry",
            mode=SessionMode.SERVER_COMMIT,
            audio_callback=_audio_callback
        )
    
        # 建立连接
        await client.connect()
    
        # 并行执行消息处理与文本发送
        consumer_task = asyncio.create_task(client.handle_messages())
        producer_task = asyncio.create_task(_produce_text(client))
    
        await producer_task  # 等待文本发送完成
    
        # 额外等待,确保所有音频数据收取完毕
        await asyncio.sleep(5)
    
        # 关闭连接并取消消费者任务
        await client.close()
        consumer_task.cancel()
    
        # 关闭音频流
        if _audio_stream is not None:
            _audio_stream.stop_stream()
            _audio_stream.close()
        _audio_pyaudio.terminate()
    
        # 保存音频数据
        os.makedirs("outputs", exist_ok=True)
        _save_audio_to_file(os.path.join("outputs", "qwen_tts_output.wav"))
    
    def main():
        """同步入口"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s [%(levelname)s] %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        logging.info("Starting QwenTTS Realtime Client demo…")
        asyncio.run(_run_demo())
    
    if __name__ == "__main__":
        main() 

    运行server_commit.py,即可听到 Realtime API 实时生成的音频。

    commit 模式

    tts_realtime_client.py的同级目录下新建另一个 python 文件,命名为commit.py,并将以下代码复制进文件中:

    commit.py

    import os
    import asyncio
    import logging
    import wave
    from tts_realtime_client import TTSRealtimeClient, SessionMode
    import pyaudio
    
    # QwenTTS 服务配置
    URL = "wss://dashscope.aliyuncs.com/api-ws/v1/realtime?model=qwen-tts-realtime"
    # 若没有配置环境变量,请用百炼API Key将下行替换为:API_KEY="sk-xxx",
    API_KEY = os.getenv("DASHSCOPE_API_KEY")
    
    if not API_KEY:
        raise ValueError("Please set DASHSCOPE_API_KEY environment variable")
    
    # 收集音频数据
    _audio_chunks = []
    _AUDIO_SAMPLE_RATE = 24000
    _audio_pyaudio = pyaudio.PyAudio()
    _audio_stream = None
    
    def _audio_callback(audio_bytes: bytes):
        """TTSRealtimeClient 音频回调: 实时播放并缓存"""
        global _audio_stream
        if _audio_stream is not None:
            try:
                _audio_stream.write(audio_bytes)
            except Exception as exc:
                logging.error(f"PyAudio playback error: {exc}")
        _audio_chunks.append(audio_bytes)
        logging.info(f"Received audio chunk: {len(audio_bytes)} bytes")
    
    def _save_audio_to_file(filename: str = "output.wav", sample_rate: int = 24000) -> bool:
        """将收集到的音频数据保存为 WAV 文件"""
        if not _audio_chunks:
            logging.warning("No audio data to save")
            return False
    
        try:
            audio_data = b"".join(_audio_chunks)
            with wave.open(filename, 'wb') as wav_file:
                wav_file.setnchannels(1)  # 单声道
                wav_file.setsampwidth(2)  # 16-bit
                wav_file.setframerate(sample_rate)
                wav_file.writeframes(audio_data)
            logging.info(f"Audio saved to: {filename}")
            return True
        except Exception as exc:
            logging.error(f"Failed to save audio: {exc}")
            return False
    
    async def _user_input_loop(client: TTSRealtimeClient):
        """持续获取用户输入并发送文本,当用户输入空文本时发送commit事件并结束本次会话"""
        print("请输入文本(直接按Enter发送commit事件并结束本次会话,按Ctrl+C或Ctrl+D结束整个程序):")
        
        while True:
            try:
                user_text = input("> ")
                if not user_text:  # 用户输入为空
                    # 空输入视为一次对话的结束: 提交缓冲区 -> 结束会话 -> 跳出循环
                    logging.info("空输入,发送 commit 事件并结束本次会话")
                    await client.commit_text_buffer()
                    # 适当等待服务器处理 commit,防止过早结束会话导致丢失音频
                    await asyncio.sleep(0.3)
                    await client.finish_session()
                    break  # 直接退出用户输入循环,无需再次回车
                else:
                    logging.info(f"发送文本: {user_text}")
                    await client.append_text(user_text)
                    
            except EOFError:  # 用户按下Ctrl+D
                break
            except KeyboardInterrupt:  # 用户按下Ctrl+C
                break
        
        # 结束会话
        logging.info("结束会话...")
    async def _run_demo():
        """运行完整 Demo"""
        global _audio_stream
        # 打开 PyAudio 输出流
        _audio_stream = _audio_pyaudio.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=_AUDIO_SAMPLE_RATE,
            output=True,
            frames_per_buffer=1024
        )
    
        client = TTSRealtimeClient(
            base_url=URL,
            api_key=API_KEY,
            voice="Cherry",
            mode=SessionMode.COMMIT,  # 修改为COMMIT模式
            audio_callback=_audio_callback
        )
    
        # 建立连接
        await client.connect()
    
        # 并行执行消息处理与用户输入
        consumer_task = asyncio.create_task(client.handle_messages())
        producer_task = asyncio.create_task(_user_input_loop(client))
    
        await producer_task  # 等待用户输入完成
    
        # 额外等待,确保所有音频数据收取完毕
        await asyncio.sleep(5)
    
        # 关闭连接并取消消费者任务
        await client.close()
        consumer_task.cancel()
    
        # 关闭音频流
        if _audio_stream is not None:
            _audio_stream.stop_stream()
            _audio_stream.close()
        _audio_pyaudio.terminate()
    
        # 保存音频数据
        os.makedirs("outputs", exist_ok=True)
        _save_audio_to_file(os.path.join("outputs", "qwen_tts_output.wav"))
    
    def main():
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s [%(levelname)s] %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        logging.info("Starting QwenTTS Realtime Client demo…")
        asyncio.run(_run_demo())
    
    if __name__ == "__main__":
        main() 

    运行commit.py,可多次输入要合成的文本。在未输入文本的情况下单击 Enter 键,您将从扬声器听到 Realtime API 返回的音频。

交互流程

server_commit 模式

session.update事件的session.mode 设为"server_commit"以启用该模式,服务端会智能处理文本分段和合成时机。

交互流程如下:

  1. 客户端发送session.update事件,服务端响应session.createdsession.updated事件。

  2. 客户端发送 input_text_buffer.append 事件追加文本至服务端缓冲区。

  3. 服务端智能处理文本分段和合成时机,并返回response.createdresponse.output_item.addedresponse.content_part.addedresponse.audio.delta事件。

  4. 服务端响应完成后响应response.audio.doneresponse.content_part.doneresponse.output_item.doneresponse.done

  5. 服务端响应session.finished来结束会话。

生命周期

客户端事件

服务器事件

会话初始化

session.update

会话配置

session.created

会话已创建

session.updated

会话配置已更新

用户文本输入

input_text_buffer.append

添加文本到服务端

input_text_buffer.commit

立即合成服务端缓存的文本

session.finish

通知服务端不再有文本输入

input_text_buffer.committed

服务端收到提交的文本

服务器音频输出

response.created

服务端开始生成响应

response.output_item.added

响应时有新的输出内容

response.content_part.added

新的输出内容添加到assistant message

response.audio.delta

模型增量生成的音频

response.content_part.done

Assistant mesasge 的文本或音频内容流式输出完成

response.output_item.done

Assistant mesasge 的整个输出项流式传输完成

response.audio.done

音频生成完成

response.done

响应完成

commit 模式

session.update事件的session.mode 设为"commit"以启用该模式,客户端需主动提交文本缓冲区至服务端来获取响应。

交互流程如下:

  1. 客户端发送session.update事件,服务端响应session.createdsession.updated事件。

  2. 客户端发送 input_text_buffer.append 事件追加文本至服务端缓冲区。

  3. 客户端发送input_text_buffer.commit事件将缓冲区提交至服务端,并发送 session.finish事件表示后续无文本输入。

  4. 服务端响应response.created,开始生成响应。

  5. 服务端响应response.output_item.addedresponse.content_part.addedresponse.audio.delta事件。

  6. 服务端响应完成后返回response.audio.doneresponse.content_part.doneresponse.output_item.doneresponse.done

  7. 服务端响应session.finished来结束会话。

生命周期

客户端事件

服务器事件

会话初始化

session.update

会话配置

session.created

会话已创建

session.updated

会话配置已更新

用户文本输入

input_text_buffer.append

添加文本到缓冲区

input_text_buffer.commit

提交缓冲区到服务端

input_text_buffer.clear

清除缓冲区

input_text_buffer.commited

服务器收到提交的文本

服务器音频输出

response.created

服务端开始生成响应

response.output_item.added

响应时有新的输出内容

response.content_part.added

新的输出内容添加到assistant message

response.audio.delta

模型增量生成的音频

response.content_part.done

Assistant mesasge 的文本或音频内容流式输出完成

response.output_item.done

Assistant mesasge 的整个输出项流式传输完成

response.audio.done

音频生成完成

response.done

响应完成
上一篇: 语音合成-通义千问 下一篇: 用户指南(应用)