实时多模态

Qwen-Omni 实时 API 提供了低延迟的多模态交互能力,支持音频的流式输入,并能够流式输出文本和音频。

在线体验请参见如何在线体验 Qwen-Omni-Realtime 模型?

相比于 Qwen-Omni 模型,Qwen-Omni 实时模型可以:

  • 音频流式输入:Qwen-Omni 模型只能接收音频文件作为输入,而 Qwen-Omni 实时模型可以实时接收音频流;

  • 语音活动检测:Qwen-Omni 实时模型内置 VAD(Voice Activity Detection,语音活动检测)功能,可自动检测用户语音的开始和结束;

模型支持的音色包括Chelsie、Serena、EthanCherry。

支持的模型

模型名称

版本

上下文长度

最大输入

最大输出

免费额度

(注)

(Token数)

qwen-omni-turbo-realtime

当前能力等同 qwen-omni-turbo-realtime-2025-05-08

稳定版

32,768

30,720

2,048

100Token(不区分模态)

有效期:百炼开通后180天内

qwen-omni-turbo-realtime-latest

能力始终等同最新快照版

最新版

qwen-omni-turbo-realtime-2025-05-08

快照版

免费额度用完后,输入与输出的计费规则如下:

输入计费项

单价(每千 Token)

输入:文本

0.0016

输入:音频

0.025

输入:图片

0.006

输出计费项

单价(每千 Token)

输出:文本

0.0064元(输入仅包含文本时)

0.018元(输入包含图片/音频时)

输出:文本+音频

0.05元(音频)

输出的文本不计费。

访问方式

Qwen-Omni 实时 API 基于WebSocket协议,您可以通过不同编程语言的 WebSocket 库来实现。

配置项

说明

调用地址

需要配置为:wss://dashscope.aliyuncs.com/api-ws/v1/realtime

查询参数

查询参数为model,需指定为要访问的model名称,请参见支持的模型

消息头

使用 Bearer Token 鉴权:Authorization: Bearer DASHSCOPE_API_KEY

DASHSCOPE_API_KEY 是您在百炼上申请的API-KEY。

您可以通过以下代码与 Qwen-Omni 实时 API服务建立WebSocket连接。

建立WebSocket连接

# pip install websocket-client
import json
import websocket
import os

API_KEY=os.getenv("DASHSCOPE_API_KEY")
API_URL = "wss://dashscope.aliyuncs.com/api-ws/v1/realtime?model=qwen-omni-turbo-realtime"

headers = [
    "Authorization: Bearer " + API_KEY
]

def on_open(ws):
    print(f"Connected to server: {API_URL}")
def on_message(ws, message):
    data = json.loads(message)
    print("Received event:", json.dumps(data, indent=2))
def on_error(ws, error):
    print("Error:", error)

ws = websocket.WebSocketApp(
    API_URL,
    header=headers,
    on_open=on_open,
    on_message=on_message,
    on_error=on_error
)

ws.run_forever()

连接后可以接收到以下回调信息:

{
    "event_id": "event_Bj1ixvmDpFda3ahThkOSz",
    "type": "session.created",
    "session": {
        "object": "realtime.session",
        "model": "qwen-omni-turbo-realtime",
        "modalities": [
            "text",
            "audio"
        ],
        "instructions": "...model instructions here...",
        "voice": "Chelsie",
        "input_audio_format": "pcm16",
        "output_audio_format": "pcm16",
        "input_audio_transcription": {
            "model": "gummy-realtime-v1"
        },
        "turn_detection": {
            "type": "server_vad",
            "threshold": 0.5,
            "prefix_padding_ms": 300,
            "silence_duration_ms": 800,
            "create_response": true,
            "interrupt_response": true
        },
        "tools": [],
        "tool_choice": "auto",
        "temperature": 0.8,
        "max_response_output_tokens": "inf",
        "id": "sess_JSqGSEx9RRfReAIWNegSp"
    }
}

快速开始

Qwen-Omni-Realtime系列模型仅支持 OpenAI 兼容方式调用。您需要获取API Key配置API Key到环境变量

您的 Python 版本需要不低于 3.10。

