Qwen-Omni 实时 API 提供了低延迟的多模态交互能力,支持音频的流式输入,并能够流式输出文本和音频。
在线体验请参见如何在线体验 Qwen-Omni-Realtime 模型?
相比于 Qwen-Omni 模型,Qwen-Omni 实时模型可以:
音频流式输入:Qwen-Omni 模型只能接收音频文件作为输入,而 Qwen-Omni 实时模型可以实时接收音频流;
语音活动检测:Qwen-Omni 实时模型内置 VAD(Voice Activity Detection,语音活动检测)功能,可自动检测用户语音的开始和结束;
模型支持的音色包括Chelsie、Serena、Ethan和Cherry。
支持的模型
模型名称 | 版本 | 上下文长度 | 最大输入 | 最大输出 | 免费额度 |
(Token数) | |||||
qwen-omni-turbo-realtime 当前能力等同 qwen-omni-turbo-realtime-2025-05-08 | 稳定版 | 32,768 | 30,720 | 2,048 | 各100万Token(不区分模态) 有效期:百炼开通后180天内 |
qwen-omni-turbo-realtime-latest 能力始终等同最新快照版 | 最新版 | ||||
qwen-omni-turbo-realtime-2025-05-08 | 快照版 |
免费额度用完后,输入与输出的计费规则如下:
|
|
访问方式
Qwen-Omni 实时 API 基于WebSocket协议,您可以通过不同编程语言的 WebSocket 库来实现。
配置项 | 说明 |
调用地址 | 需要配置为:wss://dashscope.aliyuncs.com/api-ws/v1/realtime |
查询参数 | 查询参数为model,需指定为要访问的model名称,请参见支持的模型。 |
消息头 | 使用 Bearer Token 鉴权:Authorization: Bearer DASHSCOPE_API_KEY DASHSCOPE_API_KEY 是您在百炼上申请的API-KEY。 |
您可以通过以下代码与 Qwen-Omni 实时 API服务建立WebSocket连接。
建立WebSocket连接
# pip install websocket-client
import json
import websocket
import os
API_KEY=os.getenv("DASHSCOPE_API_KEY")
API_URL = "wss://dashscope.aliyuncs.com/api-ws/v1/realtime?model=qwen-omni-turbo-realtime"
headers = [
"Authorization: Bearer " + API_KEY
]
def on_open(ws):
print(f"Connected to server: {API_URL}")
def on_message(ws, message):
data = json.loads(message)
print("Received event:", json.dumps(data, indent=2))
def on_error(ws, error):
print("Error:", error)
ws = websocket.WebSocketApp(
API_URL,
header=headers,
on_open=on_open,
on_message=on_message,
on_error=on_error
)
ws.run_forever()
连接后可以接收到以下回调信息:
{
"event_id": "event_Bj1ixvmDpFda3ahThkOSz",
"type": "session.created",
"session": {
"object": "realtime.session",
"model": "qwen-omni-turbo-realtime",
"modalities": [
"text",
"audio"
],
"instructions": "...model instructions here...",
"voice": "Chelsie",
"input_audio_format": "pcm16",
"output_audio_format": "pcm16",
"input_audio_transcription": {
"model": "gummy-realtime-v1"
},
"turn_detection": {
"type": "server_vad",
"threshold": 0.5,
"prefix_padding_ms": 300,
"silence_duration_ms": 800,
"create_response": true,
"interrupt_response": true
},
"tools": [],
"tool_choice": "auto",
"temperature": 0.8,
"max_response_output_tokens": "inf",
"id": "sess_JSqGSEx9RRfReAIWNegSp"
}
}
快速开始
Qwen-Omni-Realtime系列模型仅支持 OpenAI 兼容方式调用。您需要获取API Key并配置API Key到环境变量。
您的 Python 版本需要不低于 3.10。
通过以下步骤,您可以快速体验与 Realtime 模型实时对话的功能。
准备运行环境
根据您的操作系统来安装 pyaudio。
macOS
brew install portaudio && pip install pyaudio
Debian/Ubuntu
sudo apt-get install python3-pyaudio 或者 pip install pyaudio
CentOS
sudo yum install -y portaudio portaudio-devel && pip install pyaudio
Windows
pip install pyaudio
安装完成后,通过 pip 安装 websocket 相关的依赖:
pip install websocket-client==1.8.0 websockets
创建客户端
在本地新建一个 python 文件,命名为
omni_realtime_client.py
,并将以下代码复制进文件中:omni_realtime_client.py
# -- coding: utf-8 -- import asyncio import websockets import json import base64 import time from typing import Optional, Callable, List, Dict, Any from enum import Enum class TurnDetectionMode(Enum): SERVER_VAD = "server_vad" MANUAL = "manual" class OmniRealtimeClient: """ 与 Omni Realtime API 交互的演示客户端。 该类提供了连接 Realtime API、发送文本和音频数据、处理响应以及管理 WebSocket 连接的相关方法。 属性说明: base_url (str): Realtime API 的基础地址。 api_key (str): 用于身份验证的 API Key。 model (str): 用于聊天的 Omni 模型名称。 voice (str): 服务器合成语音所使用的声音。 turn_detection_mode (TurnDetectionMode): 轮次检测模式。 on_text_delta (Callable[[str], None]): 文本增量回调函数。 on_audio_delta (Callable[[bytes], None]): 音频增量回调函数。 on_input_transcript (Callable[[str], None]): 输入转录文本回调函数。 on_interrupt (Callable[[], None]): 用户打断回调函数,应在此停止音频播放。 on_output_transcript (Callable[[str], None]): 输出转录文本回调函数。 extra_event_handlers (Dict[str, Callable[[Dict[str, Any]], None]]): 其他事件处理器,事件名到处理函数的映射。 """ def __init__( self, base_url, api_key: str, model: str = "", voice: str = "Ethan", turn_detection_mode: TurnDetectionMode = TurnDetectionMode.MANUAL, on_text_delta: Optional[Callable[[str], None]] = None, on_audio_delta: Optional[Callable[[bytes], None]] = None, on_interrupt: Optional[Callable[[], None]] = None, on_input_transcript: Optional[Callable[[str], None]] = None, on_output_transcript: Optional[Callable[[str], None]] = None, extra_event_handlers: Optional[Dict[str, Callable[[Dict[str, Any]], None]]] = None ): self.base_url = base_url self.api_key = api_key self.model = model self.voice = voice self.ws = None self.on_text_delta = on_text_delta self.on_audio_delta = on_audio_delta self.on_interrupt = on_interrupt self.on_input_transcript = on_input_transcript self.on_output_transcript = on_output_transcript self.turn_detection_mode = turn_detection_mode self.extra_event_handlers = extra_event_handlers or {} # 当前回复状态 self._current_response_id = None self._current_item_id = None self._is_responding = False # 输入/输出转录打印状态 self._print_input_transcript = False self._output_transcript_buffer = "" async def connect(self) -> None: """与 Realtime API 建立 WebSocket 连接。""" url = f"{self.base_url}?model={self.model}" headers = { "Authorization": f"Bearer {self.api_key}" } self.ws = await websockets.connect(url, additional_headers=headers) # 设置默认会话配置 if self.turn_detection_mode == TurnDetectionMode.MANUAL: await self.update_session({ "modalities": ["text", "audio"], "voice": self.voice, "input_audio_format": "pcm16", "output_audio_format": "pcm16", "input_audio_transcription": { "model": "gummy-realtime-v1" }, "turn_detection" : None }) elif self.turn_detection_mode == TurnDetectionMode.SERVER_VAD: await self.update_session({ "modalities": ["text", "audio"], "voice": self.voice, "input_audio_format": "pcm16", "output_audio_format": "pcm16", "input_audio_transcription": { "model": "gummy-realtime-v1" }, "turn_detection": { "type": "server_vad", "threshold": 0.1, "prefix_padding_ms": 500, "silence_duration_ms": 900 } }) else: raise ValueError(f"Invalid turn detection mode: {self.turn_detection_mode}") async def send_event(self, event) -> None: event['event_id'] = "event_" + str(int(time.time() * 1000)) print(f" Send event: type={event['type']}, event_id={event['event_id']}") await self.ws.send(json.dumps(event)) async def update_session(self, config: Dict[str, Any]) -> None: """更新会话配置。""" event = { "type": "session.update", "session": config } print("update session: ", event) await self.send_event(event) async def stream_audio(self, audio_chunk: bytes) -> None: """向 API 流式发送原始音频数据。""" # 仅支持 16bit 16kHz 单声道 PCM audio_b64 = base64.b64encode(audio_chunk).decode() append_event = { "type": "input_audio_buffer.append", "audio": audio_b64 } await self.send_event(append_event) async def commit_audio_buffer(self) -> None: """提交音频缓冲区以触发处理。""" event = { "type": "input_audio_buffer.commit" } await self.send_event(event) async def append_image(self, image_chunk: bytes) -> None: """向视频缓冲区追加图像数据。 图像数据可以来自本地文件,也可以来自实时视频流。 注意: - 图像格式必须为 JPG 或 JPEG。推荐分辨率为 480P 或 720P,最高支持 1080P。 - 单张图片大小不应超过 500KB。 - 本方法会将图像数据编码为 Base64 后再发送。 - 建议以每秒 2 帧的频率向服务器发送图像。 - 在发送图像数据之前,需要先发送音频数据。 """ image_b64 = base64.b64encode(image_chunk).decode() event = { "type": "input_image_buffer.append", "image": image_b64 } await self.send_event(event) async def create_response(self) -> None: """向 API 请求生成回复(仅在手动模式下需要调用)。""" event = { "type": "response.create", "response": { "instructions": "You are a helpful assistant.", "modalities": ["text", "audio"] } } print("create response: ", event) await self.send_event(event) async def cancel_response(self) -> None: """取消当前回复。""" event = { "type": "response.cancel" } await self.send_event(event) async def handle_interruption(self): """处理用户对当前回复的打断。""" if not self._is_responding: return print(" Handling interruption") # 1. 取消当前回复 if self._current_response_id: await self.cancel_response() self._is_responding = False self._current_response_id = None self._current_item_id = None async def handle_messages(self) -> None: try: async for message in self.ws: event = json.loads(message) event_type = event.get("type") if event_type != "response.audio.delta": print(" event: ", event) else: print(" event_type: ", event_type) if event_type == "error": print(" Error: ", event['error']) continue elif event_type == "response.created": self._current_response_id = event.get("response", {}).get("id") self._is_responding = True elif event_type == "response.output_item.added": self._current_item_id = event.get("item", {}).get("id") elif event_type == "response.done": self._is_responding = False self._current_response_id = None self._current_item_id = None # Handle interruptions elif event_type == "input_audio_buffer.speech_started": print(" Speech detected") if self._is_responding: print(" Handling interruption") await self.handle_interruption() if self.on_interrupt: print(" Handling on_interrupt, stop playback") self.on_interrupt() elif event_type == "input_audio_buffer.speech_stopped": print(" Speech ended") # Handle normal response events elif event_type == "response.text.delta": if self.on_text_delta: self.on_text_delta(event["delta"]) elif event_type == "response.audio.delta": if self.on_audio_delta: audio_bytes = base64.b64decode(event["delta"]) self.on_audio_delta(audio_bytes) elif event_type == "conversation.item.input_audio_transcription.completed": transcript = event.get("transcript", "") if self.on_input_transcript: await asyncio.to_thread(self.on_input_transcript,transcript) self._print_input_transcript = True elif event_type == "response.audio_transcript.delta": if self.on_output_transcript: delta = event.get("delta", "") if not self._print_input_transcript: self._output_transcript_buffer += delta else: if self._output_transcript_buffer: await asyncio.to_thread(self.on_output_transcript,self._output_transcript_buffer) self._output_transcript_buffer = "" await asyncio.to_thread(self.on_output_transcript,delta) elif event_type == "response.audio_transcript.done": self._print_input_transcript = False elif event_type in self.extra_event_handlers: self.extra_event_handlers[event_type](event) except websockets.exceptions.ConnectionClosed: print(" Connection closed") except Exception as e: print(" Error in message handling: ", str(e)) async def close(self) -> None: """关闭 WebSocket 连接。""" if self.ws: await self.ws.close()
选择交互模式
VAD 模式(Voice Activity Detection,自动检测语音起止)
Realtime API 自动判断用户何时开始与停止说话并作出回应。
Manual 模式(按下即说,松开即发送)
客户端控制语音起止。用户说话结束后,客户端需主动发送消息至服务端。
VAD 模式
在
omni_realtime_client.py
的同级目录下新建另一个 python 文件,命名为vad_mode.py
,并将以下代码复制进文件中:vad_mode.py
# -- coding: utf-8 -- import os, time, base64, asyncio from omni_realtime_client import OmniRealtimeClient, TurnDetectionMode import pyaudio import queue import threading # 创建一个全局音频队列和播放线程 audio_queue = queue.Queue() audio_player = None # 添加全局中断标志 interrupt_flag = threading.Event() # 初始化PyAudio p = pyaudio.PyAudio() RATE = 24000 # 采样率 24kHz CHUNK = 3200 # 每个音频块的大小 FORMAT = pyaudio.paInt16 # 16位PCM格式 CHANNELS = 1 # 单声道 def clear_audio_queue(): """清空音频队列""" with audio_queue.mutex: audio_queue.queue.clear() print("音频队列已清空") def handle_interrupt(): """处理中断事件 - 立即停止音频播放""" print("检测到语音输入,停止音频播放") interrupt_flag.set() # 设置中断标志 clear_audio_queue() # 清空队列 def audio_player_thread(): """后台线程用于播放音频数据""" stream = p.open( format=FORMAT, channels=CHANNELS, rate=24000, output=True, frames_per_buffer=CHUNK, ) try: while True: try: # 检查中断标志 if interrupt_flag.is_set(): print("音频播放被中断") interrupt_flag.clear() # 清除中断标志 continue # 从队列获取音频数据 audio_data = audio_queue.get(block=True, timeout=0.5) if audio_data is None: # 结束信号 break # 再次检查中断标志(在播放前) if interrupt_flag.is_set(): print("音频播放被中断") interrupt_flag.clear() # 清除中断标志 audio_queue.task_done() continue # 播放音频数据 stream.write(audio_data) audio_queue.task_done() except queue.Empty: # 如果队列为空,继续等待 continue finally: # 清理 stream.stop_stream() stream.close() def start_audio_player(): """启动音频播放线程""" global audio_player if audio_player is None or not audio_player.is_alive(): audio_player = threading.Thread(target=audio_player_thread, daemon=True) audio_player.start() def handle_audio_data(audio_data): """处理接收到的音频数据""" # 打印接收到的音频数据长度(调试用) print(f"\n接收到音频数据: {len(audio_data)} 字节") # 将音频数据放入队列 audio_queue.put(audio_data) async def start_microphone_streaming(client: OmniRealtimeClient): CHUNK = 3200 FORMAT = pyaudio.paInt16 CHANNELS = 1 RATE = 24000 p = pyaudio.PyAudio() stream = p.open( format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK ) try: print("开始录音,请讲话...") while True: audio_data = stream.read(CHUNK) encoded_data = base64.b64encode(audio_data).decode("utf-8") eventd = { "event_id": "event_" + str(int(time.time() * 1000)), "type": "input_audio_buffer.append", "audio": encoded_data, } await client.send_event(eventd) # 保持较短的等待时间以模拟实时交互 await asyncio.sleep(0.05) finally: stream.stop_stream() stream.close() p.terminate() async def main(): # 启动音频播放线程 start_audio_player() realtime_client = OmniRealtimeClient( base_url="wss://dashscope.aliyuncs.com/api-ws/v1/realtime", api_key=os.environ.get("DASHSCOPE_API_KEY"), model="qwen-omni-turbo-realtime", voice="Chelsie", on_text_delta=lambda text: print(f"\nAssistant: {text}", end="", flush=True), on_audio_delta=handle_audio_data, on_interrupt=handle_interrupt, # 添加中断回调函数 turn_detection_mode=TurnDetectionMode.SERVER_VAD, ) try: await realtime_client.connect() # 启动消息处理和麦克风录音 message_handler = asyncio.create_task(realtime_client.handle_messages()) streaming_task = asyncio.create_task( start_microphone_streaming(realtime_client) ) while True: await asyncio.Queue().get() except Exception as e: print(f"Error: {e}") finally: # 结束音频播放线程 audio_queue.put(None) if audio_player: audio_player.join(timeout=1) await realtime_client.close() p.terminate() if __name__ == "__main__": asyncio.run(main())
运行
vad_mode.py
,通过麦克风即可与 Realtime 模型实时对话,系统会检测您的音频起始位置并自动发送到服务器,无需您手动发送。Manual 模式
在
omni_realtime_client.py
的同级目录下新建另一个 python 文件,命名为manual_mode.py
,并将以下代码复制进文件中:manual_mode.py
# -- coding: utf-8 -- import os import asyncio import time import threading import queue import pyaudio from omni_realtime_client import OmniRealtimeClient, TurnDetectionMode class AudioPlayer: """实时音频播放器类""" def __init__(self, sample_rate=24000, channels=1, sample_width=2): self.sample_rate = sample_rate self.channels = channels self.sample_width = sample_width # 2 bytes for 16-bit self.audio_queue = queue.Queue() self.is_playing = False self.play_thread = None self.pyaudio_instance = None self.stream = None self._lock = threading.Lock() # 添加锁来同步访问 self._last_data_time = time.time() # 记录最后接收数据的时间 self._response_done = False # 添加响应完成标志 self._waiting_for_response = False # 标记是否正在等待服务器响应 # 记录最后一次向音频流写入数据的时间及最近一次音频块的时长,用于更精确地判断播放结束 self._last_play_time = time.time() self._last_chunk_duration = 0.0 def start(self): """启动音频播放器""" with self._lock: if self.is_playing: return self.is_playing = True try: self.pyaudio_instance = pyaudio.PyAudio() # 创建音频输出流 self.stream = self.pyaudio_instance.open( format=pyaudio.paInt16, # 16-bit channels=self.channels, rate=self.sample_rate, output=True, frames_per_buffer=1024 ) # 启动播放线程 self.play_thread = threading.Thread(target=self._play_audio) self.play_thread.daemon = True self.play_thread.start() print("音频播放器已启动") except Exception as e: print(f"启动音频播放器失败: {e}") self._cleanup_resources() raise def stop(self): """停止音频播放器""" with self._lock: if not self.is_playing: return self.is_playing = False # 清空队列 while not self.audio_queue.empty(): try: self.audio_queue.get_nowait() except queue.Empty: break # 等待播放线程结束(在锁外面等待,避免死锁) if self.play_thread and self.play_thread.is_alive(): self.play_thread.join(timeout=2.0) # 再次获取锁来清理资源 with self._lock: self._cleanup_resources() print("音频播放器已停止") def _cleanup_resources(self): """清理音频资源(必须在锁内调用)""" try: # 关闭音频流 if self.stream: if not self.stream.is_stopped(): self.stream.stop_stream() self.stream.close() self.stream = None except Exception as e: print(f"关闭音频流时出错: {e}") try: if self.pyaudio_instance: self.pyaudio_instance.terminate() self.pyaudio_instance = None except Exception as e: print(f"终止PyAudio时出错: {e}") def add_audio_data(self, audio_data): """添加音频数据到播放队列""" if self.is_playing and audio_data: self.audio_queue.put(audio_data) with self._lock: self._last_data_time = time.time() # 更新最后接收数据的时间 self._waiting_for_response = False # 收到数据,不再等待 def stop_receiving_data(self): """标记不再接收新的音频数据""" with self._lock: self._response_done = True self._waiting_for_response = False # 响应结束,不再等待 def prepare_for_next_turn(self): """为下一轮对话重置播放器状态。""" with self._lock: self._response_done = False self._last_data_time = time.time() self._last_play_time = time.time() self._last_chunk_duration = 0.0 self._waiting_for_response = True # 开始等待下一轮响应 # 清空上一轮可能残留的音频数据 while not self.audio_queue.empty(): try: self.audio_queue.get_nowait() except queue.Empty: break def is_finished_playing(self): """检查是否已经播放完所有音频数据""" with self._lock: queue_size = self.audio_queue.qsize() time_since_last_data = time.time() - self._last_data_time time_since_last_play = time.time() - self._last_play_time # ---------------------- 智能结束判定 ---------------------- # 1. 首选:如果服务器已标记完成且播放队列为空 # 进一步等待最近一块音频播放完毕(音频块时长 + 0.1s 容错)。 if self._response_done and queue_size == 0: min_wait = max(self._last_chunk_duration + 0.1, 0.5) # 至少等待 0.5s if time_since_last_play >= min_wait: return True # 2. 备用:如果长时间没有新数据且播放队列为空 # 当服务器没有明确发出 `response.done` 时,此逻辑作为保障 if not self._waiting_for_response and queue_size == 0 and time_since_last_data > 1.0: print("\n(超时未收到新音频,判定播放结束)") return True return False def _play_audio(self): """播放音频数据的工作线程""" while True: # 检查是否应该停止 with self._lock: if not self.is_playing: break stream_ref = self.stream # 获取流的引用 try: # 从队列中获取音频数据,超时0.1秒 audio_data = self.audio_queue.get(timeout=0.1) # 再次检查状态和流的有效性 with self._lock: if self.is_playing and stream_ref and not stream_ref.is_stopped(): try: # 播放音频数据 stream_ref.write(audio_data) # 更新最近播放信息 self._last_play_time = time.time() self._last_chunk_duration = len(audio_data) / (self.channels * self.sample_width) / self.sample_rate except Exception as e: print(f"写入音频流时出错: {e}") break # 标记该数据块已处理完成 self.audio_queue.task_done() except queue.Empty: # 队列为空时继续等待 continue except Exception as e: print(f"播放音频时出错: {e}") break class MicrophoneRecorder: """实时麦克风录音器""" def __init__(self, sample_rate=16000, channels=1, chunk_size=1024): self.sample_rate = sample_rate self.channels = channels self.chunk_size = chunk_size self.pyaudio_instance = None self.stream = None self.frames = [] self._is_recording = False self._record_thread = None def _recording_thread(self): """录音工作线程""" # 在 _is_recording 为 True 期间,持续从音频流中读取数据 while self._is_recording: try: # 使用 exception_on_overflow=False 避免因缓冲区溢出而崩溃 data = self.stream.read(self.chunk_size, exception_on_overflow=False) self.frames.append(data) except (IOError, OSError) as e: # 当流被关闭时,读取操作可能会引发错误 print(f"录音流读取错误,可能已关闭: {e}") break def start(self): """开始录音""" if self._is_recording: print("录音已在进行中。") return self.frames = [] self._is_recording = True try: self.pyaudio_instance = pyaudio.PyAudio() self.stream = self.pyaudio_instance.open( format=pyaudio.paInt16, channels=self.channels, rate=self.sample_rate, input=True, frames_per_buffer=self.chunk_size ) self._record_thread = threading.Thread(target=self._recording_thread) self._record_thread.daemon = True self._record_thread.start() print("麦克风录音已开始...") except Exception as e: print(f"启动麦克风失败: {e}") self._is_recording = False self._cleanup() raise def stop(self): """停止录音并返回音频数据""" if not self._is_recording: return None self._is_recording = False # 等待录音线程安全退出 if self._record_thread: self._record_thread.join(timeout=1.0) self._cleanup() print("麦克风录音已停止。") return b''.join(self.frames) def _cleanup(self): """安全地清理 PyAudio 资源""" if self.stream: try: if self.stream.is_active(): self.stream.stop_stream() self.stream.close() except Exception as e: print(f"关闭音频流时出错: {e}") if self.pyaudio_instance: try: self.pyaudio_instance.terminate() except Exception as e: print(f"终止 PyAudio 实例时出错: {e}") self.stream = None self.pyaudio_instance = None async def interactive_test(): """ 交互式测试脚本:允许多轮连续对话,每轮可以发送音频和图片。 """ # ------------------- 1. 初始化和连接 (一次性) ------------------- api_key = os.environ.get("DASHSCOPE_API_KEY") if not api_key: print("请设置DASHSCOPE_API_KEY环境变量") return print("--- 实时多轮音视频对话客户端 ---") print("正在初始化音频播放器和客户端...") audio_player = AudioPlayer() audio_player.start() def on_audio_received(audio_data): audio_player.add_audio_data(audio_data) def on_response_done(event): print("\n(收到响应结束标记)") audio_player.stop_receiving_data() realtime_client = OmniRealtimeClient( base_url="wss://dashscope.aliyuncs.com/api-ws/v1/realtime", api_key=api_key, model="qwen-omni-turbo-realtime", voice="Ethan", on_text_delta=lambda text: print(f"助手回复: {text}", end="", flush=True), on_audio_delta=on_audio_received, turn_detection_mode=TurnDetectionMode.MANUAL, extra_event_handlers={"response.done": on_response_done} ) message_handler_task = None try: await realtime_client.connect() print("已连接到服务器。输入 'q' 或 'quit' 可随时退出程序。") message_handler_task = asyncio.create_task(realtime_client.handle_messages()) await asyncio.sleep(0.5) turn_counter = 1 # ------------------- 2. 多轮对话循环 ------------------- while True: print(f"\n--- 第 {turn_counter} 轮对话 ---") audio_player.prepare_for_next_turn() recorded_audio = None image_paths = [] # --- 获取用户输入:从麦克风录音 --- loop = asyncio.get_event_loop() recorder = MicrophoneRecorder(sample_rate=16000) # 推荐使用16k采样率进行语音识别 print("准备录音。按 Enter 键开始录音 (或输入 'q' 退出)...") user_input = await loop.run_in_executor(None, input) if user_input.strip().lower() in ['q', 'quit']: print("用户请求退出...") return try: recorder.start() except Exception: print("无法启动录音,请检查您的麦克风权限和设备。跳过本轮。") continue print("录音中... 再次按 Enter 键停止录音。") await loop.run_in_executor(None, input) recorded_audio = recorder.stop() if not recorded_audio or len(recorded_audio) == 0: print("未录制到有效音频,请重新开始本轮对话。") continue # --- 获取图片输入 (可选) --- # 以下图片输入功能已被注释,暂时禁用。若需启用请取消下方代码注释。 # print("\n请逐行输入【图片文件】的绝对路径 (可选)。完成后,输入 's' 或按 Enter 发送请求。") # while True: # path = input("图片路径: ").strip() # if path.lower() == 's' or path == '': # break # if path.lower() in ['q', 'quit']: # print("用户请求退出...") # return # # if not os.path.isabs(path): # print("错误: 请输入绝对路径。") # continue # if not os.path.exists(path): # print(f"错误: 文件不存在 -> {path}") # continue # image_paths.append(path) # print(f"已添加图片: {os.path.basename(path)}") # --- 3. 发送数据并获取响应 --- print("\n--- 输入确认 ---") print(f"待处理音频: 1个 (来自麦克风), 图片: {len(image_paths)}个") print("------------------") # 3.1 发送录制的音频 try: print(f"发送麦克风录音 ({len(recorded_audio)}字节)") await realtime_client.stream_audio(recorded_audio) await asyncio.sleep(0.1) except Exception as e: print(f"发送麦克风录音失败: {e}") continue # 3.2 发送所有图片文件 # 以下图片发送代码已被注释,暂时禁用。 # for i, path in enumerate(image_paths): # try: # with open(path, "rb") as f: # data = f.read() # print(f"发送图片 {i+1}: {os.path.basename(path)} ({len(data)}字节)") # await realtime_client.append_image(data) # await asyncio.sleep(0.1) # except Exception as e: # print(f"发送图片 {os.path.basename(path)} 失败: {e}") # 3.3 提交并等待响应 print("提交所有输入,请求服务器响应...") await realtime_client.commit_audio_buffer() await realtime_client.create_response() print("等待并播放服务器响应音频...") start_time = time.time() max_wait_time = 60 while not audio_player.is_finished_playing(): if time.time() - start_time > max_wait_time: print(f"\n等待超时 ({max_wait_time}秒), 进入下一轮。") break await asyncio.sleep(0.2) print("\n本轮音频播放完成!") turn_counter += 1 except (asyncio.CancelledError, KeyboardInterrupt): print("\n程序被中断。") except Exception as e: print(f"发生未处理的错误: {e}") finally: # ------------------- 4. 清理资源 ------------------- print("\n正在关闭连接并清理资源...") if message_handler_task and not message_handler_task.done(): message_handler_task.cancel() if 'realtime_client' in locals() and realtime_client.ws and not realtime_client.ws.close: await realtime_client.close() print("连接已关闭。") audio_player.stop() print("程序退出。") if __name__ == "__main__": try: asyncio.run(interactive_test()) except KeyboardInterrupt: print("\n程序被用户强制退出。")
运行
manual_mode.py
,按 Enter 键开始说话,再按一次获取模型响应的音频。
交互流程
VAD 模式
将session.update事件的session.turn_detection
设为"server_vad"
以启用 VAD 模式。此模式下,服务端自动检测语音起止并进行响应。适用于语音通话场景。
交互流程如下:
服务端检测到语音开始,发送input_audio_buffer.speech_started 事件。
客户端随时发送 input_audio_buffer.append事件追加音频至缓冲区。
服务端检测到语音结束,发送input_audio_buffer.speech_stopped事件。
服务端发送input_audio_buffer.commited 事件提交音频缓冲区。
服务端发送 conversation.item.created事件,包含从缓冲区创建的用户消息项。
生命周期 | 客户端事件 | 服务器事件 |
会话初始化 | 会话配置 | 会话已创建 会话配置已更新 |
用户音频输入 | 添加音频到缓冲区 添加图片到缓冲区 | input_audio_buffer.speech_started 检测到语音开始 input_audio_buffer.speech_stopped 检测到语音结束 服务器收到提交的音频 |
服务器音频输出 | 无 | 服务端开始生成响应 响应时有新的输出内容 对话项被创建 新的输出内容添加到assistant message response.audio_transcript.delta 增量生成的转录文字 模型增量生成的音频 response.audio_transcript.done 文本转录完成 音频生成完成 Assistant mesasge 的文本或音频内容流式输出完成 Assistant mesasge 的整个输出项流式传输完成 响应完成 |
Manual 模式
将session.update事件的session.turn_detection
设为 None以启用 Manual 模式。此模式下,客户端通过显式发送input_audio_buffer.commit
和response.create
事件请求服务器响应。适用于按下即说场景,如聊天软件中的发送语音。
交互流程如下:
客户端发送 input_audio_buffer.append事件追加音频到缓冲区。
也可以发送 input_image_buffer.append 事件追加图像到缓冲区。
客户端发送input_audio_buffer.commit事件提交音频缓冲区与图像缓冲区,创建新用户消息项。
服务端响应 input_audio_buffer.commited事件。
客户端发送response.create事件,等待服务端返回模型的输出。
服务端响应conversation.item.created事件。
生命周期 | 客户端事件 | 服务器事件 |
会话初始化 | 会话配置 | 会话已创建 会话配置已更新 |
用户音频输入 | 添加音频到缓冲区 添加图片到缓冲区 提交音频与图片到服务器 创建模型响应 | 服务器收到提交的音频 |
服务器音频输出 | 清除缓冲区的音频 | 服务端开始生成响应 响应时有新的输出内容 对话项被创建 新的输出内容添加到assistant message 项 response.audio_transcript.delta 增量生成的转录文字 模型增量生成的音频 response.audio_transcript.done 完成文本转录 完成音频生成 Assistant mesasge 的文本或音频内容流式输出完成 Assistant mesasge 的整个输出项流式传输完成 响应完成 |
API 参考
常见问题
Q1:如何在线体验 Qwen-Omni-Realtime 模型?
A:您可以通过以下方式一键部署:
访问函数计算模板,部署类型选择直接部署,百炼 API-KEY 填入您的 API Key;单击创建并部署默认环境。
等待约一分钟,在环境详情的环境信息中获取访问域名,将访问域名的
http
改成https
(示例:https://omni-realtime.fcv3.xxxx.cn-hangzhou.fc.devsapp.net),通过该 HTTPS 链接与模型进行视频/语音通话。
通过资源信息-函数资源查看项目源代码。
函数计算与阿里云百炼均为新用户提供免费额度,可以覆盖简单调试所需成本,额度耗尽后按量计费。只有在访问的情况下会产生费用。
Q2:怎么向模型输入图片?
A:通过客户端发送 input_image_buffer.append 事件。
VAD 模式
该模式会根据语音检测情况自动提交音频与图片,请在服务端响应input_audio_buffer.speech_stopped前发送 input_image_buffer.append 事件。
Manual 模式
参见Manual 模式代码,将图片输入与提交的两部分代码取消注释,即可传入本地图片。
input_image_buffer.append 事件不能早于input_audio_buffer.append事件。
若用于视频通话场景,可以对视频抽帧,以不超过每秒两帧的速度发送 input_image_buffer.append 事件。