Realtime multimodal (Qwen-Omni-Realtime)

The Qwen-Omni realtime API provides low-latency multimodal interaction: it accepts streaming audio input and streams back both text and audio output.

To try it online, see "How do I experience the Qwen-Omni-Realtime models online?"

Compared with the Qwen-Omni models, the Qwen-Omni realtime models can:

  • Stream audio input: Qwen-Omni models only accept audio files as input, whereas the Qwen-Omni realtime models accept live audio streams;

  • Detect voice activity: the Qwen-Omni realtime models have built-in VAD (Voice Activity Detection), which automatically detects when the user starts and stops speaking.

For the audio output of the Qwen-Omni realtime models:

  • Supported voices: see the voice list

  • Supported languages:

    • Qwen3-Omni-Flash-Realtime: Chinese (Mandarin, plus Shanghainese, Cantonese, Sichuanese, and other dialects), English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, and Korean

    • Qwen-Omni-Turbo-Realtime: Chinese (Mandarin) and English only

Supported models

We recommend Qwen3-Omni-Flash-Realtime. Compared with Qwen-Omni-Turbo-Realtime (which will no longer be updated), its capabilities are greatly improved; for the model's audio output:

  • Supported voices increase to 17 (Qwen-Omni-Turbo-Realtime supports only 4)

  • Supported languages increase to 10 (Qwen-Omni-Turbo-Realtime supports only 2)

Mainland China (Beijing)

Qwen3-Omni-Flash-Realtime

  • qwen3-omni-flash-realtime (stable version; currently equivalent to qwen3-omni-flash-realtime-2025-09-15)

  • qwen3-omni-flash-realtime-2025-09-15 (snapshot version)

  Both versions: context length 65,536 tokens; maximum input 49,152 tokens; maximum output 16,384 tokens.

  Free quota: 1 million tokens (all modalities combined), valid for 90 days after activating Alibaba Cloud Model Studio (Bailian).

Qwen-Omni-Turbo-Realtime (based on Qwen2.5)

  • qwen-omni-turbo-realtime (stable version; currently equivalent to qwen-omni-turbo-realtime-2025-05-08)

  • qwen-omni-turbo-realtime-latest (latest version; always equivalent to the newest snapshot)

  • qwen-omni-turbo-realtime-2025-05-08 (snapshot version)

  All versions: context length 32,768 tokens; maximum input 30,720 tokens; maximum output 2,048 tokens.

  Free quota: 1 million tokens (all modalities combined), valid for 90 days after activating Model Studio.

International (Singapore)

Qwen-Omni-Turbo-Realtime (based on Qwen2.5)

  • qwen-omni-turbo-realtime (stable version; currently equivalent to qwen-omni-turbo-realtime-2025-05-08)

  • qwen-omni-turbo-realtime-latest (latest version; always equivalent to the newest snapshot)

  • qwen-omni-turbo-realtime-2025-05-08 (snapshot version)

  All versions: context length 32,768 tokens; maximum input 30,720 tokens; maximum output 2,048 tokens.

  Free quota: none.

Access methods

The Qwen-Omni realtime API is based on the WebSocket protocol. You can access it through the DashScope Python or Java SDK, or implement the communication yourself with any language's WebSocket library.

Use the following code to establish a WebSocket connection to the Qwen-Omni realtime API service.

Establish a WebSocket connection

DashScope Python SDK

# Requires dashscope SDK version 1.23.9 or later
import os
import json
from dashscope.audio.qwen_omni import OmniRealtimeConversation, OmniRealtimeCallback
import dashscope
# If the API key is not set as an environment variable, replace the next line with: dashscope.api_key = "sk-xxx"
dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")

class PrintCallback(OmniRealtimeCallback):
    def on_open(self) -> None:
        print("Connected Successfully")
    def on_event(self, response: dict) -> None:
        print("Received event:")
        print(json.dumps(response, indent=2, ensure_ascii=False))
    def on_close(self, close_status_code: int, close_msg: str) -> None:
        print(f"Connection closed (code={close_status_code}, msg={close_msg}).")

callback = PrintCallback()
conversation = OmniRealtimeConversation(
    model="qwen3-omni-flash-realtime",
    callback=callback,
)
conversation.connect()
conversation.thread.join()

DashScope Java SDK

// Requires DashScope Java SDK version 2.20.9 or later

import com.alibaba.dashscope.audio.omni.*;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.google.gson.JsonObject;

public class Main {
    public static void main(String[] args) throws InterruptedException, NoApiKeyException {
        OmniRealtimeParam param = OmniRealtimeParam.builder()
                .model("qwen3-omni-flash-realtime")
                // If the environment variable is not set, replace the next line with .apikey("sk-xxx") using your API key
                .apikey(System.getenv("DASHSCOPE_API_KEY"))
                .build();

        OmniRealtimeConversation conversation = new OmniRealtimeConversation(param, new OmniRealtimeCallback() {
            @Override
            public void onOpen() {
                System.out.println("Connected Successfully");
            }
            @Override
            public void onEvent(JsonObject message) {
                System.out.println(message);
            }
            @Override
            public void onClose(int code, String reason) {
                System.out.println("connection closed code: " + code + ", reason: " + reason);
            }
        });

        conversation.connect();
        Thread.sleep(Long.MAX_VALUE);
        conversation.close(1000, "bye");
        System.exit(0);
    }
}

WebSocket (Python)

Connection settings

  • Endpoint: wss://dashscope.aliyuncs.com/api-ws/v1/realtime

  • Query parameter: model, set to the name of the model you want to access; see Supported models

  • Headers: Bearer token authentication: Authorization: Bearer DASHSCOPE_API_KEY, where DASHSCOPE_API_KEY is the API key you created on Model Studio.
# pip install websocket-client
import json
import websocket
import os

API_KEY=os.getenv("DASHSCOPE_API_KEY")
API_URL = "wss://dashscope.aliyuncs.com/api-ws/v1/realtime?model=qwen3-omni-flash-realtime"

headers = [
    "Authorization: Bearer " + API_KEY
]

def on_open(ws):
    print(f"Connected to server: {API_URL}")
def on_message(ws, message):
    data = json.loads(message)
    print("Received event:", json.dumps(data, indent=2))
def on_error(ws, error):
    print("Error:", error)

ws = websocket.WebSocketApp(
    API_URL,
    header=headers,
    on_open=on_open,
    on_message=on_message,
    on_error=on_error
)

ws.run_forever()

After the connection is established, you will receive a callback message like the following:

{
    "event_id": "event_Bj1ixvmDpFda3ahThkOSz",
    "type": "session.created",
    "session": {
        "object": "realtime.session",
        "model": "qwen3-omni-flash-realtime",
        "modalities": [
            "text",
            "audio"
        ],
        "voice": "Cherry",
        "input_audio_format": "pcm16",
        "output_audio_format": "pcm24",
        "input_audio_transcription": {
            "model": "gummy-realtime-v1"
        },
        "turn_detection": {
            "type": "server_vad",
            "threshold": 0.5,
            "prefix_padding_ms": 300,
            "silence_duration_ms": 800,
            "create_response": true,
            "interrupt_response": true
        },
        "tools": [],
        "tool_choice": "auto",
        "temperature": 0.8,
        "max_response_output_tokens": "inf",
        "id": "sess_JSqGSEx9RRfReAIWNegSp"
    }
}
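
The session.created payload above lists the session's default settings (voice, audio formats, turn detection, and so on). Over a raw WebSocket you can adjust them by sending a session.update event, for example from the on_open callback of the snippet above. A minimal sketch; the field values shown here are illustrative, not required:

def on_open(ws):
    print(f"Connected to server: {API_URL}")
    # Field names follow the session.created payload shown above.
    session_update = {
        "type": "session.update",
        "session": {
            "modalities": ["text", "audio"],
            "voice": "Cherry",
            "input_audio_format": "pcm16",
            "turn_detection": {
                "type": "server_vad",
                "threshold": 0.5,
                "silence_duration_ms": 800
            }
        }
    }
    ws.send(json.dumps(session_update))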

Quick start

You need to obtain an API key and configure the API key as an environment variable.

Pick the programming language you are familiar with, then follow the steps below to try real-time conversation with the Realtime models.

DashScope Python SDK

  • Prepare the environment

Your Python version must be 3.10 or later.

First, install pyaudio for your operating system.

macOS

brew install portaudio && pip install pyaudio

Debian/Ubuntu

sudo apt-get install python3-pyaudio

or

pip install pyaudio

CentOS

sudo yum install -y portaudio portaudio-devel && pip install pyaudio

Windows

pip install pyaudio

After installation, install the dependencies via pip:

pip install websocket-client==1.8.0 websockets dashscope==1.23.9
  • Choose an interaction mode

    • VAD mode (Voice Activity Detection: speech start and end are detected automatically)

      The Realtime API determines on its own when the user starts and stops speaking, and responds accordingly.

    • Manual mode (push to talk: press to speak, release to send)

      The client controls when speech starts and ends. After the user finishes speaking, the client must explicitly send messages to the server, as sketched right below.
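
    In manual mode, one turn boils down to three SDK calls, as in this minimal sketch (conversation is an OmniRealtimeConversation, as in the full examples below, and audio_b64 is one base64-encoded PCM chunk):

    conversation.append_audio(audio_b64)   # stream one or more audio chunks
    conversation.commit()                  # mark the end of this user turn
    conversation.create_response()         # ask the model to generate a reply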

    VAD mode

    Create a python file named vad_dash.py and copy the following code into it:

    vad_dash.py

    # Requires dashscope SDK version 1.23.9 or later
    import os
    import base64
    import signal
    import sys
    import time
    import pyaudio
    import contextlib
    import threading
    import queue
    from dashscope.audio.qwen_omni import *
    import dashscope
    # If the environment variable is not set, replace the next line with: dashscope.api_key = "sk-xxx"
    dashscope.api_key = os.getenv('DASHSCOPE_API_KEY')
    voice = 'Cherry'
    conversation = None
    
    class B64PCMPlayer:
        def __init__(self, pya: pyaudio.PyAudio, sample_rate=24000, chunk_size_ms=100):
            self.pya = pya
            self.sample_rate = sample_rate
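            # bytes per chunk = ms x sample_rate x 2 bytes/sample (16-bit mono) / 1000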
            self.chunk_size_bytes = chunk_size_ms * sample_rate *2 // 1000
            self.player_stream = pya.open(format=pyaudio.paInt16,
                                          channels=1,
                                          rate=sample_rate,
                                          output=True)
    
            self.raw_audio_buffer: queue.Queue = queue.Queue()
            self.b64_audio_buffer: queue.Queue = queue.Queue()
            self.status_lock = threading.Lock()
            self.status = 'playing'
            self.decoder_thread = threading.Thread(target=self.decoder_loop)
            self.player_thread = threading.Thread(target=self.player_loop)
            self.decoder_thread.start()
            self.player_thread.start()
            self.complete_event: threading.Event = None
    
        def decoder_loop(self):
            while self.status != 'stop':
              recv_audio_b64 = None
              with contextlib.suppress(queue.Empty):
                recv_audio_b64 = self.b64_audio_buffer.get(timeout=0.1)
              if recv_audio_b64 is None:
                continue
              recv_audio_raw = base64.b64decode(recv_audio_b64)
              # push the raw audio into the queue in fixed-size chunks
              for i in range(0, len(recv_audio_raw), self.chunk_size_bytes):
                chunk = recv_audio_raw[i:i + self.chunk_size_bytes]
                self.raw_audio_buffer.put(chunk)
    
        def player_loop(self):
            while self.status != 'stop':
              recv_audio_raw = None
              with contextlib.suppress(queue.Empty):
                recv_audio_raw = self.raw_audio_buffer.get(timeout=0.1)
              if recv_audio_raw is None:
                if self.complete_event:
                  self.complete_event.set()
                continue
              # write the chunk to the pyaudio player; this blocks until the chunk finishes playing
              self.player_stream.write(recv_audio_raw)
    
        def cancel_playing(self):
            self.b64_audio_buffer.queue.clear()
            self.raw_audio_buffer.queue.clear()
    
        def add_data(self, data):
            self.b64_audio_buffer.put(data)
    
        def wait_for_complete(self):
            self.complete_event = threading.Event()
            self.complete_event.wait()
            self.complete_event = None
    
        def shutdown(self):
            self.status = 'stop'
            self.decoder_thread.join()
            self.player_thread.join()
            self.player_stream.close()
    
    
    
    class MyCallback(OmniRealtimeCallback):
        def on_open(self) -> None:
            global pya
            global mic_stream
            global b64_player
            print('connection opened, init microphone')
            pya = pyaudio.PyAudio()
            mic_stream = pya.open(format=pyaudio.paInt16,
                                channels=1,
                                rate=16000,
                                input=True)
            b64_player = B64PCMPlayer(pya)
        def on_close(self, close_status_code, close_msg) -> None:
            print('connection closed with code: {}, msg: {}, destroy microphone'.format(close_status_code, close_msg))
            sys.exit(0)
    
        def on_event(self, response: dict) -> None:
            try:
                global conversation
                global b64_player
                type = response['type']
                if 'session.created' == type:
                    print('start session: {}'.format(response['session']['id']))
                if 'conversation.item.input_audio_transcription.completed' == type:
                    print('question: {}'.format(response['transcript']))
                if 'response.audio_transcript.delta' == type:
                    text = response['delta']
                    print("got llm response delta: {}".format(text))
                if 'response.audio.delta' == type:
                    recv_audio_b64 = response['delta']
                    b64_player.add_data(recv_audio_b64)
                if 'input_audio_buffer.speech_started' == type:
                    print('======VAD Speech Start======')
                    b64_player.cancel_playing()
                if 'response.done' == type:
                    print('======RESPONSE DONE======')
                    print('[Metric] response: {}, first text delay: {}, first audio delay: {}'.format(
                                    conversation.get_last_response_id(),
                                    conversation.get_last_first_text_delay(),
                                    conversation.get_last_first_audio_delay(),
                                    ))
            except Exception as e:
                print('[Error] {}'.format(e))
                return
    
    if __name__  == '__main__':
        print('Initializing ...')
        callback = MyCallback()
        conversation = OmniRealtimeConversation(
            model='qwen3-omni-flash-realtime',
            callback=callback,
            )
        conversation.connect()
        conversation.update_session(
            output_modalities=[MultiModality.AUDIO, MultiModality.TEXT],
            voice=voice,
            input_audio_format=AudioFormat.PCM_16000HZ_MONO_16BIT,
            output_audio_format=AudioFormat.PCM_24000HZ_MONO_16BIT,
            enable_input_audio_transcription=True,
            input_audio_transcription_model='gummy-realtime-v1',
            enable_turn_detection=True,
            turn_detection_type='server_vad',
            instructions="你是个人助理小云,请你准确且友好地解答用户的问题,始终以乐于助人的态度回应。" # 设定模型的角色
        )
        def signal_handler(sig, frame):
            print('Ctrl+C pressed, stop recognition ...')
            conversation.close()
            b64_player.shutdown()
            print('omni realtime stopped.')
            sys.exit(0)
        signal.signal(signal.SIGINT, signal_handler)
        print("Press 'Ctrl+C' to stop conversation...")
        while True:
            if mic_stream:
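                # 3200 bytes = 100 ms of 16 kHz, 16-bit mono PCM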
                audio_data = mic_stream.read(3200, exception_on_overflow=False)
                audio_b64 = base64.b64encode(audio_data).decode('ascii')
                conversation.append_audio(audio_b64)
            else:
                break

    Run vad_dash.py to talk with the Realtime model through your microphone. The service detects where your speech starts and sends it to the server automatically; no manual sending is needed.

    Manual mode

    Create a python file named manual_dash.py and copy the following code into it:

    manual_dash.py

    # Requires dashscope SDK version 1.23.9 or later
    import os
    import base64
    import contextlib
    import queue
    import sys
    import threading
    import pyaudio
    from dashscope.audio.qwen_omni import *
    import dashscope
    # If the environment variable is not set, replace the next line with: dashscope.api_key = "sk-xxx"
    dashscope.api_key = os.getenv('DASHSCOPE_API_KEY')
    
    voice = 'Cherry'
    conversation = None
    
    class B64PCMPlayer:
        def __init__(self, pya: pyaudio.PyAudio, sample_rate=24000, chunk_size_ms=100):
            self.pya = pya
            self.sample_rate = sample_rate
            self.chunk_size_bytes = chunk_size_ms * sample_rate *2 // 1000
            self.player_stream = pya.open(format=pyaudio.paInt16,
                    channels=1,
                    rate=sample_rate,
                    output=True)
    
            self.raw_audio_buffer: queue.Queue = queue.Queue()
            self.b64_audio_buffer: queue.Queue = queue.Queue()
            self.status_lock = threading.Lock()
            self.status = 'playing'
            self.decoder_thread = threading.Thread(target=self.decoder_loop)
            self.player_thread = threading.Thread(target=self.player_loop)
            self.decoder_thread.start()
            self.player_thread.start()
            self.complete_event: threading.Event = None
    
        def decoder_loop(self):
            while self.status != 'stop':
                recv_audio_b64 = None
                with contextlib.suppress(queue.Empty):
                    recv_audio_b64 = self.b64_audio_buffer.get(timeout=0.1)
                if recv_audio_b64 is None:
                    continue
                recv_audio_raw = base64.b64decode(recv_audio_b64)
                # push raw audio data into queue by chunk
                for i in range(0, len(recv_audio_raw), self.chunk_size_bytes):
                    chunk = recv_audio_raw[i:i + self.chunk_size_bytes]
                    self.raw_audio_buffer.put(chunk)
    
        def player_loop(self):
            while self.status != 'stop':
                recv_audio_raw = None
                with contextlib.suppress(queue.Empty):
                    recv_audio_raw = self.raw_audio_buffer.get(timeout=0.1)
                if recv_audio_raw is None:
                    if self.complete_event:
                        self.complete_event.set()
                    continue
                # write chunk to pyaudio audio player, wait until finish playing this chunk.
                self.player_stream.write(recv_audio_raw)
    
        def cancel_playing(self):
            self.b64_audio_buffer.queue.clear()
            self.raw_audio_buffer.queue.clear()
    
        def add_data(self, data):
            self.b64_audio_buffer.put(data)
    
        def wait_for_complete(self):
            self.complete_event = threading.Event()
            self.complete_event.wait()
            self.complete_event = None
    
        def shutdown(self):
            self.status = 'stop'
            self.decoder_thread.join()
            self.player_thread.join()
            self.player_stream.close()
    
    class MyCallback(OmniRealtimeCallback):
        def __init__(self):
            super().__init__()
            self.complete_event: threading.Event = threading.Event()
        
        def wait_for_complete(self):
            self.complete_event.wait()
            self.complete_event = threading.Event()
    
        def on_open(self) -> None:
            global b64_player
            print('connection opened, init microphone')
            self.pya = pyaudio.PyAudio()
            b64_player = B64PCMPlayer(self.pya)
    
        def on_close(self, close_status_code, close_msg) -> None:
            print('connection closed with code: {}, msg: {}, destroy microphone'.format(close_status_code, close_msg))
            sys.exit(0)
    
        def on_event(self, response: dict) -> None:
            global conversation
            global b64_player
            try:
                type = response['type']
                if 'session.created' == type:
                    print('start session: {}'.format(response['session']['id']))
                if 'conversation.item.input_audio_transcription.completed' == type:
                    print('question: {}'.format(response['transcript']))
                if 'response.audio_transcript.delta' == type:
                    text = response['delta']
                    print("got llm response delta: {}".format(text))
                if 'response.audio.delta' == type:
                    recv_audio_b64 = response['delta']
                    b64_player.add_data(recv_audio_b64)
                if 'response.done' == type:
                    print('======RESPONSE DONE======')
                    print('[Metric] response: {}, first text delay: {}, first audio delay: {}'.format(
                                    conversation.get_last_response_id(), 
                                    conversation.get_last_first_text_delay(), 
                                    conversation.get_last_first_audio_delay(),
                                    ))
                    if self.complete_event:
                        self.complete_event.set()
            except Exception as e:
                print('[Error] {}'.format(e))
                return
    
    class MicrophoneRecorder:
        """实时麦克风录音器"""
        def __init__(self, sample_rate=16000, channels=1, chunk_size=1024):
            self.sample_rate = sample_rate
            self.channels = channels
            self.chunk_size = chunk_size
            self.pyaudio_instance = None
            self.stream = None
            self.frames = []
            self._is_recording = False
            self._record_thread = None
    
        def _recording_thread(self):
            """录音工作线程"""
            # 在 _is_recording 为 True 期间,持续从音频流中读取数据
            while self._is_recording:
                try:
                    # exception_on_overflow=False avoids crashing on buffer overflows
                    data = self.stream.read(self.chunk_size, exception_on_overflow=False)
                    self.frames.append(data)
                except (IOError, OSError) as e:
                    # Reads may raise an error once the stream has been closed
                    print(f"Error reading from the recording stream (it may be closed): {e}")
                    break
    
        def start(self):
            """开始录音"""
            if self._is_recording:
                print("录音已在进行中。")
                return
            self.frames = []
            self._is_recording = True
            
            try:
                self.pyaudio_instance = pyaudio.PyAudio()
                self.stream = self.pyaudio_instance.open(
                    format=pyaudio.paInt16,
                    channels=self.channels,
                    rate=self.sample_rate,
                    input=True,
                    frames_per_buffer=self.chunk_size
                )
                self._record_thread = threading.Thread(target=self._recording_thread)
                self._record_thread.daemon = True
                self._record_thread.start()
                print("麦克风录音已开始...")
            except Exception as e:
                print(f"启动麦克风失败: {e}")
                self._is_recording = False
                self._cleanup()
                raise
    
        def stop(self):
            """停止录音并返回音频数据"""
            if not self._is_recording:
                return None 
            self._is_recording = False
            # Wait for the recording thread to exit safely
            if self._record_thread:
                self._record_thread.join(timeout=1.0)
            self._cleanup()
            print("麦克风录音已停止。")
            return b''.join(self.frames)
    
        def _cleanup(self):
            """安全地清理 PyAudio 资源"""
            if self.stream:
                try:
                    if self.stream.is_active():
                        self.stream.stop_stream()
                    self.stream.close()
                except Exception as e:
                    print(f"关闭音频流时出错: {e}")
            
            if self.pyaudio_instance:
                try:
                    self.pyaudio_instance.terminate()
                except Exception as e:
                    print(f"终止 PyAudio 实例时出错: {e}")
            self.stream = None
            self.pyaudio_instance = None
    
    
    if __name__  == '__main__':
        print('Initializing ...')
        callback = MyCallback()
        conversation = OmniRealtimeConversation(
            model='qwen3-omni-flash-realtime',
            callback=callback, 
            )
        conversation.connect()
        conversation.update_session(
            output_modalities=[MultiModality.AUDIO, MultiModality.TEXT],
            voice=voice,
            input_audio_format=AudioFormat.PCM_16000HZ_MONO_16BIT,
            output_audio_format=AudioFormat.PCM_24000HZ_MONO_16BIT,
            enable_input_audio_transcription=True,
            input_audio_transcription_model='gummy-realtime-v1',
            enable_turn_detection=False,
            instructions="你是个人助理小云,请你准确且友好地解答用户的问题,始终以乐于助人的态度回应。" # 设定模型的角色
        )
        try:
            turn_counter = 1
            while True:
                print(f"\n--- 第 {turn_counter} 轮对话 ---")  
                # 初始化麦克风录音
                recorder = MicrophoneRecorder(sample_rate=16000)  # 16k采样率进行语音识别
                print("准备录音。按 Enter 键开始录音 (或输入 'q' 退出)...")
                user_input = input()
                if user_input.strip().lower() in ['q', 'quit']:
                    print("用户请求退出...")
                    break
                try:
                    recorder.start()
                except Exception as e:
                    print(f"无法启动录音,请检查您的麦克风权限和设备: {e}")
                    continue
                print("录音中... 再次按 Enter 键停止录音。")
                input()
                recorded_audio = recorder.stop()           
                if not recorded_audio or len(recorded_audio) == 0:
                    print("未录制到有效音频,请重新开始本轮对话。")
                    continue 
                print(f"成功录制音频: {len(recorded_audio)}字节")
                # 将录制的音频分块发送
                chunk_size = 3200  # 与原始代码保持一致的块大小
                for i in range(0, len(recorded_audio), chunk_size):
                    audio_chunk = recorded_audio[i:i+chunk_size]
                    if audio_chunk:
                        conversation.append_audio(base64.b64encode(audio_chunk).decode('ascii'))   
                print("发送录音完成,等待响应...")
                conversation.commit()
                conversation.create_response()
                callback.wait_for_complete()
                b64_player.wait_for_complete()
                print('Audio playback finished')
                turn_counter += 1 
        except KeyboardInterrupt:
            print("\n程序被用户中断")
        finally:
            print("程序退出")

    Run manual_dash.py; press Enter to start speaking, then press Enter again to receive the model's audio response.

DashScope Java SDK

Choose an interaction mode

  • VAD mode (Voice Activity Detection: speech start and end are detected automatically)

    The Realtime API determines on its own when the user starts and stops speaking, and responds accordingly.

  • Manual mode (push to talk: press to speak, release to send)

    The client controls when speech starts and ends. After the user finishes speaking, the client must explicitly send messages to the server.

VAD mode

OmniServerVad.java

// Requires DashScope Java SDK version 2.20.9 or later

import com.alibaba.dashscope.audio.omni.*;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.google.gson.JsonObject;
import javax.sound.sampled.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class OmniServerVad {
    // Begin RealtimePcmPlayer class definition
    public static class RealtimePcmPlayer {
        private int sampleRate;
        private SourceDataLine line;
        private AudioFormat audioFormat;
        private Thread decoderThread;
        private Thread playerThread;
        private AtomicBoolean stopped = new AtomicBoolean(false);
        private Queue<String> b64AudioBuffer = new ConcurrentLinkedQueue<>();
        private Queue<byte[]> RawAudioBuffer = new ConcurrentLinkedQueue<>();
    
        // The constructor initializes the audio format and the output line
        public RealtimePcmPlayer(int sampleRate) throws LineUnavailableException {
            this.sampleRate = sampleRate;
            this.audioFormat = new AudioFormat(this.sampleRate, 16, 1, true, false);
            DataLine.Info info = new DataLine.Info(SourceDataLine.class, audioFormat);
            line = (SourceDataLine) AudioSystem.getLine(info);
            line.open(audioFormat);
            line.start();
            decoderThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!stopped.get()) {
                        String b64Audio = b64AudioBuffer.poll();
                        if (b64Audio != null) {
                            byte[] rawAudio = Base64.getDecoder().decode(b64Audio);
                            RawAudioBuffer.add(rawAudio);
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            });
            playerThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!stopped.get()) {
                        byte[] rawAudio = RawAudioBuffer.poll();
                        if (rawAudio != null) {
                            try {
                                playChunk(rawAudio);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            });
            decoderThread.start();
            playerThread.start();
        }
    
        // Play one audio chunk, blocking until it finishes
        private void playChunk(byte[] chunk) throws IOException, InterruptedException {
            if (chunk == null || chunk.length == 0) return;
    
            int bytesWritten = 0;
            while (bytesWritten < chunk.length) {
                bytesWritten += line.write(chunk, bytesWritten, chunk.length - bytesWritten);
            }
            // chunk duration in ms = bytes / (sampleRate * 2 bytes per sample / 1000)
            int audioLength = chunk.length / (this.sampleRate * 2 / 1000);
            // Wait for the buffered audio to finish playing (guard against a negative sleep)
            Thread.sleep(Math.max(0, audioLength - 10));
        }
    
        public void write(String b64Audio) {
            b64AudioBuffer.add(b64Audio);
        }
    
        public void cancel() {
            b64AudioBuffer.clear();
            RawAudioBuffer.clear();
        }
    
        public void waitForComplete() throws InterruptedException {
            while (!b64AudioBuffer.isEmpty() || !RawAudioBuffer.isEmpty()) {
                Thread.sleep(100);
            }
            line.drain();
        }
    
        public void shutdown() throws InterruptedException {
            stopped.set(true);
            decoderThread.join();
            playerThread.join();
            if (line != null && line.isRunning()) {
                line.drain();
                line.close();
            }
        }
    } // End RealtimePcmPlayer class definition
    
    public static void main(String[] args) throws InterruptedException, LineUnavailableException {

        OmniRealtimeParam param = OmniRealtimeParam.builder()
                .model("qwen3-omni-flash-realtime")
                // If the environment variable is not set, replace the next line with .apikey("sk-xxx") using your API key
                .apikey(System.getenv("DASHSCOPE_API_KEY"))
                .build();

        RealtimePcmPlayer audioPlayer = new RealtimePcmPlayer(24000);
        OmniRealtimeConversation conversation = null;
        final AtomicReference<OmniRealtimeConversation> conversationRef = new AtomicReference<>(null);
        conversation = new OmniRealtimeConversation(param, new OmniRealtimeCallback() {
            @Override
            public void onOpen() {
                System.out.println("connection opened");
            }
            @Override
            public void onEvent(JsonObject message) {
                String type = message.get("type").getAsString();
                switch(type) {
                    case "session.created":
                        System.out.println("start session: " + message.get("session").getAsJsonObject().get("id").getAsString());
                        break;
                    case "conversation.item.input_audio_transcription.completed":
                        System.out.println("question: " + message.get("transcript").getAsString());
                        break;
                    case "response.audio_transcript.delta":
                        System.out.println("got llm response delta: " + message.get("delta").getAsString());
                        break;
                    case "response.audio.delta":
                        String recvAudioB64 = message.get("delta").getAsString();
                        audioPlayer.write(recvAudioB64);
                        break;
                    case "input_audio_buffer.speech_started":
                        System.out.println("======VAD Speech Start======");
                        audioPlayer.cancel();
                        break;
                    case "response.done":
                        System.out.println("======RESPONSE DONE======");
                        if (conversationRef.get() != null) {
                            System.out.println("[Metric] response: " + conversationRef.get().getResponseId() +
                                    ", first text delay: " + conversationRef.get().getFirstTextDelay() +
                                    " ms, first audio delay: " + conversationRef.get().getFirstAudioDelay() + " ms");
                        }
                        break;
                    default:
                        break;
                }
            }
            @Override
            public void onClose(int code, String reason) {
                System.out.println("connection closed code: " + code + ", reason: " + reason);
            }
        });
        conversationRef.set(conversation);
        try {
            conversation.connect();
        } catch (NoApiKeyException e) {
            throw new RuntimeException(e);
        }
        OmniRealtimeConfig config = OmniRealtimeConfig.builder()
        .modalities(Arrays.asList(OmniRealtimeModality.AUDIO, OmniRealtimeModality.TEXT))
        .voice("Cherry")
        .enableTurnDetection(true)
        .enableInputAudioTranscription(true)
        .InputAudioTranscription("gummy-realtime-v1")
        // Set the model's persona
        .parameters(new HashMap<String, Object>() {{
            put("instructions","You are a personal assistant named Xiaoyun. Answer the user's questions accurately and in a friendly, always helpful tone."); 
        }})
        .build();
        conversation.updateSession(config);
        try {
            // Create the audio format
            AudioFormat audioFormat = new AudioFormat(16000, 16, 1, true, false);
            // Open the default recording device matching this format
            TargetDataLine targetDataLine =
                    AudioSystem.getTargetDataLine(audioFormat);
            targetDataLine.open(audioFormat);
            // Start recording
            targetDataLine.start();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            long start = System.currentTimeMillis();
            // Record for 50 s while streaming the audio in real time
            while (System.currentTimeMillis() - start < 50000) {
                int read = targetDataLine.read(buffer.array(), 0, buffer.capacity());
                if (read > 0) {
                    // Encode only the bytes actually read in this iteration
                    String audioB64 = Base64.getEncoder().encodeToString(Arrays.copyOfRange(buffer.array(), 0, read));
                    // Send the recorded audio to the streaming recognition service
                    conversation.appendAudio(audioB64);
                    // Recording is rate-limited; sleep briefly to keep CPU usage low
                    Thread.sleep(20);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        conversation.commit();
        conversation.createResponse(null, null);
        conversation.close(1000, "bye");
        audioPlayer.waitForComplete();
        audioPlayer.shutdown();
        System.exit(0);
    }
}

Run the OmniServerVad.main() method to talk with the Realtime model through your microphone. The service detects where your speech starts and sends it to the server automatically; no manual sending is needed.

Manual mode

OmniWithoutServerVad.java

// Requires DashScope Java SDK version 2.20.9 or later

import com.alibaba.dashscope.audio.omni.*;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.google.gson.JsonObject;
import javax.sound.sampled.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class OmniWithoutServerVad {
    // Begin RealtimePcmPlayer class definition
    public static class RealtimePcmPlayer {
        private int sampleRate;
        private SourceDataLine line;
        private AudioFormat audioFormat;
        private Thread decoderThread;
        private Thread playerThread;
        private AtomicBoolean stopped = new AtomicBoolean(false);
        private Queue<String> b64AudioBuffer = new ConcurrentLinkedQueue<>();
        private Queue<byte[]> RawAudioBuffer = new ConcurrentLinkedQueue<>();

        // The constructor initializes the audio format and the output line
        public RealtimePcmPlayer(int sampleRate) throws LineUnavailableException {
            this.sampleRate = sampleRate;
            this.audioFormat = new AudioFormat(this.sampleRate, 16, 1, true, false);
            DataLine.Info info = new DataLine.Info(SourceDataLine.class, audioFormat);
            line = (SourceDataLine) AudioSystem.getLine(info);
            line.open(audioFormat);
            line.start();
            decoderThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!stopped.get()) {
                        String b64Audio = b64AudioBuffer.poll();
                        if (b64Audio != null) {
                            byte[] rawAudio = Base64.getDecoder().decode(b64Audio);
                            RawAudioBuffer.add(rawAudio);
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            });
            playerThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!stopped.get()) {
                        byte[] rawAudio = RawAudioBuffer.poll();
                        if (rawAudio != null) {
                            try {
                                playChunk(rawAudio);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            });
            decoderThread.start();
            playerThread.start();
        }

        // Play one audio chunk, blocking until it finishes
        private void playChunk(byte[] chunk) throws IOException, InterruptedException {
            if (chunk == null || chunk.length == 0) return;

            int bytesWritten = 0;
            while (bytesWritten < chunk.length) {
                bytesWritten += line.write(chunk, bytesWritten, chunk.length - bytesWritten);
            }
            // chunk duration in ms = bytes / (sampleRate * 2 bytes per sample / 1000)
            int audioLength = chunk.length / (this.sampleRate * 2 / 1000);
            // Wait for the buffered audio to finish playing (guard against a negative sleep)
            Thread.sleep(Math.max(0, audioLength - 10));
        }

        public void write(String b64Audio) {
            b64AudioBuffer.add(b64Audio);
        }

        public void cancel() {
            b64AudioBuffer.clear();
            RawAudioBuffer.clear();
        }

        public void waitForComplete() throws InterruptedException {
            while (!b64AudioBuffer.isEmpty() || !RawAudioBuffer.isEmpty()) {
                Thread.sleep(100);
            }
            line.drain();
        }

        public void shutdown() throws InterruptedException {
            stopped.set(true);
            decoderThread.join();
            playerThread.join();
            if (line != null && line.isRunning()) {
                line.drain();
                line.close();
            }
        }
    } // End RealtimePcmPlayer class definition
    // Helper: record from the microphone until Enter is pressed, then send the audio
    private static void recordAndSend(TargetDataLine line, OmniRealtimeConversation conversation) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[3200];
        AtomicBoolean stopRecording = new AtomicBoolean(false);

        // Start a thread that listens for the Enter key
        Thread enterKeyListener = new Thread(() -> {
            try {
                System.in.read();
                stopRecording.set(true);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        enterKeyListener.start();

        // Recording loop
        while (!stopRecording.get()) {
            int count = line.read(buffer, 0, buffer.length);
            if (count > 0) {
                out.write(buffer, 0, count);
            }
        }

        // Send the recorded audio
        byte[] audioData = out.toByteArray();
        String audioB64 = Base64.getEncoder().encodeToString(audioData);
        conversation.appendAudio(audioB64);
        out.close();
    }

    public static void main(String[] args) throws InterruptedException, LineUnavailableException {
        OmniRealtimeParam param = OmniRealtimeParam.builder()
                .model("qwen3-omni-flash-realtime")
                // .apikey("your-dashscope-api-key")
                .build();
        AtomicReference<CountDownLatch> responseDoneLatch = new AtomicReference<>(null);
        responseDoneLatch.set(new CountDownLatch(1));

        RealtimePcmPlayer audioPlayer = new RealtimePcmPlayer(24000);
        final AtomicReference<OmniRealtimeConversation> conversationRef = new AtomicReference<>(null);
        OmniRealtimeConversation conversation = new OmniRealtimeConversation(param, new OmniRealtimeCallback() {
            @Override
            public void onOpen() {
                System.out.println("connection opened");
            }
            @Override
            public void onEvent(JsonObject message) {
                String type = message.get("type").getAsString();
                switch(type) {
                    case "session.created":
                        System.out.println("start session: " + message.get("session").getAsJsonObject().get("id").getAsString());
                        break;
                    case "conversation.item.input_audio_transcription.completed":
                        System.out.println("question: " + message.get("transcript").getAsString());
                        break;
                    case "response.audio_transcript.delta":
                        System.out.println("got llm response delta: " + message.get("delta").getAsString());
                        break;
                    case "response.audio.delta":
                        String recvAudioB64 = message.get("delta").getAsString();
                        audioPlayer.write(recvAudioB64);
                        break;
                    case "response.done":
                        System.out.println("======RESPONSE DONE======");
                        if (conversationRef.get() != null) {
                            System.out.println("[Metric] response: " + conversationRef.get().getResponseId() +
                                    ", first text delay: " + conversationRef.get().getFirstTextDelay() +
                                    " ms, first audio delay: " + conversationRef.get().getFirstAudioDelay() + " ms");
                        }
                        responseDoneLatch.get().countDown();
                        break;
                    default:
                        break;
                }
            }
            @Override
            public void onClose(int code, String reason) {
                System.out.println("connection closed code: " + code + ", reason: " + reason);
            }
        });
        conversationRef.set(conversation);
        try {
            conversation.connect();
        } catch (NoApiKeyException e) {
            throw new RuntimeException(e);
        }
        OmniRealtimeConfig config = OmniRealtimeConfig.builder()
                .modalities(Arrays.asList(OmniRealtimeModality.AUDIO, OmniRealtimeModality.TEXT))
                .voice("Cherry")
                .enableTurnDetection(false)
                // Set the model's persona
                .parameters(new HashMap<String, Object>() {{
                    put("instructions","You are a personal assistant named Xiaoyun. Answer the user's questions accurately and in a friendly, always helpful tone.");
                }})
                .build();
        conversation.updateSession(config);

        // Microphone capture setup
        AudioFormat format = new AudioFormat(16000, 16, 1, true, false);
        DataLine.Info info = new DataLine.Info(TargetDataLine.class, format);
        
        if (!AudioSystem.isLineSupported(info)) {
            System.out.println("Line not supported");
            return;
        }
        
        TargetDataLine line = null;
        try {
            line = (TargetDataLine) AudioSystem.getLine(info);
            line.open(format);
            line.start();
            
            while (true) {
                System.out.println("按Enter开始录音...");
                System.in.read();
                System.out.println("开始录音,请说话...再次按Enter停止录音并发送");
                recordAndSend(line, conversation);
                conversation.commit();
                conversation.createResponse(null, null);
                // Wait for the current response to finish before starting the next turn
                responseDoneLatch.get().await();
                // Reset the latch for the next wait
                responseDoneLatch.set(new CountDownLatch(1));
            }
        } catch (LineUnavailableException | IOException e) {
            e.printStackTrace();
        } finally {
            if (line != null) {
                line.stop();
                line.close();
            }
        }
    }}

Run the OmniWithoutServerVad.main() method; press Enter to start speaking, then press Enter again to receive the model's audio response.

WebSocket (Python)

  • Prepare the environment

    Your Python version must be 3.10 or later.

    First, install pyaudio for your operating system.

    macOS

    brew install portaudio && pip install pyaudio

    Debian/Ubuntu

    sudo apt-get install python3-pyaudio
    
    or
    
    pip install pyaudio

    CentOS

    sudo yum install -y portaudio portaudio-devel && pip install pyaudio

    Windows

    pip install pyaudio

    After installation, install the websocket-related dependencies via pip:

    pip install websocket-client==1.8.0 websockets
  • Create the client

    Create a python file named omni_realtime_client.py locally and copy the following code into it:

    omni_realtime_client.py

    # -*- coding: utf-8 -*-
    
    import asyncio
    import websockets
    import json
    import base64
    import time
    from typing import Optional, Callable, List, Dict, Any
    from enum import Enum
    
    
    class TurnDetectionMode(Enum):
        SERVER_VAD = "server_vad"
        MANUAL = "manual"
    
    
    class OmniRealtimeClient:
        """
        与 Omni Realtime API 交互的演示客户端。
    
        该类提供了连接 Realtime API、发送文本和音频数据、处理响应以及管理 WebSocket 连接的相关方法。
    
        属性说明:
            base_url (str):
                Realtime API 的基础地址。
            api_key (str):
                用于身份验证的 API Key。
            model (str):
                用于聊天的 Omni 模型名称。
            voice (str):
                服务器合成语音所使用的声音。
            turn_detection_mode (TurnDetectionMode):
                轮次检测模式。
            on_text_delta (Callable[[str], None]):
                文本增量回调函数。
            on_audio_delta (Callable[[bytes], None]):
                音频增量回调函数。
            on_input_transcript (Callable[[str], None]):
                输入转录文本回调函数。
            on_interrupt (Callable[[], None]):
                用户打断回调函数,应在此停止音频播放。
            on_output_transcript (Callable[[str], None]):
                输出转录文本回调函数。
            extra_event_handlers (Dict[str, Callable[[Dict[str, Any]], None]]):
                其他事件处理器,事件名到处理函数的映射。
        """
    
        def __init__(
                self,
                base_url,
                api_key: str,
                model: str = "",
                voice: str = "Ethan",
                instructions: str = "You are a helpful assistant.",
                turn_detection_mode: TurnDetectionMode = TurnDetectionMode.MANUAL,
                on_text_delta: Optional[Callable[[str], None]] = None,
                on_audio_delta: Optional[Callable[[bytes], None]] = None,
                on_interrupt: Optional[Callable[[], None]] = None,
                on_input_transcript: Optional[Callable[[str], None]] = None,
                on_output_transcript: Optional[Callable[[str], None]] = None,
                extra_event_handlers: Optional[Dict[str, Callable[[Dict[str, Any]], None]]] = None
        ):
            self.base_url = base_url
            self.api_key = api_key
            self.model = model
            self.voice = voice
            self.instructions = instructions
            self.ws = None
            self.on_text_delta = on_text_delta
            self.on_audio_delta = on_audio_delta
            self.on_interrupt = on_interrupt
            self.on_input_transcript = on_input_transcript
            self.on_output_transcript = on_output_transcript
            self.turn_detection_mode = turn_detection_mode
            self.extra_event_handlers = extra_event_handlers or {}
    
            # State of the current response
            self._current_response_id = None
            self._current_item_id = None
            self._is_responding = False
            # Input/output transcript printing state
            self._print_input_transcript = False
            self._output_transcript_buffer = ""
    
        async def connect(self) -> None:
            """与 Realtime API 建立 WebSocket 连接。"""
            url = f"{self.base_url}?model={self.model}"
            headers = {
                "Authorization": f"Bearer {self.api_key}"
            }
    
            self.ws = await websockets.connect(url, additional_headers=headers)
    
            # Apply the default session configuration
            if self.turn_detection_mode == TurnDetectionMode.MANUAL:
                await self.update_session({
                    "modalities": ["text", "audio"],
                    "voice": self.voice,
                    "instructions": self.instructions,
                    "input_audio_format": "pcm16",
                    "output_audio_format": "pcm16",
                    "input_audio_transcription": {
                        "model": "gummy-realtime-v1"
                    },
                    "turn_detection": None
                })
            elif self.turn_detection_mode == TurnDetectionMode.SERVER_VAD:
                await self.update_session({
                    "modalities": ["text", "audio"],
                    "voice": self.voice,
                    "instructions": self.instructions,
                    "input_audio_format": "pcm16",
                    "output_audio_format": "pcm16",
                    "input_audio_transcription": {
                        "model": "gummy-realtime-v1"
                    },
                    "turn_detection": {
                        "type": "server_vad",
                        "threshold": 0.1,
                        "prefix_padding_ms": 500,
                        "silence_duration_ms": 900
                    }
                })
            else:
                raise ValueError(f"Invalid turn detection mode: {self.turn_detection_mode}")
    
        async def send_event(self, event) -> None:
            event['event_id'] = "event_" + str(int(time.time() * 1000))
            print(f" Send event: type={event['type']}, event_id={event['event_id']}")
            await self.ws.send(json.dumps(event))
    
        async def update_session(self, config: Dict[str, Any]) -> None:
            """更新会话配置。"""
            event = {
                "type": "session.update",
                "session": config
            }
            print("update session: ", event)
            await self.send_event(event)
    
        async def stream_audio(self, audio_chunk: bytes) -> None:
            """向 API 流式发送原始音频数据。"""
            # 仅支持 16bit 16kHz 单声道 PCM
            audio_b64 = base64.b64encode(audio_chunk).decode()
    
            append_event = {
                "type": "input_audio_buffer.append",
                "audio": audio_b64
            }
            await self.send_event(append_event)
    
        async def commit_audio_buffer(self) -> None:
            """提交音频缓冲区以触发处理。"""
            event = {
                "type": "input_audio_buffer.commit"
            }
            await self.send_event(event)
    
        async def append_image(self, image_chunk: bytes) -> None:
            """向视频缓冲区追加图像数据。
            图像数据可以来自本地文件,也可以来自实时视频流。
    
            注意:
                - 图像格式必须为 JPG 或 JPEG。推荐分辨率为 480P 或 720P,最高支持 1080P。
                - 单张图片大小不应超过 500KB。
                - 本方法会将图像数据编码为 Base64 后再发送。
                - 建议以每秒 2 帧的频率向服务器发送图像。
                - 在发送图像数据之前,需要先发送音频数据。
            """
            image_b64 = base64.b64encode(image_chunk).decode()
    
            event = {
                "type": "input_image_buffer.append",
                "image": image_b64
            }
            await self.send_event(event)
    
        async def create_response(self) -> None:
            """向 API 请求生成回复(仅在手动模式下需要调用)。"""
            event = {
                "type": "response.create",
                "response": {
                    "instructions": "You are a helpful assistant.",
                    "modalities": ["text", "audio"]
                }
            }
            print("create response: ", event)
            await self.send_event(event)
    
        async def cancel_response(self) -> None:
            """取消当前回复。"""
            event = {
                "type": "response.cancel"
            }
            await self.send_event(event)
    
        async def handle_interruption(self):
            """处理用户对当前回复的打断。"""
            if not self._is_responding:
                return
    
            print(" Handling interruption")
    
            # 1. Cancel the current response
            if self._current_response_id:
                await self.cancel_response()
    
            self._is_responding = False
            self._current_response_id = None
            self._current_item_id = None
    
        async def handle_messages(self) -> None:
            try:
                async for message in self.ws:
                    event = json.loads(message)
                    event_type = event.get("type")
    
                    if event_type != "response.audio.delta":
                        print(" event: ", event)
                    else:
                        print(" event_type: ", event_type)
    
                    if event_type == "error":
                        print(" Error: ", event['error'])
                        continue
                    elif event_type == "response.created":
                        self._current_response_id = event.get("response", {}).get("id")
                        self._is_responding = True
                    elif event_type == "response.output_item.added":
                        self._current_item_id = event.get("item", {}).get("id")
                    elif event_type == "response.done":
                        self._is_responding = False
                        self._current_response_id = None
                        self._current_item_id = None
                    # Handle interruptions
                    elif event_type == "input_audio_buffer.speech_started":
                        print(" Speech detected")
                        if self._is_responding:
                            print(" Handling interruption")
                            await self.handle_interruption()
    
                        if self.on_interrupt:
                            print(" Handling on_interrupt, stop playback")
                            self.on_interrupt()
                    elif event_type == "input_audio_buffer.speech_stopped":
                        print(" Speech ended")
                    # Handle normal response events
                    elif event_type == "response.text.delta":
                        if self.on_text_delta:
                            self.on_text_delta(event["delta"])
                    elif event_type == "response.audio.delta":
                        if self.on_audio_delta:
                            audio_bytes = base64.b64decode(event["delta"])
                            self.on_audio_delta(audio_bytes)
                    elif event_type == "conversation.item.input_audio_transcription.completed":
                        transcript = event.get("transcript", "")
                        if self.on_input_transcript:
                            await asyncio.to_thread(self.on_input_transcript, transcript)
                            self._print_input_transcript = True
                    elif event_type == "response.audio_transcript.delta":
                        if self.on_output_transcript:
                            delta = event.get("delta", "")
                            if not self._print_input_transcript:
                                self._output_transcript_buffer += delta
                            else:
                                if self._output_transcript_buffer:
                                    await asyncio.to_thread(self.on_output_transcript, self._output_transcript_buffer)
                                    self._output_transcript_buffer = ""
                                await asyncio.to_thread(self.on_output_transcript, delta)
                    elif event_type == "response.audio_transcript.done":
                        self._print_input_transcript = False
                    elif event_type in self.extra_event_handlers:
                        self.extra_event_handlers[event_type](event)
    
            except websockets.exceptions.ConnectionClosed:
                print(" Connection closed")
            except Exception as e:
                print(" Error in message handling: ", str(e))
    
        async def close(self) -> None:
            """关闭 WebSocket 连接。"""
            if self.ws:
                await self.ws.close()
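
    The append_image method above wraps the input_image_buffer.append event described in its docstring. As a purely illustrative sketch (the frame paths and the connected client are assumptions, not part of the API), streaming local JPEG frames at the recommended ~2 fps could look like this:

    import asyncio

    async def stream_frames(client, frame_paths):
        # `client` is assumed to be a connected OmniRealtimeClient that is already streaming audio.
        for path in frame_paths:
            with open(path, "rb") as f:
                await client.append_image(f.read())  # JPG/JPEG only, at most 500 KB each
            await asyncio.sleep(0.5)  # roughly 2 frames per second, as recommended
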
  • Choose an interaction mode

    • VAD mode (Voice Activity Detection: speech start and end are detected automatically)

      The Realtime API automatically determines when the user starts and stops speaking and responds on its own.

    • Manual mode (push to talk: press to speak, release to send)

      The client controls when speech starts and stops. After the user finishes speaking, the client must explicitly send a message to the server.

    VAD mode

    In the same directory as omni_realtime_client.py, create another Python file named vad_mode.py and copy the following code into it:

    vad_mode.py

    # -*- coding: utf-8 -*-
    import os, time, base64, asyncio
    from omni_realtime_client import OmniRealtimeClient, TurnDetectionMode
    import pyaudio
    import queue
    import threading
    
    # Create a global audio queue and playback thread
    audio_queue = queue.Queue()
    audio_player = None
    # Global interrupt flag
    interrupt_flag = threading.Event()
    
    # Initialize PyAudio
    p = pyaudio.PyAudio()
    RATE = 24000  # Sample rate: 24 kHz
    CHUNK = 3200  # Size of each audio chunk
    FORMAT = pyaudio.paInt16  # 16-bit PCM format
    CHANNELS = 1  # Mono
    
    
    def clear_audio_queue():
        """Empty the audio queue."""
        with audio_queue.mutex:
            audio_queue.queue.clear()
        print("Audio queue cleared")
    
    
    def handle_interrupt():
        """Handle an interruption: stop audio playback immediately."""
        print("Speech input detected; stopping audio playback")
        interrupt_flag.set()  # Set the interrupt flag
        clear_audio_queue()  # Empty the queue
    
    
    def audio_player_thread():
        """Background thread that plays audio data."""
        stream = p.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=RATE,
            output=True,
            frames_per_buffer=CHUNK,
        )
    
        try:
            while True:
                try:
                    # Check the interrupt flag
                    if interrupt_flag.is_set():
                        print("Audio playback interrupted")
                        interrupt_flag.clear()  # Clear the interrupt flag
                        continue
    
                    # Get audio data from the queue
                    audio_data = audio_queue.get(block=True, timeout=0.5)
                    if audio_data is None:  # End-of-stream signal
                        break
    
                    # Check the interrupt flag again (right before playing)
                    if interrupt_flag.is_set():
                        print("Audio playback interrupted")
                        interrupt_flag.clear()  # Clear the interrupt flag
                        audio_queue.task_done()
                        continue
    
                    # Play the audio data
                    stream.write(audio_data)
                    audio_queue.task_done()
                except queue.Empty:
                    # Queue is empty; keep waiting
                    continue
        finally:
            # Clean up
            stream.stop_stream()
            stream.close()
    
    
    def start_audio_player():
        """Start the audio playback thread."""
        global audio_player
        if audio_player is None or not audio_player.is_alive():
            audio_player = threading.Thread(target=audio_player_thread, daemon=True)
            audio_player.start()
    
    
    def handle_audio_data(audio_data):
        """Handle incoming audio data."""
        # Print the length of the received audio data (for debugging)
        print(f"\nReceived audio data: {len(audio_data)} bytes")
        # Put the audio data into the playback queue
        audio_queue.put(audio_data)
    
    
    async def start_microphone_streaming(client: OmniRealtimeClient):
        CHUNK = 3200
        FORMAT = pyaudio.paInt16
        CHANNELS = 1
        RATE = 24000
    
        p = pyaudio.PyAudio()
        stream = p.open(
            format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK
        )
    
        try:
            print("Recording started; please speak...")
            while True:
                audio_data = stream.read(CHUNK)
                encoded_data = base64.b64encode(audio_data).decode("utf-8")
    
                event = {
                    "event_id": "event_" + str(int(time.time() * 1000)),
                    "type": "input_audio_buffer.append",
                    "audio": encoded_data,
                }
                await client.send_event(event)
    
                # Keep the wait short to approximate real-time interaction
                await asyncio.sleep(0.05)
        finally:
            stream.stop_stream()
            stream.close()
            p.terminate()
    
    
    async def main():
        # Start the audio playback thread
        start_audio_player()
    
        realtime_client = OmniRealtimeClient(
            base_url="wss://dashscope.aliyuncs.com/api-ws/v1/realtime",
            api_key=os.environ.get("DASHSCOPE_API_KEY"),
            model="qwen3-omni-flash-realtime",
            voice="Cherry",
            on_text_delta=lambda text: print(f"\nAssistant: {text}", end="", flush=True),
            on_audio_delta=handle_audio_data,
            on_interrupt=handle_interrupt,  # Interrupt callback
            turn_detection_mode=TurnDetectionMode.SERVER_VAD,
            # Set the model's persona
            instructions="You are the personal assistant Xiaoyun. Answer users' questions accurately, in a friendly and consistently helpful tone."
        )
    
        try:
            await realtime_client.connect()
            # Start message handling and microphone streaming
            message_handler = asyncio.create_task(realtime_client.handle_messages())
            streaming_task = asyncio.create_task(
                start_microphone_streaming(realtime_client)
            )
    
            # Run until either task finishes (e.g., the connection closes)
            await asyncio.gather(message_handler, streaming_task)
        except Exception as e:
            print(f"Error: {e}")
        finally:
            # Shut down the audio playback thread
            audio_queue.put(None)
            if audio_player:
                audio_player.join(timeout=1)
            await realtime_client.close()
            p.terminate()
    
    
    if __name__ == "__main__":
        asyncio.run(main())

    Run vad_mode.py to talk with the Realtime model through your microphone in real time. The system detects where your speech starts and stops and sends the audio to the server automatically; you do not need to send it manually.

    Manual mode

    In the same directory as omni_realtime_client.py, create another Python file named manual_mode.py and copy the following code into it:

    manual_mode.py

    # -*- coding: utf-8 -*-
    import os
    import asyncio
    import time
    import threading
    import queue
    import pyaudio
    from omni_realtime_client import OmniRealtimeClient, TurnDetectionMode
    
    class AudioPlayer:
        """Real-time audio player."""
        def __init__(self, sample_rate=24000, channels=1, sample_width=2):
            self.sample_rate = sample_rate
            self.channels = channels
            self.sample_width = sample_width  # 2 bytes for 16-bit
            self.audio_queue = queue.Queue()
            self.is_playing = False
            self.play_thread = None
            self.pyaudio_instance = None
            self.stream = None
            self._lock = threading.Lock()  # Lock to synchronize access
            self._last_data_time = time.time()  # Time the last audio data was received
            self._response_done = False  # Whether the current response is complete
            self._waiting_for_response = False  # Whether we are waiting for a server response
            # Track when data was last written to the audio stream and the duration of
            # the most recent chunk, to judge end of playback more precisely
            self._last_play_time = time.time()
            self._last_chunk_duration = 0.0
            
        def start(self):
            """Start the audio player."""
            with self._lock:
                if self.is_playing:
                    return
                    
                self.is_playing = True
                
                try:
                    self.pyaudio_instance = pyaudio.PyAudio()
                    
                    # Create the audio output stream
                    self.stream = self.pyaudio_instance.open(
                        format=pyaudio.paInt16,  # 16-bit
                        channels=self.channels,
                        rate=self.sample_rate,
                        output=True,
                        frames_per_buffer=1024
                    )
                    
                    # Start the playback thread
                    self.play_thread = threading.Thread(target=self._play_audio)
                    self.play_thread.daemon = True
                    self.play_thread.start()
                    
                    print("Audio player started")
                except Exception as e:
                    print(f"Failed to start audio player: {e}")
                    self._cleanup_resources()
                    raise
        
        def stop(self):
            """Stop the audio player."""
            with self._lock:
                if not self.is_playing:
                    return
                    
                self.is_playing = False
            
            # Empty the queue
            while not self.audio_queue.empty():
                try:
                    self.audio_queue.get_nowait()
                except queue.Empty:
                    break
            
            # Wait for the playback thread to finish (outside the lock, to avoid deadlock)
            if self.play_thread and self.play_thread.is_alive():
                self.play_thread.join(timeout=2.0)
            
            # Re-acquire the lock to clean up resources
            with self._lock:
                self._cleanup_resources()
            
            print("Audio player stopped")
        
        def _cleanup_resources(self):
            """Clean up audio resources (must be called while holding the lock)."""
            try:
                # Close the audio stream
                if self.stream:
                    if not self.stream.is_stopped():
                        self.stream.stop_stream()
                    self.stream.close()
                    self.stream = None
            except Exception as e:
                print(f"Error closing audio stream: {e}")
            
            try:
                if self.pyaudio_instance:
                    self.pyaudio_instance.terminate()
                    self.pyaudio_instance = None
            except Exception as e:
                print(f"Error terminating PyAudio: {e}")
        
        def add_audio_data(self, audio_data):
            """Add audio data to the playback queue."""
            if self.is_playing and audio_data:
                self.audio_queue.put(audio_data)
                with self._lock:
                    self._last_data_time = time.time()  # Update the last-received time
                    self._waiting_for_response = False  # Data received; no longer waiting
        
        def stop_receiving_data(self):
            """Mark that no more audio data will arrive."""
            with self._lock:
                self._response_done = True
                self._waiting_for_response = False  # Response finished; no longer waiting
        
        def prepare_for_next_turn(self):
            """Reset player state for the next conversation turn."""
            with self._lock:
                self._response_done = False
                self._last_data_time = time.time()
                self._last_play_time = time.time()
                self._last_chunk_duration = 0.0
                self._waiting_for_response = True  # Start waiting for the next response
            
            # Drain any audio left over from the previous turn
            while not self.audio_queue.empty():
                try:
                    self.audio_queue.get_nowait()
                except queue.Empty:
                    break
    
        def is_finished_playing(self):
            """Check whether all audio data has been played."""
            with self._lock:
                queue_size = self.audio_queue.qsize()
                time_since_last_data = time.time() - self._last_data_time
                time_since_last_play = time.time() - self._last_play_time
                
                # ---------------------- End-of-playback heuristics ----------------------
                # 1. Preferred: the server marked the response done and the queue is empty;
                #    wait for the most recent chunk to finish (chunk duration + 0.1s slack).
                if self._response_done and queue_size == 0:
                    min_wait = max(self._last_chunk_duration + 0.1, 0.5)  # Wait at least 0.5s
                    if time_since_last_play >= min_wait:
                        return True
    
                # 2. Fallback: no new data for a while and the queue is empty.
                #    This acts as a safety net when the server never sends `response.done`.
                if not self._waiting_for_response and queue_size == 0 and time_since_last_data > 1.0:
                    print("\n(No new audio before timeout; treating playback as finished)")
                    return True
                
                return False
        
        def _play_audio(self):
            """Worker thread that plays audio data."""
            while True:
                # Check whether playback should stop
                with self._lock:
                    if not self.is_playing:
                        break
                    stream_ref = self.stream  # Grab a reference to the stream
                
                try:
                    # Get audio data from the queue with a 0.1s timeout
                    audio_data = self.audio_queue.get(timeout=0.1)
                    
                    # Re-check the player state and stream validity
                    with self._lock:
                        if self.is_playing and stream_ref and not stream_ref.is_stopped():
                            try:
                                # Play the audio data
                                stream_ref.write(audio_data)
                                # Update the most recent playback info
                                self._last_play_time = time.time()
                                self._last_chunk_duration = len(audio_data) / (self.channels * self.sample_width) / self.sample_rate
                            except Exception as e:
                                print(f"Error writing to audio stream: {e}")
                                break
                        
                    # Mark this chunk as processed
                    self.audio_queue.task_done()
    
                except queue.Empty:
                    # Queue is empty; keep waiting
                    continue
                except Exception as e:
                    print(f"Error while playing audio: {e}")
                    break
    
    class MicrophoneRecorder:
        """Real-time microphone recorder."""
        def __init__(self, sample_rate=16000, channels=1, chunk_size=1024):
            self.sample_rate = sample_rate
            self.channels = channels
            self.chunk_size = chunk_size
            self.pyaudio_instance = None
            self.stream = None
            self.frames = []
            self._is_recording = False
            self._record_thread = None
    
        def _recording_thread(self):
            """Recording worker thread."""
            # Keep reading from the audio stream while _is_recording is True
            while self._is_recording:
                try:
                    # exception_on_overflow=False avoids crashing on buffer overflow
                    data = self.stream.read(self.chunk_size, exception_on_overflow=False)
                    self.frames.append(data)
                except (IOError, OSError) as e:
                    # Reading can fail once the stream has been closed
                    print(f"Recording stream read error; it may be closed: {e}")
                    break
    
        def start(self):
            """Start recording."""
            if self._is_recording:
                print("Recording is already in progress.")
                return
            
            self.frames = []
            self._is_recording = True
            
            try:
                self.pyaudio_instance = pyaudio.PyAudio()
                self.stream = self.pyaudio_instance.open(
                    format=pyaudio.paInt16,
                    channels=self.channels,
                    rate=self.sample_rate,
                    input=True,
                    frames_per_buffer=self.chunk_size
                )
                
                self._record_thread = threading.Thread(target=self._recording_thread)
                self._record_thread.daemon = True
                self._record_thread.start()
                print("Microphone recording started...")
            except Exception as e:
                print(f"Failed to start microphone: {e}")
                self._is_recording = False
                self._cleanup()
                raise
    
        def stop(self):
            """Stop recording and return the audio data."""
            if not self._is_recording:
                return None
                
            self._is_recording = False
            
            # Wait for the recording thread to exit safely
            if self._record_thread:
                self._record_thread.join(timeout=1.0)
            
            self._cleanup()
            
            print("Microphone recording stopped.")
            return b''.join(self.frames)
    
        def _cleanup(self):
            """Safely clean up PyAudio resources."""
            if self.stream:
                try:
                    if self.stream.is_active():
                        self.stream.stop_stream()
                    self.stream.close()
                except Exception as e:
                    print(f"Error closing audio stream: {e}")
            
            if self.pyaudio_instance:
                try:
                    self.pyaudio_instance.terminate()
                except Exception as e:
                    print(f"Error terminating PyAudio instance: {e}")
    
            self.stream = None
            self.pyaudio_instance = None
    
    async def interactive_test():
        """
        Interactive test script: supports multiple consecutive conversation turns;
        each turn can send audio and images.
        """
        # ------------------- 1. Initialization and connection (once) -------------------
        api_key = os.environ.get("DASHSCOPE_API_KEY")
        if not api_key:
            print("Please set the DASHSCOPE_API_KEY environment variable")
            return
    
        print("--- Real-time multi-turn audio/video chat client ---")
        print("Initializing audio player and client...")
        
        audio_player = AudioPlayer()
        audio_player.start()
    
        def on_audio_received(audio_data):
            audio_player.add_audio_data(audio_data)
    
        def on_response_done(event):
            print("\n(Received end-of-response marker)")
            audio_player.stop_receiving_data()
    
        realtime_client = OmniRealtimeClient(
            base_url="wss://dashscope.aliyuncs.com/api-ws/v1/realtime",
            api_key=api_key,
            model="qwen3-omni-flash-realtime",
            voice="Ethan",
            instructions="You are the personal assistant Xiaoyun. Answer users' questions accurately, in a friendly and consistently helpful tone.",  # Set the model's persona
            on_text_delta=lambda text: print(f"Assistant: {text}", end="", flush=True),
            on_audio_delta=on_audio_received,
            turn_detection_mode=TurnDetectionMode.MANUAL,
            extra_event_handlers={"response.done": on_response_done}
        )
    
        message_handler_task = None
        try:
            await realtime_client.connect()
            print("Connected to the server. Type 'q' or 'quit' at any time to exit.")
            message_handler_task = asyncio.create_task(realtime_client.handle_messages())
            await asyncio.sleep(0.5)
    
            turn_counter = 1
            # ------------------- 2. Multi-turn conversation loop -------------------
            while True:
                print(f"\n--- Turn {turn_counter} ---")
                audio_player.prepare_for_next_turn()
    
                recorded_audio = None
                image_paths = []
                
                # --- Get user input: record from the microphone ---
                loop = asyncio.get_event_loop()
                recorder = MicrophoneRecorder(sample_rate=16000)  # 16 kHz is recommended for speech recognition
    
                print("Ready to record. Press Enter to start recording (or type 'q' to quit)...")
                user_input = await loop.run_in_executor(None, input)
                if user_input.strip().lower() in ['q', 'quit']:
                    print("User requested exit...")
                    return
                
                try:
                    recorder.start()
                except Exception:
                    print("Could not start recording; check your microphone permissions and device. Skipping this turn.")
                    continue
                
                print("Recording... Press Enter again to stop.")
                await loop.run_in_executor(None, input)
    
                recorded_audio = recorder.stop()
    
                if not recorded_audio or len(recorded_audio) == 0:
                    print("No valid audio was recorded; please start this turn again.")
                    continue
    
                # --- Get image input (optional) ---
                # The image-input feature below is commented out and disabled for now.
                # Uncomment the code to enable it.
                # print("\nEnter absolute paths of image files, one per line (optional). When done, type 's' or press Enter to send the request.")
                # while True:
                #     path = input("Image path: ").strip()
                #     if path.lower() == 's' or path == '':
                #         break
                #     if path.lower() in ['q', 'quit']:
                #         print("User requested exit...")
                #         return
                #     
                #     if not os.path.isabs(path):
                #         print("Error: please enter an absolute path.")
                #         continue
                #     if not os.path.exists(path):
                #         print(f"Error: file does not exist -> {path}")
                #         continue
                #     image_paths.append(path)
                #     print(f"Image added: {os.path.basename(path)}")
                
                # --- 3. Send data and get the response ---
                print("\n--- Input summary ---")
                print(f"Audio to send: 1 (from microphone); images: {len(image_paths)}")
                print("------------------")
    
                # 3.1 Send the recorded audio
                try:
                    print(f"Sending microphone recording ({len(recorded_audio)} bytes)")
                    await realtime_client.stream_audio(recorded_audio)
                    await asyncio.sleep(0.1)
                except Exception as e:
                    print(f"Failed to send microphone recording: {e}")
                    continue
    
                # 3.2 Send all image files
                # The image-sending code below is commented out and disabled for now.
                # for i, path in enumerate(image_paths):
                #     try:
                #         with open(path, "rb") as f:
                #             data = f.read()
                #         print(f"Sending image {i+1}: {os.path.basename(path)} ({len(data)} bytes)")
                #         await realtime_client.append_image(data)
                #         await asyncio.sleep(0.1)
                #     except Exception as e:
                #         print(f"Failed to send image {os.path.basename(path)}: {e}")
    
                # 3.3 Commit and wait for the response
                print("Committing all input and requesting a server response...")
                await realtime_client.commit_audio_buffer()
                await realtime_client.create_response()
    
                print("Waiting for and playing the server's audio response...")
                start_time = time.time()
                max_wait_time = 60
                while not audio_player.is_finished_playing():
                    if time.time() - start_time > max_wait_time:
                        print(f"\nTimed out waiting ({max_wait_time}s); moving on to the next turn.")
                        break
                    await asyncio.sleep(0.2)
                
                print("\nPlayback for this turn is complete!")
                turn_counter += 1
    
        except (asyncio.CancelledError, KeyboardInterrupt):
            print("\nProgram interrupted.")
        except Exception as e:
            print(f"Unhandled error: {e}")
        finally:
            # ------------------- 4. Clean up resources -------------------
            print("\nClosing the connection and cleaning up resources...")
            if message_handler_task and not message_handler_task.done():
                message_handler_task.cancel()
            
            # Close the connection if it was ever established
            if 'realtime_client' in locals() and realtime_client.ws:
                await realtime_client.close()
                print("Connection closed.")
    
            audio_player.stop()
            print("Program exited.")
    
    if __name__ == "__main__":
        try:
            asyncio.run(interactive_test())
        except KeyboardInterrupt:
            print("\nProgram force-quit by user.")

    Run manual_mode.py: press Enter to start speaking, then press Enter again to stop and receive the model's audio response.

Interaction flow

VAD mode

Set session.turn_detection to "server_vad" in the session.update event to enable VAD mode. In this mode, the server automatically detects the start and end of speech and responds on its own, which suits voice-call scenarios.

The interaction flow is as follows (a configuration sketch follows the list):

  1. The server detects the start of speech and sends an input_audio_buffer.speech_started event.

  2. The client sends input_audio_buffer.append events at any time to append audio to the buffer.

  3. The server detects the end of speech and sends an input_audio_buffer.speech_stopped event.

  4. The server sends an input_audio_buffer.committed event to commit the audio buffer.

  5. The server sends a conversation.item.created event containing the user message item created from the buffer.
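
Below is a minimal sketch of enabling VAD mode when configuring the session, reusing the send_event helper from omni_realtime_client.py above. The session fields shown (modalities, voice, turn_detection) follow the OpenAI-compatible Realtime schema and are assumptions here; confirm them against the API reference.

import time
from omni_realtime_client import OmniRealtimeClient

async def enable_server_vad(client: OmniRealtimeClient):
    # Assumed session.update payload; turn_detection "server_vad" enables VAD mode
    await client.send_event({
        "event_id": "event_" + str(int(time.time() * 1000)),
        "type": "session.update",
        "session": {
            "modalities": ["text", "audio"],
            "voice": "Cherry",
            "turn_detection": {"type": "server_vad"},
        },
    })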

Lifecycle

| Phase | Source | Event | Description |
| --- | --- | --- | --- |
| Session initialization | Client | session.update | Configure the session |
| | Server | session.created | Session created |
| | Server | session.updated | Session configuration updated |
| User audio input | Client | input_audio_buffer.append | Append audio to the buffer |
| | Client | input_image_buffer.append | Append an image to the buffer |
| | Server | input_audio_buffer.speech_started | Speech start detected |
| | Server | input_audio_buffer.speech_stopped | Speech end detected |
| | Server | input_audio_buffer.committed | The server received the committed audio |
| Server audio output | Server | response.created | The server starts generating a response |
| | Server | response.output_item.added | A new output item is added to the response |
| | Server | conversation.item.created | A conversation item is created |
| | Server | response.content_part.added | New content is added to the assistant message |
| | Server | response.audio_transcript.delta | Incrementally generated transcript text |
| | Server | response.audio.delta | Incrementally generated model audio |
| | Server | response.audio_transcript.done | Transcript complete |
| | Server | response.audio.done | Audio generation complete |
| | Server | response.content_part.done | The assistant message's text or audio content has finished streaming |
| | Server | response.output_item.done | The assistant message's entire output item has finished streaming |
| | Server | response.done | Response complete |

Manual mode

Set session.turn_detection to null in the session.update event to enable Manual mode. In this mode, the client explicitly sends input_audio_buffer.commit and response.create events to request a server response. This suits push-to-talk scenarios, such as sending voice messages in a chat app.

The interaction flow is as follows (a sketch of one turn follows the list):

  1. The client sends input_audio_buffer.append events to append audio to the buffer.

    It can also send input_image_buffer.append events to append images to the buffer.

  2. The client sends an input_audio_buffer.commit event to commit the audio and image buffers, creating a new user message item.

  3. The server responds with an input_audio_buffer.committed event.

  4. The client sends a response.create event and waits for the model output from the server.

  5. The server responds with a conversation.item.created event.
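
Below is a minimal sketch of one Manual-mode turn, again reusing send_event from omni_realtime_client.py. The append payload mirrors the one used in vad_mode.py; the bare commit and response.create payloads are assumptions, so verify them against the API reference.

import base64
from omni_realtime_client import OmniRealtimeClient

async def manual_turn(client: OmniRealtimeClient, pcm_bytes: bytes):
    # 1. Append the recorded audio (base64-encoded PCM) to the input buffer
    await client.send_event({
        "type": "input_audio_buffer.append",
        "audio": base64.b64encode(pcm_bytes).decode("utf-8"),
    })
    # 2. Commit the buffer, which creates a new user message item
    await client.send_event({"type": "input_audio_buffer.commit"})
    # 3. Ask the server to generate the model response
    await client.send_event({"type": "response.create"})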

Lifecycle

| Phase | Source | Event | Description |
| --- | --- | --- | --- |
| Session initialization | Client | session.update | Configure the session |
| | Server | session.created | Session created |
| | Server | session.updated | Session configuration updated |
| User audio input | Client | input_audio_buffer.append | Append audio to the buffer |
| | Client | input_image_buffer.append | Append an image to the buffer |
| | Client | input_audio_buffer.commit | Commit the audio and images to the server |
| | Client | response.create | Request a model response |
| | Server | input_audio_buffer.committed | The server received the committed audio |
| Server audio output | Client | input_audio_buffer.clear | Clear the audio in the buffer |
| | Server | response.created | The server starts generating a response |
| | Server | response.output_item.added | A new output item is added to the response |
| | Server | conversation.item.created | A conversation item is created |
| | Server | response.content_part.added | New content is added to the assistant message |
| | Server | response.audio_transcript.delta | Incrementally generated transcript text |
| | Server | response.audio.delta | Incrementally generated model audio |
| | Server | response.audio_transcript.done | Transcript complete |
| | Server | response.audio.done | Audio generation complete |
| | Server | response.content_part.done | The assistant message's text or audio content has finished streaming |
| | Server | response.output_item.done | The assistant message's entire output item has finished streaming |
| | Server | response.done | Response complete |

API reference

Billing and rate limits

Billing rules

Qwen-Omni realtime models are billed by the number of tokens in each modality (audio, image, video). For pricing details, see the model list.

Rules for converting audio, images, and video to tokens

Audio

  • Qwen3-Omni-Flash-Realtime: total tokens = audio duration (seconds) * 12.5

  • Qwen-Omni-Turbo-Realtime: total tokens = audio duration (seconds) * 25

Audio shorter than 1 second is counted as 1 second.
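
For example, a 4-second clip on Qwen3-Omni-Flash-Realtime costs 4 * 12.5 = 50 tokens. Below is a minimal sketch of the rule; the helper name is ours, and rounding fractional results up is an assumption.

import math

def audio_tokens(duration_seconds: float, tokens_per_second: float = 12.5) -> int:
    # Audio shorter than 1 second is billed as 1 second
    duration = max(duration_seconds, 1.0)
    return math.ceil(duration * tokens_per_second)

print(audio_tokens(4))    # 50
print(audio_tokens(0.3))  # 13 (12.5 rounded up, by assumption)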

Images

Each 28x28-pixel block corresponds to 1 token; an image costs at least 4 tokens and at most 1280 tokens. You can estimate the token count of an input image with the following code.

import math
# Install the Pillow library with: pip install Pillow
from PIL import Image

def token_calculate(image_path):
    # Open the specified image file
    image = Image.open(image_path)
    # Get the original image dimensions
    height = image.height
    width = image.width
    # Round the height to a multiple of 28
    h_bar = round(height / 28) * 28
    # Round the width to a multiple of 28
    w_bar = round(width / 28) * 28
    # Lower token bound for an image: 4 tokens
    min_pixels = 28 * 28 * 4
    # Upper token bound for an image: 1280 tokens
    max_pixels = 1280 * 28 * 28
    # Rescale the image so the total pixel count falls within [min_pixels, max_pixels]
    if h_bar * w_bar > max_pixels:
        # Compute scale factor beta so the scaled image does not exceed max_pixels
        beta = math.sqrt((height * width) / max_pixels)
        # Recompute the adjusted height as a multiple of 28
        h_bar = math.floor(height / beta / 28) * 28
        # Recompute the adjusted width as a multiple of 28
        w_bar = math.floor(width / beta / 28) * 28
    elif h_bar * w_bar < min_pixels:
        # Compute scale factor beta so the scaled image is not below min_pixels
        beta = math.sqrt(min_pixels / (height * width))
        # Recompute the adjusted height as a multiple of 28
        h_bar = math.ceil(height * beta / 28) * 28
        # Recompute the adjusted width as a multiple of 28
        w_bar = math.ceil(width * beta / 28) * 28
    print(f"Scaled image size: height {h_bar}, width {w_bar}")
    # Token count: total pixels divided by 28 * 28
    token = int((h_bar * w_bar) / (28 * 28))
    # The system adds <|vision_bos|> and <|vision_eos|> vision markers (1 token each)
    total_token = token + 2
    print(f"Image token count: {total_token}")
    return total_token
if __name__ == "__main__":
    total_token = token_calculate(image_path="test.png")

Video

Tokens for a video file are split into video_tokens (visual) and audio_tokens (audio).

  • video_tokens

    The calculation is fairly involved; see the following code:

    # Install before use: pip install opencv-python
    import math
    import os
    import logging
    import cv2
    
    logger = logging.getLogger(__name__)
    
    # Fixed parameters
    FRAME_FACTOR = 2
    IMAGE_FACTOR = 28
    # Maximum aspect ratio of a video frame
    MAX_RATIO = 200
    
    # Lower token bound per video frame
    VIDEO_MIN_PIXELS = 128 * 28 * 28
    # Upper token bound per video frame
    VIDEO_MAX_PIXELS = 768 * 28 * 28
    
    # Qwen-Omni models sample video at 2 FPS
    FPS = 2
    # Minimum number of sampled frames
    FPS_MIN_FRAMES = 4
    # Maximum number of sampled frames
    # Qwen3-Omni-Flash models: 128
    # Qwen-Omni-Turbo models: 80
    FPS_MAX_FRAMES = 128
    
    # Maximum total pixel budget for video input
    VIDEO_TOTAL_PIXELS = 65536 * 28 * 28
    
    def round_by_factor(number, factor):
        return round(number / factor) * factor
    
    def ceil_by_factor(number, factor):
        return math.ceil(number / factor) * factor
    
    def floor_by_factor(number, factor):
        return math.floor(number / factor) * factor
    
    def get_video(video_path):
        cap = cv2.VideoCapture(video_path)
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        video_fps = cap.get(cv2.CAP_PROP_FPS)
        cap.release()
        return frame_height, frame_width, total_frames, video_fps
    
    def smart_nframes(total_frames, video_fps):
        min_frames = ceil_by_factor(FPS_MIN_FRAMES, FRAME_FACTOR)
        max_frames = floor_by_factor(min(FPS_MAX_FRAMES, total_frames), FRAME_FACTOR)
        duration = total_frames / video_fps if video_fps != 0 else 0
        if duration - int(duration) > (1 / FPS):
            total_frames = math.ceil(duration * video_fps)
        else:
            total_frames = math.ceil(int(duration) * video_fps)
        nframes = total_frames / video_fps * FPS
        nframes = int(min(min(max(nframes, min_frames), max_frames), total_frames))
        if not (FRAME_FACTOR <= nframes <= total_frames):
            raise ValueError(f"nframes should in interval [{FRAME_FACTOR}, {total_frames}], but got {nframes}.")
        return nframes
    
    def smart_resize(height, width, nframes, factor=IMAGE_FACTOR):
        min_pixels = VIDEO_MIN_PIXELS
        total_pixels = VIDEO_TOTAL_PIXELS
        max_pixels = max(min(VIDEO_MAX_PIXELS, total_pixels / nframes * FRAME_FACTOR), int(min_pixels * 1.05))
        if max(height, width) / min(height, width) > MAX_RATIO:
            raise ValueError(f"absolute aspect ratio must be smaller than {MAX_RATIO}, got {max(height, width) / min(height, width)}")
        h_bar = max(factor, round_by_factor(height, factor))
        w_bar = max(factor, round_by_factor(width, factor))
        if h_bar * w_bar > max_pixels:
            beta = math.sqrt((height * width) / max_pixels)
            h_bar = floor_by_factor(height / beta, factor)
            w_bar = floor_by_factor(width / beta, factor)
        elif h_bar * w_bar < min_pixels:
            beta = math.sqrt(min_pixels / (height * width))
            h_bar = ceil_by_factor(height * beta, factor)
            w_bar = ceil_by_factor(width * beta, factor)
        return h_bar, w_bar
    
    def video_token_calculate(video_path):
        height, width, total_frames, video_fps = get_video(video_path)
        nframes = smart_nframes(total_frames, video_fps)
        resized_height, resized_width = smart_resize(height, width, nframes)
        video_token = int(math.ceil(nframes / FPS) * resized_height / 28 * resized_width / 28)
        video_token += 2  # Vision markers
        return video_token
    
    if __name__ == "__main__":
        video_path = "spring_mountain.mp4"  # 你的视频路径
        video_token = video_token_calculate(video_path)
        print("video_tokens:", video_token)
  • audio_tokens

    • Qwen3-Omni-Flash-Realtime: tokens = audio duration (seconds) * 12.5

    • Qwen-Omni-Turbo-Realtime: tokens = audio duration (seconds) * 25

    Audio shorter than 1 second is counted as 1 second.

Free quota

For how to claim, check, and use the free quota, see Free quota for new users.

Rate limits

For model rate-limit rules and common questions, see Rate limits.

FAQ

Q1: How can I try the Qwen-Omni-Realtime models online?

A: You can deploy a demo with one click:

  1. Open the Function Compute template, set Deployment Type to "Direct deployment", fill in your API Key under "Bailian API-KEY", then click "Create and deploy default environment".

  2. Wait about one minute, then find the access domain under Environment Details > Environment Information. Change the domain's http to https (example: https://omni-realtime.fcv3.xxxx.cn-hangzhou.fc.devsapp.net) and open that HTTPS link to start a video/voice call with the model.

You can view the project source code under Resource Information > Function Resources.
Both Function Compute and Alibaba Cloud Bailian provide free quotas for new users, which is enough to cover simple debugging; once the quota is used up, billing is pay-as-you-go. Charges are incurred only when the deployment is actually accessed.

Q2: How do I send images to the model?

A: Send input_image_buffer.append events from the client.

For video-call scenarios, you can sample frames from the video and send input_image_buffer.append events at no more than two frames per second. For DashScope SDK code, see the Omni-Realtime sample code; a sketch follows below.
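
Below is a minimal sketch of sampling camera frames and sending them, reusing send_event from omni_realtime_client.py. The input_image_buffer.append payload shape (an "image" field carrying a base64-encoded JPEG) is an assumption; check the Omni-Realtime sample code for the exact format.

import asyncio
import base64
import cv2  # pip install opencv-python
from omni_realtime_client import OmniRealtimeClient

async def stream_camera_frames(client: OmniRealtimeClient):
    cap = cv2.VideoCapture(0)  # default camera
    try:
        while True:
            ok, frame = cap.read()
            if not ok:
                break
            ok, jpeg = cv2.imencode(".jpg", frame)
            if ok:
                await client.send_event({
                    "type": "input_image_buffer.append",
                    "image": base64.b64encode(jpeg.tobytes()).decode("utf-8"),
                })
            await asyncio.sleep(0.5)  # no more than two frames per second
    finally:
        cap.release()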

Error codes

If a model call fails and returns an error message, see Error messages to resolve the issue.

Voice list

Qwen3-Omni-Flash-Realtime

| Voice name | voice parameter | Description | Supported languages |
| --- | --- | --- | --- |
| 芊悦 | Cherry | A sunny, upbeat, warm and natural young female voice. | Chinese, English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 晨煦 | Ethan | Standard Mandarin with a slight northern accent; sunny, warm, lively, and energetic. | Chinese, English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 不吃鱼 | Nofish | A designer who cannot pronounce retroflex sounds. | Chinese, English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 詹妮弗 | Jennifer | A polished American English female voice with cinematic quality. | Chinese, English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 甜茶 | Ryan | Rhythm dialed all the way up, bursting with drama; realism dancing with tension. | Chinese, English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 卡捷琳娜 | Katerina | A mature, commanding female voice with a rich, lingering cadence. | Chinese, English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 墨讲师 | Elias | Keeps academic rigor while using storytelling to turn complex knowledge into digestible modules. | Chinese, English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 上海-阿珍 | Jada | A brisk, bustling Shanghai big sister. | Chinese (Shanghainese), English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 北京-晓东 | Dylan | A boy who grew up in Beijing's hutongs. | Chinese (Beijing dialect), English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 四川-晴儿 | Sunny | A Sichuan girl sweet enough to melt your heart. | Chinese (Sichuanese), English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 南京-老李 | Li | A patient yoga teacher. | Chinese (Nanjing dialect), English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 陕西-秦川 | Marcus | Few words, a sincere heart, and a deep voice: the flavor of old Shaanxi. | Chinese (Shaanxi dialect), English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 闽南-阿杰 | Roy | A humorous, straight-talking, streetwise Taiwanese guy. | Chinese (Hokkien), English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 天津-李彼得 | Peter | Tianjin crosstalk, a professional straight man. | Chinese (Tianjin dialect), English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 粤语-阿强 | Rocky | The humorous, witty Ah Keung, online and ready to chat. | Chinese (Cantonese), English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 粤语-阿清 | Kiki | A sweet Hong Kong bestie. | Chinese (Cantonese), English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 四川-程川 | Eric | A free-spirited man from the streets of Chengdu, Sichuan. | Chinese (Sichuanese), English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |

Qwen-Omni-Turbo-Realtime

| Voice name | voice parameter | Description | Supported languages |
| --- | --- | --- | --- |
| 芊悦 | Cherry | A sunny, upbeat, warm and natural young female voice. | Chinese, English |
| 苏瑶 | Serena | A gentle, soft-spoken young woman. | Chinese, English |
| 晨煦 | Ethan | Standard Mandarin with a slight northern accent; sunny, warm, lively, and energetic. | Chinese, English |
| 千雪 | Chelsie | An anime-style virtual girlfriend. | Chinese, English |