通过以下步骤,您可以快速体验与 Realtime 模型实时对话的功能。

  • 准备运行环境

    根据您的操作系统来安装 pyaudio。

    macOS

    brew install portaudio && pip install pyaudio

    Debian/Ubuntu

    sudo apt-get install python3-pyaudio
    
    或者
    
    pip install pyaudio

    CentOS

    sudo yum install -y portaudio portaudio-devel && pip install pyaudio

    Windows

    pip install pyaudio

    安装完成后,通过 pip 安装 websocket 相关的依赖:

    pip install websocket-client==1.8.0 websockets
  • 创建客户端

    在本地新建一个 python 文件,命名为omni_realtime_client.py,并将以下代码复制进文件中:

    omni_realtime_client.py

    # -- coding: utf-8 --
    
    import asyncio
    import websockets
    import json
    import base64
    import time
    from typing import Optional, Callable, List, Dict, Any
    from enum import Enum
    
    class TurnDetectionMode(Enum):
        SERVER_VAD = "server_vad"
        MANUAL = "manual"
    
    class OmniRealtimeClient:
        """
        与 Omni Realtime API 交互的演示客户端。
    
        该类提供了连接 Realtime API、发送文本和音频数据、处理响应以及管理 WebSocket 连接的相关方法。
    
        属性说明:
            base_url (str):
                Realtime API 的基础地址。
            api_key (str):
                用于身份验证的 API Key。
            model (str):
                用于聊天的 Omni 模型名称。
            voice (str):
                服务器合成语音所使用的声音。
            turn_detection_mode (TurnDetectionMode):
                轮次检测模式。
            on_text_delta (Callable[[str], None]):
                文本增量回调函数。
            on_audio_delta (Callable[[bytes], None]):
                音频增量回调函数。
            on_input_transcript (Callable[[str], None]):
                输入转录文本回调函数。
            on_interrupt (Callable[[], None]):
                用户打断回调函数,应在此停止音频播放。
            on_output_transcript (Callable[[str], None]):
                输出转录文本回调函数。
            extra_event_handlers (Dict[str, Callable[[Dict[str, Any]], None]]):
                其他事件处理器,事件名到处理函数的映射。
        """
        def __init__(
            self,
            base_url,
            api_key: str,
            model: str = "",
            voice: str = "Ethan",
            turn_detection_mode: TurnDetectionMode = TurnDetectionMode.MANUAL,
            on_text_delta: Optional[Callable[[str], None]] = None,
            on_audio_delta: Optional[Callable[[bytes], None]] = None,
            on_interrupt: Optional[Callable[[], None]] = None,
            on_input_transcript: Optional[Callable[[str], None]] = None,
            on_output_transcript: Optional[Callable[[str], None]] = None,
            extra_event_handlers: Optional[Dict[str, Callable[[Dict[str, Any]], None]]] = None
        ):
            self.base_url = base_url
            self.api_key = api_key
            self.model = model
            self.voice = voice
            self.ws = None
            self.on_text_delta = on_text_delta
            self.on_audio_delta = on_audio_delta
            self.on_interrupt = on_interrupt
            self.on_input_transcript = on_input_transcript
            self.on_output_transcript = on_output_transcript
            self.turn_detection_mode = turn_detection_mode
            self.extra_event_handlers = extra_event_handlers or {}
    
            # 当前回复状态
            self._current_response_id = None
            self._current_item_id = None
            self._is_responding = False
            # 输入/输出转录打印状态
            self._print_input_transcript = False
            self._output_transcript_buffer = ""
    
        async def connect(self) -> None:
            """与 Realtime API 建立 WebSocket 连接。"""
            url = f"{self.base_url}?model={self.model}"
            headers = {
                "Authorization": f"Bearer {self.api_key}"
            }
    
            self.ws = await websockets.connect(url, additional_headers=headers)
    
            # 设置默认会话配置
            if self.turn_detection_mode == TurnDetectionMode.MANUAL:
                await self.update_session({
                    "modalities": ["text", "audio"],
                    "voice": self.voice,
                    "input_audio_format": "pcm16",
                    "output_audio_format": "pcm16",
                    "input_audio_transcription": {
                        "model": "gummy-realtime-v1"
                    },
                    "turn_detection" : None
                })
            elif self.turn_detection_mode == TurnDetectionMode.SERVER_VAD:
                await self.update_session({
                    "modalities": ["text", "audio"],
                    "voice": self.voice,
                    "input_audio_format": "pcm16",
                    "output_audio_format": "pcm16",
                    "input_audio_transcription": {
                        "model": "gummy-realtime-v1"
                    },
                    "turn_detection": {
                        "type": "server_vad",
                        "threshold": 0.1,
                        "prefix_padding_ms": 500,
                        "silence_duration_ms": 900
                    }
                })
            else:
                raise ValueError(f"Invalid turn detection mode: {self.turn_detection_mode}")
    
        async def send_event(self, event) -> None:
            event['event_id'] = "event_" + str(int(time.time() * 1000))
            print(f" Send event: type={event['type']}, event_id={event['event_id']}")
            await self.ws.send(json.dumps(event))
    
        async def update_session(self, config: Dict[str, Any]) -> None:
            """更新会话配置。"""
            event = {
                "type": "session.update",
                "session": config
            }
            print("update session: ", event)
            await self.send_event(event)
    
        async def stream_audio(self, audio_chunk: bytes) -> None:
            """向 API 流式发送原始音频数据。"""
            # 仅支持 16bit 16kHz 单声道 PCM
            audio_b64 = base64.b64encode(audio_chunk).decode()
    
            append_event = {
                "type": "input_audio_buffer.append",
                "audio": audio_b64
            }
            await self.send_event(append_event)
    
        async def commit_audio_buffer(self) -> None:
            """提交音频缓冲区以触发处理。"""
            event = {
                "type": "input_audio_buffer.commit"
            }
            await self.send_event(event)
    
        async def append_image(self, image_chunk: bytes) -> None:
            """向视频缓冲区追加图像数据。
            图像数据可以来自本地文件,也可以来自实时视频流。
    
            注意:
                - 图像格式必须为 JPG 或 JPEG。推荐分辨率为 480P 或 720P,最高支持 1080P。
                - 单张图片大小不应超过 500KB。
                - 本方法会将图像数据编码为 Base64 后再发送。
                - 建议以每秒 2 帧的频率向服务器发送图像。
                - 在发送图像数据之前,需要先发送音频数据。
            """
            image_b64 = base64.b64encode(image_chunk).decode()
    
            event = {
                "type": "input_image_buffer.append",
                "image": image_b64
            }
            await self.send_event(event)
    
        async def create_response(self) -> None:
            """向 API 请求生成回复(仅在手动模式下需要调用)。"""
            event = {
                "type": "response.create",
                "response": {
                    "instructions": "You are a helpful assistant.",
                    "modalities": ["text", "audio"]
                }
            }
            print("create response: ", event)
            await self.send_event(event)
    
        async def cancel_response(self) -> None:
            """取消当前回复。"""
            event = {
                "type": "response.cancel"
            }
            await self.send_event(event)
    
        async def handle_interruption(self):
            """处理用户对当前回复的打断。"""
            if not self._is_responding:
                return
    
            print(" Handling interruption")
    
            # 1. 取消当前回复
            if self._current_response_id:
                await self.cancel_response()
    
            self._is_responding = False
            self._current_response_id = None
            self._current_item_id = None
    
        async def handle_messages(self) -> None:
            try:
                async for message in self.ws:
                    event = json.loads(message)
                    event_type = event.get("type")
                    
                    if event_type != "response.audio.delta":
                        print(" event: ", event)
                    else:
                        print(" event_type: ", event_type)
    
                    if event_type == "error":
                        print(" Error: ", event['error'])
                        continue
                    elif event_type == "response.created":
                        self._current_response_id = event.get("response", {}).get("id")
                        self._is_responding = True
                    elif event_type == "response.output_item.added":
                        self._current_item_id = event.get("item", {}).get("id")
                    elif event_type == "response.done":
                        self._is_responding = False
                        self._current_response_id = None
                        self._current_item_id = None
                    # Handle interruptions
                    elif event_type == "input_audio_buffer.speech_started":
                        print(" Speech detected")
                        if self._is_responding:
                            print(" Handling interruption")
                            await self.handle_interruption()
    
                        if self.on_interrupt:
                            print(" Handling on_interrupt, stop playback")
                            self.on_interrupt()
                    elif event_type == "input_audio_buffer.speech_stopped":
                        print(" Speech ended")
                    # Handle normal response events
                    elif event_type == "response.text.delta":
                        if self.on_text_delta:
                            self.on_text_delta(event["delta"])
                    elif event_type == "response.audio.delta":
                        if self.on_audio_delta:
                            audio_bytes = base64.b64decode(event["delta"])
                            self.on_audio_delta(audio_bytes)
                    elif event_type == "conversation.item.input_audio_transcription.completed":
                        transcript = event.get("transcript", "")
                        if self.on_input_transcript:
                            await asyncio.to_thread(self.on_input_transcript,transcript)
                            self._print_input_transcript = True
                    elif event_type == "response.audio_transcript.delta":
                        if self.on_output_transcript:
                            delta = event.get("delta", "")
                            if not self._print_input_transcript:
                                self._output_transcript_buffer += delta
                            else:
                                if self._output_transcript_buffer:
                                    await asyncio.to_thread(self.on_output_transcript,self._output_transcript_buffer)
                                    self._output_transcript_buffer = ""
                                await asyncio.to_thread(self.on_output_transcript,delta)
                    elif event_type == "response.audio_transcript.done":
                        self._print_input_transcript = False
                    elif event_type in self.extra_event_handlers:
                        self.extra_event_handlers[event_type](event)
    
            except websockets.exceptions.ConnectionClosed:
                print(" Connection closed")
            except Exception as e:
                print(" Error in message handling: ", str(e))
    
        async def close(self) -> None:
            """关闭 WebSocket 连接。"""
            if self.ws:
                await self.ws.close()
  • 选择交互模式

    • VAD 模式(Voice Activity Detection,自动检测语音起止)

      Realtime API 自动判断用户何时开始与停止说话并作出回应。

    • Manual 模式(按下即说,松开即发送)

      客户端控制语音起止。用户说话结束后,客户端需主动发送消息至服务端。

    VAD 模式

    omni_realtime_client.py的同级目录下新建另一个 python 文件,命名为vad_mode.py,并将以下代码复制进文件中:

    vad_mode.py

    # -- coding: utf-8 --
    import os, time, base64, asyncio
    from omni_realtime_client import OmniRealtimeClient, TurnDetectionMode
    import pyaudio
    import queue
    import threading
    
    # 创建一个全局音频队列和播放线程
    audio_queue = queue.Queue()
    audio_player = None
    # 添加全局中断标志
    interrupt_flag = threading.Event()
    
    # 初始化PyAudio
    p = pyaudio.PyAudio()
    RATE = 24000  # 采样率 24kHz
    CHUNK = 3200  # 每个音频块的大小
    FORMAT = pyaudio.paInt16  # 16PCM格式
    CHANNELS = 1  # 单声道
    
    def clear_audio_queue():
        """清空音频队列"""
        with audio_queue.mutex:
            audio_queue.queue.clear()
        print("音频队列已清空")
    
    def handle_interrupt():
        """处理中断事件 - 立即停止音频播放"""
        print("检测到语音输入,停止音频播放")
        interrupt_flag.set()  # 设置中断标志
        clear_audio_queue()  # 清空队列
    
    def audio_player_thread():
        """后台线程用于播放音频数据"""
        stream = p.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=24000,
            output=True,
            frames_per_buffer=CHUNK,
        )
    
        try:
            while True:
                try:
                    # 检查中断标志
                    if interrupt_flag.is_set():
                        print("音频播放被中断")
                        interrupt_flag.clear()  # 清除中断标志
                        continue
                    
                    # 从队列获取音频数据
                    audio_data = audio_queue.get(block=True, timeout=0.5)
                    if audio_data is None:  # 结束信号
                        break
                    
                    # 再次检查中断标志(在播放前)
                    if interrupt_flag.is_set():
                        print("音频播放被中断")
                        interrupt_flag.clear()  # 清除中断标志
                        audio_queue.task_done()
                        continue
                    
                    # 播放音频数据
                    stream.write(audio_data)
                    audio_queue.task_done()
                except queue.Empty:
                    # 如果队列为空,继续等待
                    continue
        finally:
            # 清理
            stream.stop_stream()
            stream.close()
    
    
    def start_audio_player():
        """启动音频播放线程"""
        global audio_player
        if audio_player is None or not audio_player.is_alive():
            audio_player = threading.Thread(target=audio_player_thread, daemon=True)
            audio_player.start()
    
    
    def handle_audio_data(audio_data):
        """处理接收到的音频数据"""
        # 打印接收到的音频数据长度(调试用)
        print(f"\n接收到音频数据: {len(audio_data)} 字节")
        # 将音频数据放入队列
        audio_queue.put(audio_data)
    
    
    async def start_microphone_streaming(client: OmniRealtimeClient):
        CHUNK = 3200
        FORMAT = pyaudio.paInt16
        CHANNELS = 1
        RATE = 24000
    
        p = pyaudio.PyAudio()
        stream = p.open(
            format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK
        )
    
        try:
            print("开始录音,请讲话...")
            while True:
                audio_data = stream.read(CHUNK)
                encoded_data = base64.b64encode(audio_data).decode("utf-8")
    
                eventd = {
                    "event_id": "event_" + str(int(time.time() * 1000)),
                    "type": "input_audio_buffer.append",
                    "audio": encoded_data,
                }
                await client.send_event(eventd)
    
                # 保持较短的等待时间以模拟实时交互
                await asyncio.sleep(0.05)
        finally:
            stream.stop_stream()
            stream.close()
            p.terminate()
    
    
    async def main():
        # 启动音频播放线程
        start_audio_player()
    
        realtime_client = OmniRealtimeClient(
            base_url="wss://dashscope.aliyuncs.com/api-ws/v1/realtime",
            api_key=os.environ.get("DASHSCOPE_API_KEY"),
            model="qwen-omni-turbo-realtime",
            voice="Chelsie",
            on_text_delta=lambda text: print(f"\nAssistant: {text}", end="", flush=True),
            on_audio_delta=handle_audio_data,
            on_interrupt=handle_interrupt,  # 添加中断回调函数
            turn_detection_mode=TurnDetectionMode.SERVER_VAD,
        )
    
        try:
            await realtime_client.connect()
            # 启动消息处理和麦克风录音
            message_handler = asyncio.create_task(realtime_client.handle_messages())
            streaming_task = asyncio.create_task(
                start_microphone_streaming(realtime_client)
            )
    
            while True:
                await asyncio.Queue().get()
        except Exception as e:
            print(f"Error: {e}")
        finally:
            # 结束音频播放线程
            audio_queue.put(None)
            if audio_player:
                audio_player.join(timeout=1)
            await realtime_client.close()
            p.terminate()
    
    if __name__ == "__main__":
        asyncio.run(main())

    运行vad_mode.py,通过麦克风即可与 Realtime 模型实时对话,系统会检测您的音频起始位置并自动发送到服务器,无需您手动发送。

    Manual 模式

    omni_realtime_client.py的同级目录下新建另一个 python 文件,命名为manual_mode.py,并将以下代码复制进文件中:

    manual_mode.py

    # -- coding: utf-8 --
    import os
    import asyncio
    import time
    import threading
    import queue
    import pyaudio
    from omni_realtime_client import OmniRealtimeClient, TurnDetectionMode
    
    class AudioPlayer:
        """实时音频播放器类"""
        def __init__(self, sample_rate=24000, channels=1, sample_width=2):
            self.sample_rate = sample_rate
            self.channels = channels
            self.sample_width = sample_width  # 2 bytes for 16-bit
            self.audio_queue = queue.Queue()
            self.is_playing = False
            self.play_thread = None
            self.pyaudio_instance = None
            self.stream = None
            self._lock = threading.Lock()  # 添加锁来同步访问
            self._last_data_time = time.time()  # 记录最后接收数据的时间
            self._response_done = False  # 添加响应完成标志
            self._waiting_for_response = False # 标记是否正在等待服务器响应
            # 记录最后一次向音频流写入数据的时间及最近一次音频块的时长,用于更精确地判断播放结束
            self._last_play_time = time.time()
            self._last_chunk_duration = 0.0
            
        def start(self):
            """启动音频播放器"""
            with self._lock:
                if self.is_playing:
                    return
                    
                self.is_playing = True
                
                try:
                    self.pyaudio_instance = pyaudio.PyAudio()
                    
                    # 创建音频输出流
                    self.stream = self.pyaudio_instance.open(
                        format=pyaudio.paInt16,  # 16-bit
                        channels=self.channels,
                        rate=self.sample_rate,
                        output=True,
                        frames_per_buffer=1024
                    )
                    
                    # 启动播放线程
                    self.play_thread = threading.Thread(target=self._play_audio)
                    self.play_thread.daemon = True
                    self.play_thread.start()
                    
                    print("音频播放器已启动")
                except Exception as e:
                    print(f"启动音频播放器失败: {e}")
                    self._cleanup_resources()
                    raise
        
        def stop(self):
            """停止音频播放器"""
            with self._lock:
                if not self.is_playing:
                    return
                    
                self.is_playing = False
            
            # 清空队列
            while not self.audio_queue.empty():
                try:
                    self.audio_queue.get_nowait()
                except queue.Empty:
                    break
            
            # 等待播放线程结束(在锁外面等待,避免死锁)
            if self.play_thread and self.play_thread.is_alive():
                self.play_thread.join(timeout=2.0)
            
            # 再次获取锁来清理资源
            with self._lock:
                self._cleanup_resources()
            
            print("音频播放器已停止")
        
        def _cleanup_resources(self):
            """清理音频资源(必须在锁内调用)"""
            try:
                # 关闭音频流
                if self.stream:
                    if not self.stream.is_stopped():
                        self.stream.stop_stream()
                    self.stream.close()
                    self.stream = None
            except Exception as e:
                print(f"关闭音频流时出错: {e}")
            
            try:
                if self.pyaudio_instance:
                    self.pyaudio_instance.terminate()
                    self.pyaudio_instance = None
            except Exception as e:
                print(f"终止PyAudio时出错: {e}")
        
        def add_audio_data(self, audio_data):
            """添加音频数据到播放队列"""
            if self.is_playing and audio_data:
                self.audio_queue.put(audio_data)
                with self._lock:
                    self._last_data_time = time.time()  # 更新最后接收数据的时间
                    self._waiting_for_response = False # 收到数据,不再等待
        
        def stop_receiving_data(self):
            """标记不再接收新的音频数据"""
            with self._lock:
                self._response_done = True
                self._waiting_for_response = False # 响应结束,不再等待
        
        def prepare_for_next_turn(self):
            """为下一轮对话重置播放器状态。"""
            with self._lock:
                self._response_done = False
                self._last_data_time = time.time()
                self._last_play_time = time.time()
                self._last_chunk_duration = 0.0
                self._waiting_for_response = True # 开始等待下一轮响应
            
            # 清空上一轮可能残留的音频数据
            while not self.audio_queue.empty():
                try:
                    self.audio_queue.get_nowait()
                except queue.Empty:
                    break
    
        def is_finished_playing(self):
            """检查是否已经播放完所有音频数据"""
            with self._lock:
                queue_size = self.audio_queue.qsize()
                time_since_last_data = time.time() - self._last_data_time
                time_since_last_play = time.time() - self._last_play_time
                
                # ---------------------- 智能结束判定 ----------------------
                # 1. 首选:如果服务器已标记完成且播放队列为空
                #    进一步等待最近一块音频播放完毕(音频块时长 + 0.1s 容错)。
                if self._response_done and queue_size == 0:
                    min_wait = max(self._last_chunk_duration + 0.1, 0.5)  # 至少等待 0.5s
                    if time_since_last_play >= min_wait:
                        return True
    
                # 2. 备用:如果长时间没有新数据且播放队列为空
                #    当服务器没有明确发出 `response.done` 时,此逻辑作为保障
                if not self._waiting_for_response and queue_size == 0 and time_since_last_data > 1.0:
                    print("\n(超时未收到新音频,判定播放结束)")
                    return True
                
                return False
        
        def _play_audio(self):
            """播放音频数据的工作线程"""
            while True:
                # 检查是否应该停止
                with self._lock:
                    if not self.is_playing:
                        break
                    stream_ref = self.stream  # 获取流的引用
                
                try:
                    # 从队列中获取音频数据,超时0.1秒
                    audio_data = self.audio_queue.get(timeout=0.1)
                    
                    # 再次检查状态和流的有效性
                    with self._lock:
                        if self.is_playing and stream_ref and not stream_ref.is_stopped():
                            try:
                                # 播放音频数据
                                stream_ref.write(audio_data)
                                # 更新最近播放信息
                                self._last_play_time = time.time()
                                self._last_chunk_duration = len(audio_data) / (self.channels * self.sample_width) / self.sample_rate
                            except Exception as e:
                                print(f"写入音频流时出错: {e}")
                                break
                        
                    # 标记该数据块已处理完成
                    self.audio_queue.task_done()
    
                except queue.Empty:
                    # 队列为空时继续等待
                    continue
                except Exception as e:
                    print(f"播放音频时出错: {e}")
                    break
    
    class MicrophoneRecorder:
        """实时麦克风录音器"""
        def __init__(self, sample_rate=16000, channels=1, chunk_size=1024):
            self.sample_rate = sample_rate
            self.channels = channels
            self.chunk_size = chunk_size
            self.pyaudio_instance = None
            self.stream = None
            self.frames = []
            self._is_recording = False
            self._record_thread = None
    
        def _recording_thread(self):
            """录音工作线程"""
            # 在 _is_recording 为 True 期间,持续从音频流中读取数据
            while self._is_recording:
                try:
                    # 使用 exception_on_overflow=False 避免因缓冲区溢出而崩溃
                    data = self.stream.read(self.chunk_size, exception_on_overflow=False)
                    self.frames.append(data)
                except (IOError, OSError) as e:
                    # 当流被关闭时,读取操作可能会引发错误
                    print(f"录音流读取错误,可能已关闭: {e}")
                    break
    
        def start(self):
            """开始录音"""
            if self._is_recording:
                print("录音已在进行中。")
                return
            
            self.frames = []
            self._is_recording = True
            
            try:
                self.pyaudio_instance = pyaudio.PyAudio()
                self.stream = self.pyaudio_instance.open(
                    format=pyaudio.paInt16,
                    channels=self.channels,
                    rate=self.sample_rate,
                    input=True,
                    frames_per_buffer=self.chunk_size
                )
                
                self._record_thread = threading.Thread(target=self._recording_thread)
                self._record_thread.daemon = True
                self._record_thread.start()
                print("麦克风录音已开始...")
            except Exception as e:
                print(f"启动麦克风失败: {e}")
                self._is_recording = False
                self._cleanup()
                raise
    
        def stop(self):
            """停止录音并返回音频数据"""
            if not self._is_recording:
                return None
                
            self._is_recording = False
            
            # 等待录音线程安全退出
            if self._record_thread:
                self._record_thread.join(timeout=1.0)
            
            self._cleanup()
            
            print("麦克风录音已停止。")
            return b''.join(self.frames)
    
        def _cleanup(self):
            """安全地清理 PyAudio 资源"""
            if self.stream:
                try:
                    if self.stream.is_active():
                        self.stream.stop_stream()
                    self.stream.close()
                except Exception as e:
                    print(f"关闭音频流时出错: {e}")
            
            if self.pyaudio_instance:
                try:
                    self.pyaudio_instance.terminate()
                except Exception as e:
                    print(f"终止 PyAudio 实例时出错: {e}")
    
            self.stream = None
            self.pyaudio_instance = None
    
    async def interactive_test():
        """
        交互式测试脚本:允许多轮连续对话,每轮可以发送音频和图片。
        """
        # ------------------- 1. 初始化和连接 (一次性) -------------------
        api_key = os.environ.get("DASHSCOPE_API_KEY")
        if not api_key:
            print("请设置DASHSCOPE_API_KEY环境变量")
            return
    
        print("--- 实时多轮音视频对话客户端 ---")
        print("正在初始化音频播放器和客户端...")
        
        audio_player = AudioPlayer()
        audio_player.start()
    
        def on_audio_received(audio_data):
            audio_player.add_audio_data(audio_data)
    
        def on_response_done(event):
            print("\n(收到响应结束标记)")
            audio_player.stop_receiving_data()
    
        realtime_client = OmniRealtimeClient(
            base_url="wss://dashscope.aliyuncs.com/api-ws/v1/realtime",
            api_key=api_key,
            model="qwen-omni-turbo-realtime",
            voice="Ethan",
            on_text_delta=lambda text: print(f"助手回复: {text}", end="", flush=True),
            on_audio_delta=on_audio_received,
            turn_detection_mode=TurnDetectionMode.MANUAL,
            extra_event_handlers={"response.done": on_response_done}
        )
    
        message_handler_task = None
        try:
            await realtime_client.connect()
            print("已连接到服务器。输入 'q' 或 'quit' 可随时退出程序。")
            message_handler_task = asyncio.create_task(realtime_client.handle_messages())
            await asyncio.sleep(0.5)
    
            turn_counter = 1
            # ------------------- 2. 多轮对话循环 -------------------
            while True:
                print(f"\n--- 第 {turn_counter} 轮对话 ---")
                audio_player.prepare_for_next_turn()
    
                recorded_audio = None
                image_paths = []
                
                # --- 获取用户输入:从麦克风录音 ---
                loop = asyncio.get_event_loop()
                recorder = MicrophoneRecorder(sample_rate=16000) # 推荐使用16k采样率进行语音识别
    
                print("准备录音。按 Enter 键开始录音 (或输入 'q' 退出)...")
                user_input = await loop.run_in_executor(None, input)
                if user_input.strip().lower() in ['q', 'quit']:
                    print("用户请求退出...")
                    return
                
                try:
                    recorder.start()
                except Exception:
                    print("无法启动录音,请检查您的麦克风权限和设备。跳过本轮。")
                    continue
                
                print("录音中... 再次按 Enter 键停止录音。")
                await loop.run_in_executor(None, input)
    
                recorded_audio = recorder.stop()
    
                if not recorded_audio or len(recorded_audio) == 0:
                    print("未录制到有效音频,请重新开始本轮对话。")
                    continue
    
                # --- 获取图片输入 (可选) ---
                # 以下图片输入功能已被注释,暂时禁用。若需启用请取消下方代码注释。
                # print("\n请逐行输入【图片文件】的绝对路径 (可选)。完成后,输入 's' 或按 Enter 发送请求。")
                # while True:
                #     path = input("图片路径: ").strip()
                #     if path.lower() == 's' or path == '':
                #         break
                #     if path.lower() in ['q', 'quit']:
                #         print("用户请求退出...")
                #         return
                #     
                #     if not os.path.isabs(path):
                #         print("错误: 请输入绝对路径。")
                #         continue
                #     if not os.path.exists(path):
                #         print(f"错误: 文件不存在 -> {path}")
                #         continue
                #     image_paths.append(path)
                #     print(f"已添加图片: {os.path.basename(path)}")
                
                # --- 3. 发送数据并获取响应 ---
                print("\n--- 输入确认 ---")
                print(f"待处理音频: 1个 (来自麦克风), 图片: {len(image_paths)}个")
                print("------------------")
    
                # 3.1 发送录制的音频
                try:
                    print(f"发送麦克风录音 ({len(recorded_audio)}字节)")
                    await realtime_client.stream_audio(recorded_audio)
                    await asyncio.sleep(0.1)
                except Exception as e:
                    print(f"发送麦克风录音失败: {e}")
                    continue
    
                # 3.2 发送所有图片文件
                # 以下图片发送代码已被注释,暂时禁用。
                # for i, path in enumerate(image_paths):
                #     try:
                #         with open(path, "rb") as f:
                #             data = f.read()
                #         print(f"发送图片 {i+1}: {os.path.basename(path)} ({len(data)}字节)")
                #         await realtime_client.append_image(data)
                #         await asyncio.sleep(0.1)
                #     except Exception as e:
                #         print(f"发送图片 {os.path.basename(path)} 失败: {e}")
    
                # 3.3 提交并等待响应
                print("提交所有输入,请求服务器响应...")
                await realtime_client.commit_audio_buffer()
                await realtime_client.create_response()
    
                print("等待并播放服务器响应音频...")
                start_time = time.time()
                max_wait_time = 60
                while not audio_player.is_finished_playing():
                    if time.time() - start_time > max_wait_time:
                        print(f"\n等待超时 ({max_wait_time}秒), 进入下一轮。")
                        break
                    await asyncio.sleep(0.2)
                
                print("\n本轮音频播放完成!")
                turn_counter += 1
    
        except (asyncio.CancelledError, KeyboardInterrupt):
            print("\n程序被中断。")
        except Exception as e:
            print(f"发生未处理的错误: {e}")
        finally:
            # ------------------- 4. 清理资源 -------------------
            print("\n正在关闭连接并清理资源...")
            if message_handler_task and not message_handler_task.done():
                message_handler_task.cancel()
            
            if 'realtime_client' in locals() and realtime_client.ws and not realtime_client.ws.close:
                await realtime_client.close()
                print("连接已关闭。")
    
            audio_player.stop()
            print("程序退出。")
    
    if __name__ == "__main__":
        try:
            asyncio.run(interactive_test())
        except KeyboardInterrupt:
            print("\n程序被用户强制退出。") 

    运行manual_mode.py,按 Enter 键开始说话,再按一次获取模型响应的音频。

交互流程

VAD 模式

session.update事件的session.turn_detection 设为"server_vad"以启用 VAD 模式。此模式下,服务端自动检测语音起止并进行响应。适用于语音通话场景。

交互流程如下:

  1. 服务端检测到语音开始,发送input_audio_buffer.speech_started 事件。

  2. 客户端随时发送 input_audio_buffer.append事件追加音频至缓冲区。

  3. 服务端检测到语音结束,发送input_audio_buffer.speech_stopped事件。

  4. 服务端发送input_audio_buffer.commited 事件提交音频缓冲区。

  5. 服务端发送 conversation.item.created事件,包含从缓冲区创建的用户消息项。

生命周期

客户端事件

服务器事件

会话初始化

session.update

会话配置

session.created

会话已创建

session.updated

会话配置已更新

用户音频输入

input_audio_buffer.append

添加音频到缓冲区

input_image_buffer.append

添加图片到缓冲区

input_audio_buffer.speech_started

检测到语音开始

input_audio_buffer.speech_stopped

检测到语音结束

input_audio_buffer.commited

服务器收到提交的音频

服务器音频输出

response.created

服务端开始生成响应

response.output_item.added

响应时有新的输出内容

conversation.item.created

对话项被创建

response.content_part.added

新的输出内容添加到assistant message

response.audio_transcript.delta

增量生成的转录文字

response.audio.delta

模型增量生成的音频

response.audio_transcript.done

文本转录完成

response.audio.done

音频生成完成

response.content_part.done

Assistant mesasge 的文本或音频内容流式输出完成

response.output_item.done

Assistant mesasge 的整个输出项流式传输完成

response.done

响应完成

Manual 模式

session.update事件的session.turn_detection 设为 None以启用 Manual 模式。此模式下,客户端通过显式发送input_audio_buffer.commitresponse.create事件请求服务器响应。适用于按下即说场景,如聊天软件中的发送语音。

交互流程如下:

  1. 客户端发送 input_audio_buffer.append事件追加音频到缓冲区。

    也可以发送 input_image_buffer.append 事件追加图像到缓冲区。
  2. 客户端发送input_audio_buffer.commit事件提交音频缓冲区与图像缓冲区,创建新用户消息项。

  3. 服务端响应 input_audio_buffer.commited事件。

  4. 客户端发送response.create事件,等待服务端返回模型的输出。

  5. 服务端响应conversation.item.created事件。

生命周期

客户端事件

服务器事件

会话初始化

session.update

会话配置

session.created

会话已创建

session.updated

会话配置已更新

用户音频输入

input_audio_buffer.append

添加音频到缓冲区

input_image_buffer.append

添加图片到缓冲区

input_audio_buffer.commit

提交音频与图片到服务器

response.create

创建模型响应

input_audio_buffer.commited

服务器收到提交的音频

服务器音频输出

input_audio_buffer.clear

清除缓冲区的音频

response.created

服务端开始生成响应

response.output_item.added

响应时有新的输出内容

conversation.item.created

对话项被创建

response.content_part.added

新的输出内容添加到assistant message 项

response.audio_transcript.delta

增量生成的转录文字

response.audio.delta

模型增量生成的音频

response.audio_transcript.done

完成文本转录

response.audio.done

完成音频生成

response.content_part.done

Assistant mesasge 的文本或音频内容流式输出完成

response.output_item.done

Assistant mesasge 的整个输出项流式传输完成

response.done

响应完成

API 参考

常见问题

Q1:如何在线体验 Qwen-Omni-Realtime 模型?

A:您可以通过以下方式一键部署:

  1. 访问函数计算模板部署类型选择直接部署百炼 API-KEY 填入您的 API Key;单击创建并部署默认环境

  2. 等待约一分钟,在环境详情环境信息中获取访问域名将访问域名的http改成https(示例:https://omni-realtime.fcv3.xxxx.cn-hangzhou.fc.devsapp.net),通过该 HTTPS 链接与模型进行视频/语音通话。

通过资源信息-函数资源查看项目源代码。
函数计算阿里云百炼均为新用户提供免费额度,可以覆盖简单调试所需成本,额度耗尽后按量计费。只有在访问的情况下会产生费用。

Q2:怎么向模型输入图片?

A:通过客户端发送 input_image_buffer.append 事件。

若用于视频通话场景,可以对视频抽帧,以不超过每秒两帧的速度发送 input_image_buffer.append 事件。