Real-time multimodal (Qwen-Omni-Realtime)

Qwen-Omni-Realtime is a real-time audio and video chat model from Qwen (Tongyi Qianwen). It can understand streaming audio and image input simultaneously (for example, consecutive image frames extracted in real time from a video stream) and produces high-quality text and audio output in real time.

To try it online, see "How do I try the Qwen-Omni-Realtime model online?".

How to use

1. Establish a connection

The Qwen-Omni-Realtime models are accessed over the WebSocket protocol. You can establish a connection with the native Python example below, or through the DashScope SDK.

Native WebSocket connection

The connection requires the following settings:

Endpoint
    China (Beijing): wss://dashscope.aliyuncs.com/api-ws/v1/realtime
    International (Singapore): wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime

Query parameter
    The query parameter is model, which must be set to the name of the model being called. Example: ?model=qwen3-omni-flash-realtime

Request header
    Bearer token authentication: Authorization: Bearer DASHSCOPE_API_KEY
    DASHSCOPE_API_KEY is the API key you created in Model Studio.
# pip install websocket-client
import json
import websocket
import os

API_KEY = os.getenv("DASHSCOPE_API_KEY")
API_URL = "wss://dashscope.aliyuncs.com/api-ws/v1/realtime?model=qwen3-omni-flash-realtime"

headers = [
    "Authorization: Bearer " + API_KEY
]

def on_open(ws):
    print(f"Connected to server: {API_URL}")
def on_message(ws, message):
    data = json.loads(message)
    print("Received event:", json.dumps(data, indent=2))
def on_error(ws, error):
    print("Error:", error)

ws = websocket.WebSocketApp(
    API_URL,
    header=headers,
    on_open=on_open,
    on_message=on_message,
    on_error=on_error
)

ws.run_forever()

DashScope SDK

Python

# Requires dashscope Python SDK >= 1.23.9
import os
import json
from dashscope.audio.qwen_omni import OmniRealtimeConversation, OmniRealtimeCallback
import dashscope
# If the API key is not configured as an environment variable, change the next line to dashscope.api_key = "sk-xxx"
dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")

class PrintCallback(OmniRealtimeCallback):
    def on_open(self) -> None:
        print("Connected Successfully")
    def on_event(self, response: dict) -> None:
        print("Received event:")
        print(json.dumps(response, indent=2, ensure_ascii=False))
    def on_close(self, close_status_code: int, close_msg: str) -> None:
        print(f"Connection closed (code={close_status_code}, msg={close_msg}).")

callback = PrintCallback()
conversation = OmniRealtimeConversation(
    model="qwen3-omni-flash-realtime",
    callback=callback,
    # The URL below is for the Beijing region; for the Singapore region, replace it with: wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime
    url="wss://dashscope.aliyuncs.com/api-ws/v1/realtime"
)
try:
    conversation.connect()
    print("Conversation started. Press Ctrl+C to exit.")
    conversation.thread.join()
except KeyboardInterrupt:
    conversation.close()
Java

// Requires DashScope Java SDK >= 2.20.9
import com.alibaba.dashscope.audio.omni.*;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.google.gson.JsonObject;
import java.util.concurrent.CountDownLatch;

public class Main {
    public static void main(String[] args) throws InterruptedException, NoApiKeyException {
        CountDownLatch latch = new CountDownLatch(1);
        OmniRealtimeParam param = OmniRealtimeParam.builder()
                .model("qwen3-omni-flash-realtime")
                .apikey(System.getenv("DASHSCOPE_API_KEY"))
                // The URL below is for the Beijing region; for the Singapore region, replace it with: wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime
                .url("wss://dashscope.aliyuncs.com/api-ws/v1/realtime")
                .build();

        OmniRealtimeConversation conversation = new OmniRealtimeConversation(param, new OmniRealtimeCallback() {
            @Override
            public void onOpen() {
                System.out.println("Connected Successfully");
            }
            @Override
            public void onEvent(JsonObject message) {
                System.out.println(message);
            }
            @Override
            public void onClose(int code, String reason) {
                System.out.println("connection closed code: " + code + ", reason: " + reason);
                latch.countDown();
            }
        });
        conversation.connect();
        latch.await();
        conversation.close(1000, "bye");
        System.exit(0);
    }
}

2. Configure the session

Send the client event session.update:

{
    // Id of this event, generated by the client
    "event_id": "event_ToPZqeobitzUJnt3QqtWg",
    // Event type, fixed as session.update
    "type": "session.update",
    // Session configuration
    "session": {
        // Output modalities: ["text"] (text only) or ["text","audio"] (text and audio)
        "modalities": [
            "text",
            "audio"
        ],
        // Voice used for the output audio
        "voice": "Chelsie",
        // Input audio format; only pcm16 is supported
        "input_audio_format": "pcm16",
        // Output audio format; only pcm24 is supported
        "output_audio_format": "pcm24",
        // System message that sets the model's goal or persona
        "instructions": "You are the AI customer-service agent of a five-star hotel. Answer customers' questions about room types, facilities, prices, and booking policies accurately and in a friendly manner. Always respond professionally and helpfully, and never provide unverified information or information beyond the scope of the hotel's services.",
        // Whether to enable voice activity detection (VAD). To enable it, pass a configuration
        // object; the server then detects the start and end of speech automatically.
        // Set it to null to let the client decide when to trigger a model response.
        "turn_detection": {
            // VAD type; must be set to server_vad
            "type": "server_vad",
            // VAD detection threshold. Raise it in noisy environments and lower it in quiet ones.
            "threshold": 0.5,
            // Duration of silence (ms) that marks the end of speech; exceeding it triggers a model response
            "silence_duration_ms": 800
        }
    }
}
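With the native WebSocket connection from step 1, this event is sent as a JSON string once the connection is open. Below is a minimal sketch (the helper name and the trimmed payload are illustrative, not part of the API):

# Minimal sketch: send session.update over the websocket-client connection from step 1.
import json
import uuid

def configure_session(ws):
    event = {
        "event_id": "event_" + uuid.uuid4().hex,  # client-generated event id
        "type": "session.update",
        "session": {
            "modalities": ["text", "audio"],
            "voice": "Chelsie",
            "input_audio_format": "pcm16",
            "output_audio_format": "pcm24",
            "turn_detection": {
                "type": "server_vad",
                "threshold": 0.5,
                "silence_duration_ms": 800,
            },
        },
    }
    ws.send(json.dumps(event))  # e.g. call this from the on_open callback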

3. Send audio and images

The client sends Base64-encoded audio and image data to the server-side buffers with the input_audio_buffer.append and input_image_buffer.append events. Audio input is required; image input is optional.

Images can come from local files or be captured in real time from a video stream.
When server-side VAD is enabled, the server automatically commits the data and triggers a response once it detects the end of speech. When VAD is disabled (manual mode), the client must explicitly send the input_audio_buffer.commit event after all the data has been sent, as shown in the sketch below.
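A minimal sketch of these events over the raw WebSocket connection (the helper name and parameters are illustrative; the response.create event shown for manual mode matches the create_response calls in the SDK examples below):

# Minimal sketch: append one audio chunk and an optional image frame, then
# commit manually (the commit and response.create calls apply only when
# server-side VAD is disabled).
import base64
import json

def send_inputs(ws, pcm_chunk, jpeg_bytes=None, manual=True):
    ws.send(json.dumps({
        "type": "input_audio_buffer.append",
        "audio": base64.b64encode(pcm_chunk).decode("ascii"),
    }))
    if jpeg_bytes is not None:
        # Images are optional and must be preceded by audio data
        ws.send(json.dumps({
            "type": "input_image_buffer.append",
            "image": base64.b64encode(jpeg_bytes).decode("ascii"),
        }))
    if manual:
        ws.send(json.dumps({"type": "input_audio_buffer.commit"}))
        ws.send(json.dumps({"type": "response.create"}))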

4. Receive the model response

The format of the model's response depends on the configured output modalities; a minimal event-handler sketch follows.
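For example, an on_message handler for the websocket-client connection from step 1 might dispatch on the event types used throughout this page (a sketch, not an exhaustive list of events):

# Minimal sketch: handle the main server events in the on_message callback.
import base64
import json

def on_message(ws, message):
    event = json.loads(message)
    event_type = event.get("type")
    if event_type == "response.text.delta":
        # Streaming text output
        print(event["delta"], end="", flush=True)
    elif event_type == "response.audio_transcript.delta":
        # Transcript of the spoken reply
        print(event["delta"], end="", flush=True)
    elif event_type == "response.audio.delta":
        # Base64-encoded PCM audio (24 kHz, mono, 16-bit); feed it to an
        # audio output stream such as pyaudio
        pcm_bytes = base64.b64decode(event["delta"])
    elif event_type == "response.done":
        print("\n--- response complete ---")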

Model list

Qwen3-Omni-Flash-Realtime is the latest real-time multimodal model from Qwen. Compared with the previous generation, Qwen-Omni-Turbo-Realtime (which will no longer be updated):

  • Supported languages

    Increased to 10: Chinese (Mandarin plus several major dialects such as Shanghainese, Cantonese, and Sichuanese), English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, and Korean. Qwen-Omni-Turbo-Realtime supports only 2: Chinese (Mandarin) and English.

  • Supported voices

    Increased to 17; Qwen-Omni-Turbo-Realtime supports only 4. See the voice list.

China (Beijing)

| Model name | Version | Context length | Max input | Max output | Free quota (tokens) |
| --- | --- | --- | --- | --- | --- |
| qwen3-omni-flash-realtime (currently equivalent to qwen3-omni-flash-realtime-2025-09-15) | Stable | 65,536 | 49,152 | 16,384 | 100 Token (any modality), valid for 90 days after activating Model Studio |
| qwen3-omni-flash-realtime-2025-09-15 | Snapshot | 65,536 | 49,152 | 16,384 | Same as above |

More models

| Model name | Version | Context length | Max input | Max output | Free quota (tokens) |
| --- | --- | --- | --- | --- | --- |
| qwen-omni-turbo-realtime (currently equivalent to qwen-omni-turbo-realtime-2025-05-08) | Stable | 32,768 | 30,720 | 2,048 | 100 Token (any modality), valid for 90 days after activating Model Studio |
| qwen-omni-turbo-realtime-latest (always matches the latest snapshot) | Latest | 32,768 | 30,720 | 2,048 | Same as above |
| qwen-omni-turbo-realtime-2025-05-08 | Snapshot | 32,768 | 30,720 | 2,048 | Same as above |

International (Singapore)

| Model name | Version | Context length | Max input | Max output | Free quota |
| --- | --- | --- | --- | --- | --- |
| qwen3-omni-flash-realtime (currently equivalent to qwen3-omni-flash-realtime-2025-09-15) | Stable | 65,536 | 49,152 | 16,384 | No free quota |
| qwen3-omni-flash-realtime-2025-09-15 | Snapshot | 65,536 | 49,152 | 16,384 | No free quota |

More models

| Model name | Version | Context length | Max input | Max output | Free quota |
| --- | --- | --- | --- | --- | --- |
| qwen-omni-turbo-realtime (currently equivalent to qwen-omni-turbo-realtime-2025-05-08) | Stable | 32,768 | 30,720 | 2,048 | No free quota |
| qwen-omni-turbo-realtime-latest (always matches the latest snapshot) | Latest | 32,768 | 30,720 | 2,048 | No free quota |
| qwen-omni-turbo-realtime-2025-05-08 | Snapshot | 32,768 | 30,720 | 2,048 | No free quota |

Quick start

You must first obtain an API key and configure the API key as an environment variable.

Choose the programming language you are familiar with and follow the steps below to try real-time conversation with the Realtime models.

DashScope Python SDK

  • Prepare the environment

Your Python version must be 3.10 or later.

First install pyaudio for your operating system.

macOS

brew install portaudio && pip install pyaudio

Debian/Ubuntu

  • If you are not using a virtual environment, install directly through the system package manager:

    sudo apt-get install python3-pyaudio
  • If you are using a virtual environment, first install the build dependencies:

    sudo apt update
    sudo apt install -y python3-dev portaudio19-dev

    Then install with pip inside the activated virtual environment:

    pip install pyaudio

CentOS

sudo yum install -y portaudio portaudio-devel && pip install pyaudio

Windows

pip install pyaudio

After installation, install the dependencies via pip:

pip install websocket-client dashscope
  • Choose an interaction mode

    • VAD mode (Voice Activity Detection: automatic detection of speech start and stop)

      The server automatically detects when the user starts and stops speaking and responds accordingly.

    • Manual mode (push to talk, release to send)

      The client controls when speech starts and stops. After the user finishes speaking, the client must explicitly send the message to the server.

    VAD mode

    Create a new Python file named vad_dash.py and copy the following code into it:

    vad_dash.py

    # Dependencies: dashscope >= 1.23.9, pyaudio
    import os
    import base64
    import signal
    import sys
    import time
    import pyaudio
     
    from dashscope.audio.qwen_omni import *
    import dashscope
    # If the environment variable is not set, replace the next line with your API key: dashscope.api_key = "sk-xxx"
    dashscope.api_key = os.getenv('DASHSCOPE_API_KEY')
    voice = 'Cherry'  # Voice for the returned audio
    
    class MyCallback(OmniRealtimeCallback):
        """最简回调:建立连接时初始化音频设备,事件中播放返回音频。"""
        def __init__(self, ctx):
            super().__init__()
            self.ctx = ctx
    
        def on_open(self) -> None:
            # After the connection opens, initialize the microphone (16 kHz/mono/16-bit) and speaker (24 kHz/mono/16-bit)
            print('connection opened')
            try:
                self.ctx['pya'] = pyaudio.PyAudio()
                self.ctx['mic'] = self.ctx['pya'].open(
                    format=pyaudio.paInt16,
                    channels=1,
                    rate=16000,
                    input=True
                )
                self.ctx['out'] = self.ctx['pya'].open(
                    format=pyaudio.paInt16,
                    channels=1,
                    rate=24000,
                    output=True
                )
                self.ctx['ready'] = True  # Tell the main loop that capture can start
                print('audio initialized')
            except Exception as e:
                print('[Error] audio init failed: {}'.format(e))
                self.ctx['ready'] = True  # Release the main loop even on failure so it can exit promptly
    
        def on_close(self, close_status_code, close_msg) -> None:
            print('connection closed with code: {}, msg: {}'.format(close_status_code, close_msg))
            sys.exit(0)
    
        def on_event(self, response: dict) -> None:
            # Dispatch events via a dict, keeping only the essential handlers
            try:
                t = response['type']
                handlers = {
                    'session.created': lambda r: print('start session: {}'.format(r['session']['id'])),
                    'conversation.item.input_audio_transcription.completed': lambda r: print('question: {}'.format(r['transcript'])),
                    'response.audio_transcript.delta': lambda r: print('llm text: {}'.format(r['delta'])),
                    'response.audio.delta': self._play_audio,
                }
                h = handlers.get(t)
                if h:
                    h(response)
            except Exception as e:
                print('[Error] {}'.format(e))
    
        def _play_audio(self, response):
            # Decode the base64 audio and write it to the output stream
            if self.ctx['out'] is None:
                return
            try:
                data = base64.b64decode(response['delta'])
                self.ctx['out'].write(data)
            except Exception as e:
                print('[Error] audio playback failed: {}'.format(e))
    
    def shutdown_ctx(ctx):
        """安全释放音频与PyAudio资源。"""
        try:
            if ctx['mic'] is not None:
                ctx['mic'].close()
                ctx['mic'] = None
        except Exception:
            pass
        try:
            if ctx['out'] is not None:
                ctx['out'].close()
                ctx['out'] = None
        except Exception:
            pass
        try:
            if ctx['pya'] is not None:
                ctx['pya'].terminate()
                ctx['pya'] = None
        except Exception:
            pass
    
    if __name__ == '__main__':
        print('Initializing ...')
        # Runtime context: holds the audio and session handles
        ctx = {'pya': None, 'mic': None, 'out': None, 'conv': None, 'ready': False}
        callback = MyCallback(ctx)
        conv = OmniRealtimeConversation(
            model='qwen3-omni-flash-realtime',
            callback=callback,
        )
        try:
            conv.connect()  # Establish the realtime WebSocket connection
        except Exception as e:
            print('[Error] connect failed: {}'.format(e))
            sys.exit(1)
    
        ctx['conv'] = conv
        # Session config: enable text+audio output, server VAD, and the transcription model
        conv.update_session(
            output_modalities=[MultiModality.AUDIO, MultiModality.TEXT],
            voice=voice,
            input_audio_format=AudioFormat.PCM_16000HZ_MONO_16BIT,
            output_audio_format=AudioFormat.PCM_24000HZ_MONO_16BIT,
            enable_input_audio_transcription=True,
            # Model used to transcribe the input audio; only gummy-realtime-v1 is supported
            input_audio_transcription_model='gummy-realtime-v1',
            enable_turn_detection=True,
            turn_detection_type='server_vad',
            instructions="你是个人助理小云,请你准确且友好地解答用户的问题,始终以乐于助人的态度回应。"
        )
    
        def signal_handler(sig, frame):
            # On Ctrl+C, close the session gracefully and release audio resources
            print('Ctrl+C pressed, stop recognition ...')
            if ctx['conv'] is not None:
                ctx['conv'].close()
            shutdown_ctx(ctx)
            print('omni realtime stopped.')
            sys.exit(0)
    
        signal.signal(signal.SIGINT, signal_handler)
        print("Press 'Ctrl+C' to stop conversation...")
    
        # Main loop: every 100 ms, read one frame (3200 bytes = 16 kHz * 16-bit * 0.1 s) and send it
        while True:
            if not ctx['ready'] or ctx['mic'] is None:
                time.sleep(0.05)
                continue
            audio_data = ctx['mic'].read(3200, exception_on_overflow=False)
            audio_b64 = base64.b64encode(audio_data).decode('ascii')
            conv.append_audio(audio_b64)

    Run vad_dash.py and talk with the Qwen-Omni-Realtime model through your microphone. The system detects where your speech starts and sends the audio to the server automatically; you do not need to send it manually.

    Manual mode

    Create a new Python file named manual_dash.py and copy the following code into it:

    manual_dash.py

    # Dependencies: dashscope >= 1.23.9, pyaudio
    import os
    import base64
    import sys
    import threading
    import pyaudio
    from dashscope.audio.qwen_omni import *
    import dashscope
    
    # If the environment variable is not set, replace the next line with your API key: dashscope.api_key = "sk-xxx"
    dashscope.api_key = os.getenv('DASHSCOPE_API_KEY')
    voice = 'Cherry'
    
    class MyCallback(OmniRealtimeCallback):
        """最简回调:建立连接时初始化扬声器,事件中直接播放返回音频。"""
        def __init__(self, ctx):
            super().__init__()
            self.ctx = ctx
    
        def on_open(self) -> None:
            # After the connection opens, initialize PyAudio and the speaker (24 kHz/mono/16-bit)
            print('connection opened')
            try:
                self.ctx['pya'] = pyaudio.PyAudio()
                self.ctx['out'] = self.ctx['pya'].open(
                    format=pyaudio.paInt16,
                    channels=1,
                    rate=24000,
                    output=True
                )
                print('audio output initialized')
            except Exception as e:
                print('[Error] audio init failed: {}'.format(e))
    
        def on_close(self, close_status_code, close_msg) -> None:
            print('connection closed with code: {}, msg: {}'.format(close_status_code, close_msg))
            sys.exit(0)
    
        def on_event(self, response: dict) -> None:
            try:
                t = response['type']
                handlers = {
                    'session.created': lambda r: print('start session: {}'.format(r['session']['id'])),
                    'conversation.item.input_audio_transcription.completed': lambda r: print('question: {}'.format(r['transcript'])),
                    'response.audio_transcript.delta': lambda r: print('llm text: {}'.format(r['delta'])),
                    'response.audio.delta': self._play_audio,
                    'response.done': self._response_done,
                }
                h = handlers.get(t)
                if h:
                    h(response)
            except Exception as e:
                print('[Error] {}'.format(e))
    
        def _play_audio(self, response):
            # Decode the base64 audio and write it to the output stream
            if self.ctx['out'] is None:
                return
            try:
                data = base64.b64decode(response['delta'])
                self.ctx['out'].write(data)
            except Exception as e:
                print('[Error] audio playback failed: {}'.format(e))
    
        def _response_done(self, response):
            # Mark the current turn as done so the main loop can stop waiting
            if self.ctx['conv'] is not None:
                print('[Metric] response: {}, first text delay: {}, first audio delay: {}'.format(
                    self.ctx['conv'].get_last_response_id(),
                    self.ctx['conv'].get_last_first_text_delay(),
                    self.ctx['conv'].get_last_first_audio_delay(),
                ))
            if self.ctx['resp_done'] is not None:
                self.ctx['resp_done'].set()
    
    def shutdown_ctx(ctx):
        """安全释放音频与PyAudio资源。"""
        try:
            if ctx['out'] is not None:
                ctx['out'].close()
                ctx['out'] = None
        except Exception:
            pass
        try:
            if ctx['pya'] is not None:
                ctx['pya'].terminate()
                ctx['pya'] = None
        except Exception:
            pass
    
    
    def record_until_enter(pya_inst: pyaudio.PyAudio, sample_rate=16000, chunk_size=3200):
        """按 Enter 停止录音,返回PCM字节。"""
        frames = []
        stop_evt = threading.Event()
    
        stream = pya_inst.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=sample_rate,
            input=True,
            frames_per_buffer=chunk_size
        )
    
        def _reader():
            while not stop_evt.is_set():
                try:
                    frames.append(stream.read(chunk_size, exception_on_overflow=False))
                except Exception:
                    break
    
        t = threading.Thread(target=_reader, daemon=True)
        t.start()
        input()  # The user presses Enter again to stop recording
        stop_evt.set()
        t.join(timeout=1.0)
        try:
            stream.close()
        except Exception:
            pass
        return b''.join(frames)
    
    
    if __name__ == '__main__':
        print('Initializing ...')
        # Runtime context: holds the audio and session handles
        ctx = {'pya': None, 'out': None, 'conv': None, 'resp_done': threading.Event()}
        callback = MyCallback(ctx)
        conversation = OmniRealtimeConversation(
            model='qwen3-omni-flash-realtime',
            callback=callback,
            # The URL below is for the Beijing region; for the Singapore region, replace it with: wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime
            url="wss://dashscope.aliyuncs.com/api-ws/v1/realtime",
        )
        try:
            conversation.connect()
        except Exception as e:
            print('[Error] connect failed: {}'.format(e))
            sys.exit(1)
    
        ctx['conv'] = conversation
        # Session config: enable text+audio output (server VAD disabled; record manually instead)
        conversation.update_session(
            output_modalities=[MultiModality.AUDIO, MultiModality.TEXT],
            voice=voice,
            input_audio_format=AudioFormat.PCM_16000HZ_MONO_16BIT,
            output_audio_format=AudioFormat.PCM_24000HZ_MONO_16BIT,
            enable_input_audio_transcription=True,
            # Model used to transcribe the input audio; only gummy-realtime-v1 is supported
            input_audio_transcription_model='gummy-realtime-v1',
            enable_turn_detection=False,
            instructions="你是个人助理小云,请你准确且友好地解答用户的问题,始终以乐于助人的态度回应。"
        )
    
        try:
            turn = 1
            while True:
                print(f"\n--- 第 {turn} 轮对话 ---")
                print("按 Enter 开始录音(输入 q 回车退出)...")
                user_input = input()
                if user_input.strip().lower() in ['q', 'quit']:
                    print("用户请求退出...")
                    break
                print("录音中... 再次按 Enter 停止录音。")
                if ctx['pya'] is None:
                    ctx['pya'] = pyaudio.PyAudio()
                recorded = record_until_enter(ctx['pya'])
                if not recorded:
                    print("未录制到有效音频,请重试。")
                    continue
                print(f"成功录制音频: {len(recorded)} 字节,发送中...")
    
                # Send in 3200-byte chunks (16 kHz/16-bit, 100 ms each)
                chunk_size = 3200
                for i in range(0, len(recorded), chunk_size):
                    chunk = recorded[i:i+chunk_size]
                    conversation.append_audio(base64.b64encode(chunk).decode('ascii'))
    
                print("发送完成,等待模型响应...")
                ctx['resp_done'].clear()
                conversation.commit()
                conversation.create_response()
                ctx['resp_done'].wait()
                print('Audio playback finished')
                turn += 1
        except KeyboardInterrupt:
            print("\n程序被用户中断")
        finally:
            shutdown_ctx(ctx)
            print("程序退出")

    Run manual_dash.py. Press Enter to start speaking, then press Enter again to receive the model's audio response.

DashScope Java SDK

Choose an interaction mode

  • VAD mode (Voice Activity Detection: automatic detection of speech start and stop)

    The Realtime API automatically detects when the user starts and stops speaking and responds accordingly.

  • Manual mode (push to talk, release to send)

    The client controls when speech starts and stops. After the user finishes speaking, the client must explicitly send the message to the server.

VAD mode

OmniServerVad.java

// Requires DashScope Java SDK >= 2.20.9

import com.alibaba.dashscope.audio.omni.*;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.google.gson.JsonObject;
import javax.sound.sampled.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class OmniServerVad {
    // Beginning of the RealtimePcmPlayer class
    public static class RealtimePcmPlayer {
        private int sampleRate;
        private SourceDataLine line;
        private AudioFormat audioFormat;
        private Thread decoderThread;
        private Thread playerThread;
        private AtomicBoolean stopped = new AtomicBoolean(false);
        private Queue<String> b64AudioBuffer = new ConcurrentLinkedQueue<>();
        private Queue<byte[]> RawAudioBuffer = new ConcurrentLinkedQueue<>();
    
        // The constructor initializes the audio format and output line
        public RealtimePcmPlayer(int sampleRate) throws LineUnavailableException {
            this.sampleRate = sampleRate;
            this.audioFormat = new AudioFormat(this.sampleRate, 16, 1, true, false);
            DataLine.Info info = new DataLine.Info(SourceDataLine.class, audioFormat);
            line = (SourceDataLine) AudioSystem.getLine(info);
            line.open(audioFormat);
            line.start();
            decoderThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!stopped.get()) {
                        String b64Audio = b64AudioBuffer.poll();
                        if (b64Audio != null) {
                            byte[] rawAudio = Base64.getDecoder().decode(b64Audio);
                            RawAudioBuffer.add(rawAudio);
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            });
            playerThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!stopped.get()) {
                        byte[] rawAudio = RawAudioBuffer.poll();
                        if (rawAudio != null) {
                            try {
                                playChunk(rawAudio);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            });
            decoderThread.start();
            playerThread.start();
        }
    
        // Play one audio chunk, blocking until it finishes
        private void playChunk(byte[] chunk) throws IOException, InterruptedException {
            if (chunk == null || chunk.length == 0) return;
    
            int bytesWritten = 0;
            while (bytesWritten < chunk.length) {
                bytesWritten += line.write(chunk, bytesWritten, chunk.length - bytesWritten);
            }
            int audioLength = chunk.length / (this.sampleRate*2/1000);
            // Wait for the buffered audio to finish playing (never sleep a negative duration)
            Thread.sleep(Math.max(0, audioLength - 10));
        }
    
        public void write(String b64Audio) {
            b64AudioBuffer.add(b64Audio);
        }
    
        public void cancel() {
            b64AudioBuffer.clear();
            RawAudioBuffer.clear();
        }
    
        public void waitForComplete() throws InterruptedException {
            while (!b64AudioBuffer.isEmpty() || !RawAudioBuffer.isEmpty()) {
                Thread.sleep(100);
            }
            line.drain();
        }
    
        public void shutdown() throws InterruptedException {
            stopped.set(true);
            decoderThread.join();
            playerThread.join();
            if (line != null && line.isRunning()) {
                line.drain();
                line.close();
            }
        }
    } // End of the RealtimePcmPlayer class
    
    public static void main(String[] args) throws InterruptedException, LineUnavailableException {

        OmniRealtimeParam param = OmniRealtimeParam.builder()
                .model("qwen3-omni-flash-realtime")
                // If the environment variable is not set, replace the next line with your API key: .apikey("sk-xxx")
                .apikey(System.getenv("DASHSCOPE_API_KEY"))
                .build();

        RealtimePcmPlayer audioPlayer = new RealtimePcmPlayer(24000);
        OmniRealtimeConversation conversation = null;
        final AtomicReference<OmniRealtimeConversation> conversationRef = new AtomicReference<>(null);
        conversation = new OmniRealtimeConversation(param, new OmniRealtimeCallback() {
            @Override
            public void onOpen() {
                System.out.println("connection opened");
            }
            @Override
            public void onEvent(JsonObject message) {
                String type = message.get("type").getAsString();
                switch(type) {
                    case "session.created":
                        System.out.println("start session: " + message.get("session").getAsJsonObject().get("id").getAsString());
                        break;
                    case "conversation.item.input_audio_transcription.completed":
                        System.out.println("question: " + message.get("transcript").getAsString());
                        break;
                    case "response.audio_transcript.delta":
                        System.out.println("got llm response delta: " + message.get("delta").getAsString());
                        break;
                    case "response.audio.delta":
                        String recvAudioB64 = message.get("delta").getAsString();
                        audioPlayer.write(recvAudioB64);
                        break;
                    case "input_audio_buffer.speech_started":
                        System.out.println("======VAD Speech Start======");
                        audioPlayer.cancel();
                        break;
                    case "response.done":
                        System.out.println("======RESPONSE DONE======");
                        if (conversationRef.get() != null) {
                            System.out.println("[Metric] response: " + conversationRef.get().getResponseId() +
                                    ", first text delay: " + conversationRef.get().getFirstTextDelay() +
                                    " ms, first audio delay: " + conversationRef.get().getFirstAudioDelay() + " ms");
                        }
                        break;
                    default:
                        break;
                }
            }
            @Override
            public void onClose(int code, String reason) {
                System.out.println("connection closed code: " + code + ", reason: " + reason);
            }
        });
        conversationRef.set(conversation);
        try {
            conversation.connect();
        } catch (NoApiKeyException e) {
            throw new RuntimeException(e);
        }
        OmniRealtimeConfig config = OmniRealtimeConfig.builder()
        .modalities(Arrays.asList(OmniRealtimeModality.AUDIO, OmniRealtimeModality.TEXT))
        .voice("Cherry")
        .enableTurnDetection(true)
        .enableInputAudioTranscription(true)
        // Model used to transcribe the input audio; only gummy-realtime-v1 is supported
        .InputAudioTranscription("gummy-realtime-v1")
        // Set the model's persona
        .parameters(new HashMap<String, Object>() {{
            put("instructions", "You are the personal assistant Xiaoyun. Answer users' questions accurately and in a friendly manner, and always respond helpfully.");
        }})
        .build();
        conversation.updateSession(config);
        try {
            // Create the audio format
            AudioFormat audioFormat = new AudioFormat(16000, 16, 1, true, false);
            // Match the default recording device for this format
            TargetDataLine targetDataLine =
                    AudioSystem.getTargetDataLine(audioFormat);
            targetDataLine.open(audioFormat);
            // Start recording
            targetDataLine.start();
            ByteBuffer buffer = ByteBuffer.allocate(3200);
            long start = System.currentTimeMillis();
            // Record for 50 s while streaming the audio to the conversation
            while (System.currentTimeMillis() - start < 50000) {
                int read = targetDataLine.read(buffer.array(), 0, buffer.capacity());
                if (read > 0) {
                    // Encode only the bytes that were actually read
                    String audioB64 = Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.array(), read));
                    // Send the recorded audio to the streaming service
                    conversation.appendAudio(audioB64);
                    // The capture rate is limited; sleep briefly to avoid high CPU usage
                    Thread.sleep(20);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        conversation.close(1000, "bye");
        audioPlayer.waitForComplete();
        audioPlayer.shutdown();
        System.exit(0);
    }
}

Run the OmniServerVad.main() method and talk with the Realtime model through your microphone. The system detects where your speech starts and sends the audio to the server automatically; you do not need to send it manually.

Manual mode

OmniWithoutServerVad.java

// Requires DashScope Java SDK >= 2.20.9

import com.alibaba.dashscope.audio.omni.*;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.google.gson.JsonObject;
import javax.sound.sampled.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class OmniWithoutServerVad {
    // Beginning of the RealtimePcmPlayer class
    public static class RealtimePcmPlayer {
        private int sampleRate;
        private SourceDataLine line;
        private AudioFormat audioFormat;
        private Thread decoderThread;
        private Thread playerThread;
        private AtomicBoolean stopped = new AtomicBoolean(false);
        private Queue<String> b64AudioBuffer = new ConcurrentLinkedQueue<>();
        private Queue<byte[]> RawAudioBuffer = new ConcurrentLinkedQueue<>();

        // The constructor initializes the audio format and output line
        public RealtimePcmPlayer(int sampleRate) throws LineUnavailableException {
            this.sampleRate = sampleRate;
            this.audioFormat = new AudioFormat(this.sampleRate, 16, 1, true, false);
            DataLine.Info info = new DataLine.Info(SourceDataLine.class, audioFormat);
            line = (SourceDataLine) AudioSystem.getLine(info);
            line.open(audioFormat);
            line.start();
            decoderThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!stopped.get()) {
                        String b64Audio = b64AudioBuffer.poll();
                        if (b64Audio != null) {
                            byte[] rawAudio = Base64.getDecoder().decode(b64Audio);
                            RawAudioBuffer.add(rawAudio);
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            });
            playerThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!stopped.get()) {
                        byte[] rawAudio = RawAudioBuffer.poll();
                        if (rawAudio != null) {
                            try {
                                playChunk(rawAudio);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            });
            decoderThread.start();
            playerThread.start();
        }

        // Play one audio chunk, blocking until it finishes
        private void playChunk(byte[] chunk) throws IOException, InterruptedException {
            if (chunk == null || chunk.length == 0) return;

            int bytesWritten = 0;
            while (bytesWritten < chunk.length) {
                bytesWritten += line.write(chunk, bytesWritten, chunk.length - bytesWritten);
            }
            int audioLength = chunk.length / (this.sampleRate*2/1000);
            // Wait for the buffered audio to finish playing (never sleep a negative duration)
            Thread.sleep(Math.max(0, audioLength - 10));
        }

        public void write(String b64Audio) {
            b64AudioBuffer.add(b64Audio);
        }

        public void cancel() {
            b64AudioBuffer.clear();
            RawAudioBuffer.clear();
        }

        public void waitForComplete() throws InterruptedException {
            while (!b64AudioBuffer.isEmpty() || !RawAudioBuffer.isEmpty()) {
                Thread.sleep(100);
            }
            line.drain();
        }

        public void shutdown() throws InterruptedException {
            stopped.set(true);
            decoderThread.join();
            playerThread.join();
            if (line != null && line.isRunning()) {
                line.drain();
                line.close();
            }
        }
    } // End of the RealtimePcmPlayer class
    // Recording helper
    private static void recordAndSend(TargetDataLine line, OmniRealtimeConversation conversation) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[3200];
        AtomicBoolean stopRecording = new AtomicBoolean(false);

        // Start a thread that listens for the Enter key
        Thread enterKeyListener = new Thread(() -> {
            try {
                System.in.read();
                stopRecording.set(true);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        enterKeyListener.start();

        // Recording loop
        while (!stopRecording.get()) {
            int count = line.read(buffer, 0, buffer.length);
            if (count > 0) {
                out.write(buffer, 0, count);
            }
        }

        // Send the recorded data
        byte[] audioData = out.toByteArray();
        String audioB64 = Base64.getEncoder().encodeToString(audioData);
        conversation.appendAudio(audioB64);
        out.close();
    }

    public static void main(String[] args) throws InterruptedException, LineUnavailableException {
        OmniRealtimeParam param = OmniRealtimeParam.builder()
                .model("qwen3-omni-flash-realtime")
                // .apikey("your-dashscope-api-key")
                .build();
        AtomicReference<CountDownLatch> responseDoneLatch = new AtomicReference<>(null);
        responseDoneLatch.set(new CountDownLatch(1));

        RealtimePcmPlayer audioPlayer = new RealtimePcmPlayer(24000);
        final AtomicReference<OmniRealtimeConversation> conversationRef = new AtomicReference<>(null);
        OmniRealtimeConversation conversation = new OmniRealtimeConversation(param, new OmniRealtimeCallback() {
            @Override
            public void onOpen() {
                System.out.println("connection opened");
            }
            @Override
            public void onEvent(JsonObject message) {
                String type = message.get("type").getAsString();
                switch(type) {
                    case "session.created":
                        System.out.println("start session: " + message.get("session").getAsJsonObject().get("id").getAsString());
                        break;
                    case "conversation.item.input_audio_transcription.completed":
                        System.out.println("question: " + message.get("transcript").getAsString());
                        break;
                    case "response.audio_transcript.delta":
                        System.out.println("got llm response delta: " + message.get("delta").getAsString());
                        break;
                    case "response.audio.delta":
                        String recvAudioB64 = message.get("delta").getAsString();
                        audioPlayer.write(recvAudioB64);
                        break;
                    case "response.done":
                        System.out.println("======RESPONSE DONE======");
                        if (conversationRef.get() != null) {
                            System.out.println("[Metric] response: " + conversationRef.get().getResponseId() +
                                    ", first text delay: " + conversationRef.get().getFirstTextDelay() +
                                    " ms, first audio delay: " + conversationRef.get().getFirstAudioDelay() + " ms");
                        }
                        responseDoneLatch.get().countDown();
                        break;
                    default:
                        break;
                }
            }
            @Override
            public void onClose(int code, String reason) {
                System.out.println("connection closed code: " + code + ", reason: " + reason);
            }
        });
        conversationRef.set(conversation);
        try {
            conversation.connect();
        } catch (NoApiKeyException e) {
            throw new RuntimeException(e);
        }
        OmniRealtimeConfig config = OmniRealtimeConfig.builder()
                .modalities(Arrays.asList(OmniRealtimeModality.AUDIO, OmniRealtimeModality.TEXT))
                .voice("Cherry")
                .enableTurnDetection(false)
                // Set the model's persona
                .parameters(new HashMap<String, Object>() {{
                    put("instructions", "You are the personal assistant Xiaoyun. Answer users' questions accurately and in a friendly manner, and always respond helpfully.");
                }})
                .build();
        conversation.updateSession(config);

        // Microphone capture
        AudioFormat format = new AudioFormat(16000, 16, 1, true, false);
        DataLine.Info info = new DataLine.Info(TargetDataLine.class, format);
        
        if (!AudioSystem.isLineSupported(info)) {
            System.out.println("Line not supported");
            return;
        }
        
        TargetDataLine line = null;
        try {
            line = (TargetDataLine) AudioSystem.getLine(info);
            line.open(format);
            line.start();
            
            while (true) {
                System.out.println("Press Enter to start recording...");
                System.in.read();
                System.out.println("Recording; speak now... Press Enter again to stop and send");
                recordAndSend(line, conversation);
                conversation.commit();
                conversation.createResponse(null, null);
                // Wait for the current response to finish, then reset the latch for the next turn
                responseDoneLatch.get().await();
                responseDoneLatch.set(new CountDownLatch(1));
            }
        } catch (LineUnavailableException | IOException e) {
            e.printStackTrace();
        } finally {
            if (line != null) {
                line.stop();
                line.close();
            }
        }
    }}

Run the OmniWithoutServerVad.main() method. Press Enter to start recording, press Enter again to stop recording and send it; the model's response is then received and played.

WebSocket (Python)

  • Prepare the environment

    Your Python version must be 3.10 or later.

    First install pyaudio for your operating system.

    macOS

    brew install portaudio && pip install pyaudio

    Debian/Ubuntu

    sudo apt-get install python3-pyaudio
    
    or
    
    pip install pyaudio
    Using pip install pyaudio is recommended. If the installation fails, first install the portaudio dependency for your operating system.

    CentOS

    sudo yum install -y portaudio portaudio-devel && pip install pyaudio

    Windows

    pip install pyaudio

    After installation, install the websocket dependency via pip:

    pip install websockets==15.0.1
  • Create the client

    Create a new local Python file named omni_realtime_client.py and copy the following code into it:

    omni_realtime_client.py

    # -*- coding: utf-8 -*-
    
    import asyncio
    import websockets
    import json
    import base64
    import time
    from typing import Optional, Callable, List, Dict, Any
    from enum import Enum
    
    
    class TurnDetectionMode(Enum):
        SERVER_VAD = "server_vad"
        MANUAL = "manual"
    
    
    class OmniRealtimeClient:
        """
        与 Omni Realtime API 交互的演示客户端。
    
        该类提供了连接 Realtime API、发送文本和音频数据、处理响应以及管理 WebSocket 连接的相关方法。
    
        属性说明:
            base_url (str):
                Realtime API 的基础地址。
            api_key (str):
                用于身份验证的 API Key。
            model (str):
                用于聊天的 Omni 模型名称。
            voice (str):
                服务器合成语音所使用的声音。
            turn_detection_mode (TurnDetectionMode):
                轮次检测模式。
            on_text_delta (Callable[[str], None]):
                文本增量回调函数。
            on_audio_delta (Callable[[bytes], None]):
                音频增量回调函数。
            on_input_transcript (Callable[[str], None]):
                输入转录文本回调函数。
            on_interrupt (Callable[[], None]):
                用户打断回调函数,应在此停止音频播放。
            on_output_transcript (Callable[[str], None]):
                输出转录文本回调函数。
            extra_event_handlers (Dict[str, Callable[[Dict[str, Any]], None]]):
                其他事件处理器,事件名到处理函数的映射。
        """
    
        def __init__(
                self,
                base_url,
                api_key: str,
                model: str = "",
                voice: str = "Ethan",
                instructions: str = "You are a helpful assistant.",
                turn_detection_mode: TurnDetectionMode = TurnDetectionMode.MANUAL,
                on_text_delta: Optional[Callable[[str], None]] = None,
                on_audio_delta: Optional[Callable[[bytes], None]] = None,
                on_interrupt: Optional[Callable[[], None]] = None,
                on_input_transcript: Optional[Callable[[str], None]] = None,
                on_output_transcript: Optional[Callable[[str], None]] = None,
                extra_event_handlers: Optional[Dict[str, Callable[[Dict[str, Any]], None]]] = None
        ):
            self.base_url = base_url
            self.api_key = api_key
            self.model = model
            self.voice = voice
            self.instructions = instructions
            self.ws = None
            self.on_text_delta = on_text_delta
            self.on_audio_delta = on_audio_delta
            self.on_interrupt = on_interrupt
            self.on_input_transcript = on_input_transcript
            self.on_output_transcript = on_output_transcript
            self.turn_detection_mode = turn_detection_mode
            self.extra_event_handlers = extra_event_handlers or {}
    
            # Current response state
            self._current_response_id = None
            self._current_item_id = None
            self._is_responding = False
            # Input/output transcript printing state
            self._print_input_transcript = False
            self._output_transcript_buffer = ""
    
        async def connect(self) -> None:
            """与 Realtime API 建立 WebSocket 连接。"""
            url = f"{self.base_url}?model={self.model}"
            headers = {
                "Authorization": f"Bearer {self.api_key}"
            }
    
            self.ws = await websockets.connect(url, additional_headers=headers)
    
            # Set the default session configuration
            if self.turn_detection_mode == TurnDetectionMode.MANUAL:
                await self.update_session({
                    "modalities": ["text", "audio"],
                    "voice": self.voice,
                    "instructions": self.instructions,
                    "input_audio_format": "pcm16",
                    "output_audio_format": "pcm24",
                    "input_audio_transcription": {
                        # Model used to transcribe the input audio; only gummy-realtime-v1 is supported
                        "model": "gummy-realtime-v1"
                    },
                    "turn_detection": None
                })
            elif self.turn_detection_mode == TurnDetectionMode.SERVER_VAD:
                await self.update_session({
                    "modalities": ["text", "audio"],
                    "voice": self.voice,
                    "instructions": self.instructions,
                    "input_audio_format": "pcm16",
                    "output_audio_format": "pcm24",
                    "input_audio_transcription": {
                        # Model used to transcribe the input audio; only gummy-realtime-v1 is supported
                        "model": "gummy-realtime-v1"
                    },
                    "turn_detection": {
                        "type": "server_vad",
                        "threshold": 0.1,
                        "prefix_padding_ms": 500,
                        "silence_duration_ms": 900
                    }
                })
            else:
                raise ValueError(f"Invalid turn detection mode: {self.turn_detection_mode}")
    
        async def send_event(self, event) -> None:
            event['event_id'] = "event_" + str(int(time.time() * 1000))
            print(f" Send event: type={event['type']}, event_id={event['event_id']}")
            await self.ws.send(json.dumps(event))
    
        async def update_session(self, config: Dict[str, Any]) -> None:
            """更新会话配置。"""
            event = {
                "type": "session.update",
                "session": config
            }
            print("update session: ", event)
            await self.send_event(event)
    
        async def stream_audio(self, audio_chunk: bytes) -> None:
            """向 API 流式发送原始音频数据。"""
            # 仅支持 16bit 16kHz 单声道 PCM
            audio_b64 = base64.b64encode(audio_chunk).decode()
    
            append_event = {
                "type": "input_audio_buffer.append",
                "audio": audio_b64
            }
            await self.send_event(append_event)
    
        async def commit_audio_buffer(self) -> None:
            """提交音频缓冲区以触发处理。"""
            event = {
                "type": "input_audio_buffer.commit"
            }
            await self.send_event(event)
    
        async def append_image(self, image_chunk: bytes) -> None:
            """向视频缓冲区追加图像数据。
            图像数据可以来自本地文件,也可以来自实时视频流。
    
            注意:
                - 图像格式必须为 JPG 或 JPEG。推荐分辨率为 480P 或 720P,最高支持 1080P。
                - 单张图片大小不应超过 500KB。
                - 本方法会将图像数据编码为 Base64 后再发送。
                - 建议以每秒 2 帧的频率向服务器发送图像。
                - 在发送图像数据之前,需要先发送音频数据。
            """
            image_b64 = base64.b64encode(image_chunk).decode()
    
            event = {
                "type": "input_image_buffer.append",
                "image": image_b64
            }
            await self.send_event(event)
    
        async def create_response(self) -> None:
            """向 API 请求生成回复(仅在手动模式下需要调用)。"""
            event = {
                "type": "response.create",
                "response": {
                    "instructions": "You are a helpful assistant.",
                    "modalities": ["text", "audio"]
                }
            }
            print("create response: ", event)
            await self.send_event(event)
    
        async def cancel_response(self) -> None:
            """取消当前回复。"""
            event = {
                "type": "response.cancel"
            }
            await self.send_event(event)
    
        async def handle_interruption(self):
            """处理用户对当前回复的打断。"""
            if not self._is_responding:
                return
    
            print(" Handling interruption")
    
            # 1. Cancel the current response
            if self._current_response_id:
                await self.cancel_response()
    
            self._is_responding = False
            self._current_response_id = None
            self._current_item_id = None
    
        async def handle_messages(self) -> None:
            try:
                async for message in self.ws:
                    event = json.loads(message)
                    event_type = event.get("type")
    
                    if event_type != "response.audio.delta":
                        print(" event: ", event)
                    else:
                        print(" event_type: ", event_type)
    
                    if event_type == "error":
                        print(" Error: ", event['error'])
                        continue
                    elif event_type == "response.created":
                        self._current_response_id = event.get("response", {}).get("id")
                        self._is_responding = True
                    elif event_type == "response.output_item.added":
                        self._current_item_id = event.get("item", {}).get("id")
                    elif event_type == "response.done":
                        self._is_responding = False
                        self._current_response_id = None
                        self._current_item_id = None
                    # Handle interruptions
                    elif event_type == "input_audio_buffer.speech_started":
                        print(" Speech detected")
                        if self._is_responding:
                            print(" Handling interruption")
                            await self.handle_interruption()
    
                        if self.on_interrupt:
                            print(" Handling on_interrupt, stop playback")
                            self.on_interrupt()
                    elif event_type == "input_audio_buffer.speech_stopped":
                        print(" Speech ended")
                    # Handle normal response events
                    elif event_type == "response.text.delta":
                        if self.on_text_delta:
                            self.on_text_delta(event["delta"])
                    elif event_type == "response.audio.delta":
                        if self.on_audio_delta:
                            audio_bytes = base64.b64decode(event["delta"])
                            self.on_audio_delta(audio_bytes)
                    elif event_type == "conversation.item.input_audio_transcription.completed":
                        transcript = event.get("transcript", "")
                        if self.on_input_transcript:
                            await asyncio.to_thread(self.on_input_transcript, transcript)
                            self._print_input_transcript = True
                    elif event_type == "response.audio_transcript.delta":
                        if self.on_output_transcript:
                            delta = event.get("delta", "")
                            if not self._print_input_transcript:
                                self._output_transcript_buffer += delta
                            else:
                                if self._output_transcript_buffer:
                                    await asyncio.to_thread(self.on_output_transcript, self._output_transcript_buffer)
                                    self._output_transcript_buffer = ""
                                await asyncio.to_thread(self.on_output_transcript, delta)
                    elif event_type == "response.audio_transcript.done":
                        self._print_input_transcript = False
                    elif event_type in self.extra_event_handlers:
                        self.extra_event_handlers[event_type](event)
    
            except websockets.exceptions.ConnectionClosed:
                print(" Connection closed")
            except Exception as e:
                print(" Error in message handling: ", str(e))
    
        async def close(self) -> None:
            """关闭 WebSocket 连接。"""
            if self.ws:
                await self.ws.close()
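    The append_image docstring above recommends sending roughly 2 frames per second. As a hypothetical companion to this client (assuming opencv-python is installed; the helper name and parameters are illustrative, not part of the API), webcam frames could be captured and forwarded like this:

    # Hypothetical helper, not part of the client above: capture JPEG frames from a
    # webcam with OpenCV and forward them via append_image at roughly 2 frames/second.
    # Assumes `pip install opencv-python`; JPEG quality 80 helps stay under 500 KB.
    import asyncio
    import cv2

    async def stream_camera(client, device_index: int = 0, fps: float = 2.0):
        cap = cv2.VideoCapture(device_index)
        try:
            while cap.isOpened():
                ok, frame = cap.read()
                if not ok:
                    break
                ok, jpeg = cv2.imencode(".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, 80])
                if ok:
                    await client.append_image(jpeg.tobytes())
                await asyncio.sleep(1.0 / fps)  # throttle to ~2 fps
        finally:
            cap.release()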
  • Choose an interaction mode

    • VAD mode (Voice Activity Detection: automatic detection of speech start and stop)

      The Realtime API automatically detects when the user starts and stops speaking and responds accordingly.

    • Manual mode (push to talk, release to send)

      The client controls when speech starts and stops. After the user finishes speaking, the client must explicitly send the message to the server.

    VAD mode

    In the same directory as omni_realtime_client.py, create another Python file named vad_mode.py and copy the following code into it:

    vad_mode.py

    # -*- coding: utf-8 -*-
    import os, asyncio
    from omni_realtime_client import OmniRealtimeClient, TurnDetectionMode
    import pyaudio
    import queue
    import threading
    
    class AudioPlayer:
        """简单音频播放器,封装播放线程与中断控制。"""
        def __init__(self, pyaudio_instance: pyaudio.PyAudio, rate: int = 16000, channels: int = 1, chunk: int = 3200):
            self.pyaudio_instance = pyaudio_instance
            self.rate = rate
            self.channels = channels
            self.chunk = chunk
            self.format = pyaudio.paInt16
            self.audio_queue = queue.Queue()
            self._thread = None
            self._stop_evt = threading.Event()
            self._interrupt_evt = threading.Event()
            self._stream = None
    
        def start(self):
            if self._thread and self._thread.is_alive():
                return
            self._stream = self.pyaudio_instance.open(
                format=self.format,
                channels=self.channels,
                rate=self.rate,
                output=True,
                frames_per_buffer=self.chunk,
            )
            self._stop_evt.clear()
            self._interrupt_evt.clear()
            self._thread = threading.Thread(target=self._run, daemon=True)
            self._thread.start()
    
        def stop(self):
            self._stop_evt.set()
            self.audio_queue.put(None)
            if self._thread:
                self._thread.join(timeout=1)
            if self._stream:
                try:
                    self._stream.stop_stream()
                finally:
                    self._stream.close()
                self._stream = None
    
        def handle_interrupt(self):
            self._interrupt_evt.set()
            # Clear the queue
            with self.audio_queue.mutex:
                self.audio_queue.queue.clear()
    
        def add_audio(self, audio_data: bytes):
            if audio_data:
                self.audio_queue.put(audio_data)
    
        def _run(self):
            try:
                while not self._stop_evt.is_set():
                    try:
                        if self._interrupt_evt.is_set():
                            self._interrupt_evt.clear()
                            continue
                        audio_data = self.audio_queue.get(timeout=0.5)
                        if audio_data is None:
                            break
                        if self._interrupt_evt.is_set():
                            self._interrupt_evt.clear()
                            self.audio_queue.task_done()
                            continue
                        self._stream.write(audio_data)
                        self.audio_queue.task_done()
                    except queue.Empty:
                        continue
            finally:
                try:
                    if self._stream:
                        self._stream.stop_stream()
                        self._stream.close()
                except Exception:
                    pass
    
    
    async def start_microphone_streaming(client: OmniRealtimeClient, p: pyaudio.PyAudio, rate: int = 16000, chunk: int = 3200, channels: int = 1):
        FORMAT = pyaudio.paInt16
        stream = p.open(
            format=FORMAT, channels=channels, rate=rate, input=True, frames_per_buffer=chunk
        )
    
        try:
            print("开始录音,请讲话...")
            while True:
                audio_data = stream.read(chunk)
                await client.stream_audio(audio_data)
    
                await asyncio.sleep(0.02)
        finally:
            stream.stop_stream()
            stream.close()
            # The microphone stream has ended; the main routine terminates the global PyAudio instance
    
    
    async def main():
        # Initialize the global PyAudio instance and start the player
        p = pyaudio.PyAudio()
        player = AudioPlayer(pyaudio_instance=p, rate=24000, channels=1, chunk=3200)
        player.start()
    
        realtime_client = OmniRealtimeClient(
            # The base_url below is for the Beijing region; for the Singapore region, use: wss://dashscope-intl.aliyuncs.com/api-ws/v1/realtime
            base_url="wss://dashscope.aliyuncs.com/api-ws/v1/realtime",
            api_key=os.environ.get("DASHSCOPE_API_KEY"),
            model="qwen3-omni-flash-realtime",
            voice="Cherry",
            on_text_delta=lambda text: print(f"\nAssistant: {text}", end="", flush=True),
            on_audio_delta=player.add_audio,
            on_interrupt=player.handle_interrupt,  # interrupt callback: stop playback when the user barges in
            turn_detection_mode=TurnDetectionMode.SERVER_VAD,
            # Set the model's persona
            instructions="You are the personal assistant Xiaoyun. Answer users' questions accurately and warmly, always in a helpful tone."
        )
    
        try:
            await realtime_client.connect()
            # Start message handling and microphone streaming
            message_handler = asyncio.create_task(realtime_client.handle_messages())
            streaming_task = asyncio.create_task(
                start_microphone_streaming(realtime_client, p=p, rate=16000, chunk=3200, channels=1)
            )
    
            # Wait forever on a bare Future (cancelled on Ctrl+C)
            await asyncio.Future()
        except Exception as e:
            print(f"Error: {e}")
        finally:
            # Stop audio playback
            player.stop()
            await realtime_client.close()
            p.terminate()
    
    
    if __name__ == "__main__":
        asyncio.run(main())

    Run vad_mode.py to talk with the Realtime model in real time through your microphone. The server detects where your speech begins and ends and sends it automatically, so you do not need to commit the audio manually.

    Manual mode

    In the same directory as omni_realtime_client.py, create another Python file named manual_mode.py and copy the following code into it:

    manual_mode.py

    # -*- coding: utf-8 -*-
    import os
    import asyncio
    import time
    import threading
    import queue
    import pyaudio
    from omni_realtime_client import OmniRealtimeClient, TurnDetectionMode
    
    class AudioPlayer:
        """实时音频播放器类"""
        def __init__(self, sample_rate=24000, channels=1, sample_width=2):
            self.sample_rate = sample_rate
            self.channels = channels
            self.sample_width = sample_width  # 2 bytes for 16-bit
            self.audio_queue = queue.Queue()
            self.is_playing = False
            self.play_thread = None
            self.pyaudio_instance = None
            self.stream = None
            self._lock = threading.Lock()  # Lock to synchronize access across threads
            self._last_data_time = time.time()  # Time the last audio chunk was received
            self._response_done = False  # Set when the server marks the response as done
            self._waiting_for_response = False  # Whether we are still waiting for a server response
            # Track the last stream-write time and the duration of the most recent chunk,
            # to judge end-of-playback more precisely
            self._last_play_time = time.time()
            self._last_chunk_duration = 0.0
            
        def start(self):
            """启动音频播放器"""
            with self._lock:
                if self.is_playing:
                    return
                    
                self.is_playing = True
                
                try:
                    self.pyaudio_instance = pyaudio.PyAudio()
                    
                    # 创建音频输出流
                    self.stream = self.pyaudio_instance.open(
                        format=pyaudio.paInt16,  # 16-bit
                        channels=self.channels,
                        rate=self.sample_rate,
                        output=True,
                        frames_per_buffer=1024
                    )
                    
                    # 启动播放线程
                    self.play_thread = threading.Thread(target=self._play_audio)
                    self.play_thread.daemon = True
                    self.play_thread.start()
                    
                    print("音频播放器已启动")
                except Exception as e:
                    print(f"启动音频播放器失败: {e}")
                    self._cleanup_resources()
                    raise
        
        def stop(self):
            """停止音频播放器"""
            with self._lock:
                if not self.is_playing:
                    return
                    
                self.is_playing = False
            
            # 清空队列
            while not self.audio_queue.empty():
                try:
                    self.audio_queue.get_nowait()
                except queue.Empty:
                    break
            
            # 等待播放线程结束(在锁外面等待,避免死锁)
            if self.play_thread and self.play_thread.is_alive():
                self.play_thread.join(timeout=2.0)
            
            # 再次获取锁来清理资源
            with self._lock:
                self._cleanup_resources()
            
            print("音频播放器已停止")
        
        def _cleanup_resources(self):
            """清理音频资源(必须在锁内调用)"""
            try:
                # 关闭音频流
                if self.stream:
                    if not self.stream.is_stopped():
                        self.stream.stop_stream()
                    self.stream.close()
                    self.stream = None
            except Exception as e:
                print(f"关闭音频流时出错: {e}")
            
            try:
                if self.pyaudio_instance:
                    self.pyaudio_instance.terminate()
                    self.pyaudio_instance = None
            except Exception as e:
                print(f"终止PyAudio时出错: {e}")
        
        def add_audio_data(self, audio_data):
            """添加音频数据到播放队列"""
            if self.is_playing and audio_data:
                self.audio_queue.put(audio_data)
                with self._lock:
                    self._last_data_time = time.time()  # 更新最后接收数据的时间
                    self._waiting_for_response = False # 收到数据,不再等待
        
        def stop_receiving_data(self):
            """标记不再接收新的音频数据"""
            with self._lock:
                self._response_done = True
                self._waiting_for_response = False # 响应结束,不再等待
        
        def prepare_for_next_turn(self):
            """为下一轮对话重置播放器状态。"""
            with self._lock:
                self._response_done = False
                self._last_data_time = time.time()
                self._last_play_time = time.time()
                self._last_chunk_duration = 0.0
                self._waiting_for_response = True # 开始等待下一轮响应
            
            # 清空上一轮可能残留的音频数据
            while not self.audio_queue.empty():
                try:
                    self.audio_queue.get_nowait()
                except queue.Empty:
                    break
    
        def is_finished_playing(self):
            """检查是否已经播放完所有音频数据"""
            with self._lock:
                queue_size = self.audio_queue.qsize()
                time_since_last_data = time.time() - self._last_data_time
                time_since_last_play = time.time() - self._last_play_time
                
                # ---------------------- 智能结束判定 ----------------------
                # 1. 首选:如果服务器已标记完成且播放队列为空
                #    进一步等待最近一块音频播放完毕(音频块时长 + 0.1s 容错)。
                if self._response_done and queue_size == 0:
                    min_wait = max(self._last_chunk_duration + 0.1, 0.5)  # 至少等待 0.5s
                    if time_since_last_play >= min_wait:
                        return True
    
                # 2. 备用:如果长时间没有新数据且播放队列为空
                #    当服务器没有明确发出 `response.done` 时,此逻辑作为保障
                if not self._waiting_for_response and queue_size == 0 and time_since_last_data > 1.0:
                    print("\n(超时未收到新音频,判定播放结束)")
                    return True
                
                return False
        
        def _play_audio(self):
            """播放音频数据的工作线程"""
            while True:
                # 检查是否应该停止
                with self._lock:
                    if not self.is_playing:
                        break
                    stream_ref = self.stream  # 获取流的引用
                
                try:
                    # 从队列中获取音频数据,超时0.1秒
                    audio_data = self.audio_queue.get(timeout=0.1)
                    
                    # 再次检查状态和流的有效性
                    with self._lock:
                        if self.is_playing and stream_ref and not stream_ref.is_stopped():
                            try:
                                # 播放音频数据
                                stream_ref.write(audio_data)
                                # 更新最近播放信息
                                self._last_play_time = time.time()
                                self._last_chunk_duration = len(audio_data) / (self.channels * self.sample_width) / self.sample_rate
                            except Exception as e:
                                print(f"写入音频流时出错: {e}")
                                break
                        
                    # 标记该数据块已处理完成
                    self.audio_queue.task_done()
    
                except queue.Empty:
                    # 队列为空时继续等待
                    continue
                except Exception as e:
                    print(f"播放音频时出错: {e}")
                    break
    
    class MicrophoneRecorder:
        """实时麦克风录音器"""
        def __init__(self, sample_rate=16000, channels=1, chunk_size=3200):
            self.sample_rate = sample_rate
            self.channels = channels
            self.chunk_size = chunk_size
            self.pyaudio_instance = None
            self.stream = None
            self.frames = []
            self._is_recording = False
            self._record_thread = None
    
        def _recording_thread(self):
            """录音工作线程"""
            # 在 _is_recording 为 True 期间,持续从音频流中读取数据
            while self._is_recording:
                try:
                    # 使用 exception_on_overflow=False 避免因缓冲区溢出而崩溃
                    data = self.stream.read(self.chunk_size, exception_on_overflow=False)
                    self.frames.append(data)
                except (IOError, OSError) as e:
                    # 当流被关闭时,读取操作可能会引发错误
                    print(f"录音流读取错误,可能已关闭: {e}")
                    break
    
        def start(self):
            """开始录音"""
            if self._is_recording:
                print("录音已在进行中。")
                return
            
            self.frames = []
            self._is_recording = True
            
            try:
                self.pyaudio_instance = pyaudio.PyAudio()
                self.stream = self.pyaudio_instance.open(
                    format=pyaudio.paInt16,
                    channels=self.channels,
                    rate=self.sample_rate,
                    input=True,
                    frames_per_buffer=self.chunk_size
                )
                
                self._record_thread = threading.Thread(target=self._recording_thread)
                self._record_thread.daemon = True
                self._record_thread.start()
                print("麦克风录音已开始...")
            except Exception as e:
                print(f"启动麦克风失败: {e}")
                self._is_recording = False
                self._cleanup()
                raise
    
        def stop(self):
            """停止录音并返回音频数据"""
            if not self._is_recording:
                return None
                
            self._is_recording = False
            
            # 等待录音线程安全退出
            if self._record_thread:
                self._record_thread.join(timeout=1.0)
            
            self._cleanup()
            
            print("麦克风录音已停止。")
            return b''.join(self.frames)
    
        def _cleanup(self):
            """安全地清理 PyAudio 资源"""
            if self.stream:
                try:
                    if self.stream.is_active():
                        self.stream.stop_stream()
                    self.stream.close()
                except Exception as e:
                    print(f"关闭音频流时出错: {e}")
            
            if self.pyaudio_instance:
                try:
                    self.pyaudio_instance.terminate()
                except Exception as e:
                    print(f"终止 PyAudio 实例时出错: {e}")
    
            self.stream = None
            self.pyaudio_instance = None
    
    async def interactive_test():
        """
        交互式测试脚本:允许多轮连续对话,每轮可以发送音频和图片。
        """
        # ------------------- 1. 初始化和连接 (一次性) -------------------
        api_key = os.environ.get("DASHSCOPE_API_KEY")
        if not api_key:
            print("请设置DASHSCOPE_API_KEY环境变量")
            return
    
        print("--- 实时多轮音视频对话客户端 ---")
        print("正在初始化音频播放器和客户端...")
        
        audio_player = AudioPlayer()
        audio_player.start()
    
        def on_audio_received(audio_data):
            audio_player.add_audio_data(audio_data)
    
        def on_response_done(event):
            print("\n(收到响应结束标记)")
            audio_player.stop_receiving_data()
    
        realtime_client = OmniRealtimeClient(
            base_url="wss://dashscope.aliyuncs.com/api-ws/v1/realtime",
            api_key=api_key,
            model="qwen3-omni-flash-realtime",
            voice="Ethan",
            instructions="你是个人助理小云,请你准确且友好地解答用户的问题,始终以乐于助人的态度回应。", # 设定模型角色
            on_text_delta=lambda text: print(f"助手回复: {text}", end="", flush=True),
            on_audio_delta=on_audio_received,
            turn_detection_mode=TurnDetectionMode.MANUAL,
            extra_event_handlers={"response.done": on_response_done}
        )
    
        message_handler_task = None
        try:
            await realtime_client.connect()
            print("已连接到服务器。输入 'q' 或 'quit' 可随时退出程序。")
            message_handler_task = asyncio.create_task(realtime_client.handle_messages())
            await asyncio.sleep(0.5)
    
            turn_counter = 1
            # ------------------- 2. Multi-turn conversation loop -------------------
            while True:
                print(f"\n--- Turn {turn_counter} ---")
                audio_player.prepare_for_next_turn()
    
                recorded_audio = None
                image_paths = []
                
                # --- Get user input: record from the microphone ---
                loop = asyncio.get_event_loop()
                recorder = MicrophoneRecorder(sample_rate=16000)  # a 16 kHz sample rate is recommended for speech recognition

                print("Ready to record. Press Enter to start recording (or type 'q' to quit)...")
                user_input = await loop.run_in_executor(None, input)
                if user_input.strip().lower() in ['q', 'quit']:
                    print("用户请求退出...")
                    return
                
                try:
                    recorder.start()
                except Exception:
                    print("无法启动录音,请检查您的麦克风权限和设备。跳过本轮。")
                    continue
                
                print("录音中... 再次按 Enter 键停止录音。")
                await loop.run_in_executor(None, input)
    
                recorded_audio = recorder.stop()
    
                if not recorded_audio or len(recorded_audio) == 0:
                    print("未录制到有效音频,请重新开始本轮对话。")
                    continue
    
                # --- Get image input (optional) ---
                # The image-input feature below is commented out and disabled for now. Uncomment it to enable.
                # print("\nEnter the absolute path of an [image file], one per line (optional). When done, type 's' or press Enter to send the request.")
                # while True:
                #     path = input("Image path: ").strip()
                #     if path.lower() == 's' or path == '':
                #         break
                #     if path.lower() in ['q', 'quit']:
                #         print("User requested exit...")
                #         return
                #
                #     if not os.path.isabs(path):
                #         print("Error: please enter an absolute path.")
                #         continue
                #     if not os.path.exists(path):
                #         print(f"Error: file does not exist -> {path}")
                #         continue
                #     image_paths.append(path)
                #     print(f"Added image: {os.path.basename(path)}")

                # --- 3. Send the data and get the response ---
                print("\n--- Input confirmation ---")
                print(f"Pending audio: 1 (from microphone); images: {len(image_paths)}")
                print("------------------")

                # 3.1 Send the recorded audio
                try:
                    print(f"Sending microphone recording ({len(recorded_audio)} bytes)")
                    await realtime_client.stream_audio(recorded_audio)
                    await asyncio.sleep(0.1)
                except Exception as e:
                    print(f"发送麦克风录音失败: {e}")
                    continue
    
                # 3.2 Send all image files
                # The image-sending code below is commented out and disabled for now.
                # for i, path in enumerate(image_paths):
                #     try:
                #         with open(path, "rb") as f:
                #             data = f.read()
                #         print(f"Sending image {i+1}: {os.path.basename(path)} ({len(data)} bytes)")
                #         await realtime_client.append_image(data)
                #         await asyncio.sleep(0.1)
                #     except Exception as e:
                #         print(f"Failed to send image {os.path.basename(path)}: {e}")

                # 3.3 Commit the input and wait for the response
                print("Committing all input and requesting a server response...")
                await realtime_client.commit_audio_buffer()
                await realtime_client.create_response()
    
                print("等待并播放服务器响应音频...")
                start_time = time.time()
                max_wait_time = 60
                while not audio_player.is_finished_playing():
                    if time.time() - start_time > max_wait_time:
                        print(f"\n等待超时 ({max_wait_time}秒), 进入下一轮。")
                        break
                    await asyncio.sleep(0.2)
                
                print("\n本轮音频播放完成!")
                turn_counter += 1
    
        except (asyncio.CancelledError, KeyboardInterrupt):
            print("\n程序被中断。")
        except Exception as e:
            print(f"发生未处理的错误: {e}")
        finally:
            # ------------------- 4. Clean up resources -------------------
            print("\nClosing the connection and cleaning up resources...")
            if message_handler_task and not message_handler_task.done():
                message_handler_task.cancel()

            # Close the WebSocket connection if one was established
            if 'realtime_client' in locals() and realtime_client.ws:
                await realtime_client.close()
                print("Connection closed.")
    
            audio_player.stop()
            print("程序退出。")
    
    if __name__ == "__main__":
        try:
            asyncio.run(interactive_test())
        except KeyboardInterrupt:
            print("\n程序被用户强制退出。") 

    Run manual_mode.py, press Enter to start speaking, and press Enter again to stop recording and receive the model's audio response.

Interaction flow

VAD mode

Set session.turn_detection in the session.update event to "server_vad" to enable VAD mode. In this mode, the server automatically detects the start and end of speech and responds on its own, which suits voice-call scenarios.
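
Over a raw WebSocket connection, enabling VAD mode is a single session.update event. The sketch below assumes the ws object from the connection example; apart from turn_detection, the session fields shown (modalities, voice, audio formats) are illustrative and may vary by model:

import json

session_update = {
    "type": "session.update",
    "session": {
        "modalities": ["text", "audio"],
        "voice": "Cherry",
        "input_audio_format": "pcm16",
        "output_audio_format": "pcm16",
        "turn_detection": {"type": "server_vad"},  # enable server-side VAD
    },
}
ws.send(json.dumps(session_update))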

The interaction flow is as follows:

  1. The server detects the start of speech and sends an input_audio_buffer.speech_started event.

  2. The client may send input_audio_buffer.append and input_image_buffer.append events at any time to append audio and images to the buffer (see the sketch after this list).

    Before sending an input_image_buffer.append event, at least one input_audio_buffer.append event must have been sent.
  3. The server detects the end of speech and sends an input_audio_buffer.speech_stopped event.

  4. The server sends an input_audio_buffer.committed event committing the audio buffer.

  5. The server sends a conversation.item.created event containing the user message item created from the buffer.
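
The following sketch shows step 2 over a raw WebSocket connection (ws as in the connection example). The event types match the flow above; the names of the base64 payload fields ("audio", "image") are assumptions to verify against the API reference:

import base64
import json

def append_audio(ws, pcm_bytes: bytes):
    # One chunk of 16 kHz, 16-bit mono PCM, base64-encoded
    ws.send(json.dumps({
        "type": "input_audio_buffer.append",
        "audio": base64.b64encode(pcm_bytes).decode("ascii"),
    }))

def append_image(ws, jpeg_bytes: bytes):
    # Must be preceded by at least one input_audio_buffer.append event
    ws.send(json.dumps({
        "type": "input_image_buffer.append",
        "image": base64.b64encode(jpeg_bytes).decode("ascii"),
    }))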

Lifecycle

| Stage | Event | Sent by | Description |
| --- | --- | --- | --- |
| Session initialization | session.update | Client | Session configuration |
| Session initialization | session.created | Server | Session created |
| Session initialization | session.updated | Server | Session configuration updated |
| User audio input | input_audio_buffer.append | Client | Append audio to the buffer |
| User audio input | input_image_buffer.append | Client | Append an image to the buffer |
| User audio input | input_audio_buffer.speech_started | Server | Speech start detected |
| User audio input | input_audio_buffer.speech_stopped | Server | Speech end detected |
| User audio input | input_audio_buffer.committed | Server | Server received the committed audio |
| Server audio output | response.created | Server | Server starts generating a response |
| Server audio output | response.output_item.added | Server | A new output item was added to the response |
| Server audio output | conversation.item.created | Server | A conversation item was created |
| Server audio output | response.content_part.added | Server | A new content part was added to the assistant message |
| Server audio output | response.audio_transcript.delta | Server | Incrementally generated transcript text |
| Server audio output | response.audio.delta | Server | Incrementally generated model audio |
| Server audio output | response.audio_transcript.done | Server | Transcript generation complete |
| Server audio output | response.audio.done | Server | Audio generation complete |
| Server audio output | response.content_part.done | Server | Streaming of the assistant message's text or audio content is complete |
| Server audio output | response.output_item.done | Server | Streaming of the assistant message's entire output item is complete |
| Server audio output | response.done | Server | Response complete |
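
To make the server-output events concrete, here is a minimal sketch of a message handler that feeds response.audio.delta payloads to a player such as the AudioPlayer class above; the "delta" field name for the base64 audio payload is an assumption to verify against the API reference:

import base64
import json

def on_message(ws, message):
    event = json.loads(message)
    etype = event.get("type")
    if etype == "response.audio.delta":
        # Assumed: base64-encoded 24 kHz, 16-bit mono PCM in "delta"
        player.add_audio(base64.b64decode(event["delta"]))
    elif etype == "response.done":
        print("response finished")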

Manual mode

Set session.turn_detection in the session.update event to null to enable Manual mode. In this mode, the client explicitly sends input_audio_buffer.commit and response.create events to request a server response, which suits push-to-talk scenarios such as sending voice messages in a chat app.
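
A minimal sketch of disabling server-side VAD over a raw WebSocket connection (ws as in the connection example); other session fields are omitted for brevity:

import json

ws.send(json.dumps({
    "type": "session.update",
    "session": {"turn_detection": None},  # serialized as JSON null
}))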

The interaction flow is as follows:

  1. The client may send input_audio_buffer.append and input_image_buffer.append events at any time to append audio and images to the buffer.

    Before sending an input_image_buffer.append event, at least one input_audio_buffer.append event must have been sent.
  2. The client sends an input_audio_buffer.commit event to commit the audio and image buffers, telling the server that all of this turn's user input (audio and images) has been sent (see the sketch after this list).

  3. The server responds with an input_audio_buffer.committed event.

  4. The client sends a response.create event and waits for the server to return the model's output.

  5. The server responds with a conversation.item.created event.
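
A minimal sketch of steps 2 and 4: after the last input_audio_buffer.append of the turn, the client commits the buffers and explicitly requests a response (ws as in the connection example):

import json

# Step 2: commit the buffered audio and images for this turn
ws.send(json.dumps({"type": "input_audio_buffer.commit"}))
# Step 4: ask the server to generate the model's response
ws.send(json.dumps({"type": "response.create"}))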

Lifecycle

| Stage | Event | Sent by | Description |
| --- | --- | --- | --- |
| Session initialization | session.update | Client | Session configuration |
| Session initialization | session.created | Server | Session created |
| Session initialization | session.updated | Server | Session configuration updated |
| User audio input | input_audio_buffer.append | Client | Append audio to the buffer |
| User audio input | input_image_buffer.append | Client | Append an image to the buffer |
| User audio input | input_audio_buffer.commit | Client | Commit the audio and images to the server |
| User audio input | response.create | Client | Create a model response |
| User audio input | input_audio_buffer.committed | Server | Server received the committed audio |
| Server audio output | input_audio_buffer.clear | Client | Clear the audio in the buffer |
| Server audio output | response.created | Server | Server starts generating a response |
| Server audio output | response.output_item.added | Server | A new output item was added to the response |
| Server audio output | conversation.item.created | Server | A conversation item was created |
| Server audio output | response.content_part.added | Server | A new content part was added to the assistant message |
| Server audio output | response.audio_transcript.delta | Server | Incrementally generated transcript text |
| Server audio output | response.audio.delta | Server | Incrementally generated model audio |
| Server audio output | response.audio_transcript.done | Server | Transcript generation complete |
| Server audio output | response.audio.done | Server | Audio generation complete |
| Server audio output | response.content_part.done | Server | Streaming of the assistant message's text or audio content is complete |
| Server audio output | response.output_item.done | Server | Streaming of the assistant message's entire output item is complete |
| Server audio output | response.done | Server | Response complete |

API reference

Billing and rate limits

Billing rules

Qwen-Omni-Realtime models are billed by the number of tokens in each modality (audio, image). For billing details, see the model list.

How audio and images convert to tokens

Audio

  • Qwen3-Omni-Flash-Realtime: total tokens = audio duration (in seconds) * 12.5

  • Qwen-Omni-Turbo-Realtime: total tokens = audio duration (in seconds) * 25

Audio shorter than 1 second is billed as 1 second.
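
A worked example of the audio rule (whether fractional token counts are rounded or truncated is an assumption here):

def audio_tokens(duration_s: float, tokens_per_second: float = 12.5) -> int:
    # Audio shorter than 1 second is billed as 1 second
    return round(max(duration_s, 1.0) * tokens_per_second)

print(audio_tokens(8.0))        # Qwen3-Omni-Flash-Realtime: 8 s -> 100 tokens
print(audio_tokens(0.4))        # under 1 s, billed as 1 s
print(audio_tokens(8.0, 25.0))  # Qwen-Omni-Turbo-Realtime: 8 s -> 200 tokens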

Images

Each 28x28-pixel block corresponds to 1 token; a single image consumes at least 4 tokens and at most 1280 tokens. Run the following code to estimate the token count of an input image.

import math
# Install the Pillow library with: pip install Pillow
from PIL import Image

def token_calculate(image_path):
    # Open the specified image file
    image = Image.open(image_path)
    # Get the image's original dimensions
    height = image.height
    width = image.width
    # Round the height to the nearest multiple of 28
    h_bar = round(height / 28) * 28
    # Round the width to the nearest multiple of 28
    w_bar = round(width / 28) * 28
    # Lower bound on image tokens: 4 tokens
    min_pixels = 28 * 28 * 4
    # Upper bound on image tokens: 1280 tokens
    max_pixels = 1280 * 28 * 28
    # Scale the image so that its total pixel count falls within [min_pixels, max_pixels]
    if h_bar * w_bar > max_pixels:
        # Compute a scale factor beta so the scaled image has at most max_pixels pixels
        beta = math.sqrt((height * width) / max_pixels)
        # Recompute the height, keeping it a multiple of 28
        h_bar = math.floor(height / beta / 28) * 28
        # Recompute the width, keeping it a multiple of 28
        w_bar = math.floor(width / beta / 28) * 28
    elif h_bar * w_bar < min_pixels:
        # Compute a scale factor beta so the scaled image has at least min_pixels pixels
        beta = math.sqrt(min_pixels / (height * width))
        # Recompute the height, keeping it a multiple of 28
        h_bar = math.ceil(height * beta / 28) * 28
        # Recompute the width, keeping it a multiple of 28
        w_bar = math.ceil(width * beta / 28) * 28
    print(f"Scaled image size: height {h_bar}, width {w_bar}")
    # Token count: total pixels divided by 28 * 28
    token = int((h_bar * w_bar) / (28 * 28))
    # The system automatically adds <|vision_bos|> and <|vision_eos|> vision markers (1 token each)
    total_token = token + 2
    print(f"Image token count: {total_token}")
    return total_token
if __name__ == "__main__":
    total_token = token_calculate(image_path="test.jpeg")

Rate limits

For the model rate-limit rules, see Rate limits.

FAQ

Q1: How can I try the Qwen-Omni-Realtime model online?

A: You can deploy a demo with one click:

  1. Open the Function Compute template, choose Direct Deployment as the deployment type, enter your API Key in the Model Studio API-KEY field, and click Create and Deploy Default Environment.

  2. Wait about one minute, then find the access domain under Environment Details > Environment Information. Change the domain's http to https (example: https://omni-realtime.fcv3.xxxx.cn-hangzhou.fc.devsapp.net). The resulting HTTPS link opens a web app through which you can hold real-time video or voice calls with the model.

Important

This link uses a self-signed certificate and is intended for temporary testing only. On first access, the browser shows a security warning; this is expected. Do not use it in production. To proceed, follow the browser prompts (for example, click "Advanced" → "Proceed (unsafe)").

You can view the project source code under Resource Information - Function Resources.
Both Function Compute and Alibaba Cloud Model Studio provide free quotas for new users, which cover the cost of simple debugging; once the quota is exhausted, usage is billed pay-as-you-go. Charges are incurred only when the app is actually accessed.

Q2: How do I send images to the model?

A: Send input_image_buffer.append events from the client.

For video-call scenarios, you can sample frames from the video and send input_image_buffer.append events at no more than two frames per second, as sketched below. For DashScope SDK code, see the Omni-Realtime sample code.
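
A minimal sketch of this throttling, reusing the append_image helper sketched in the VAD-mode section; capture_frame() is a hypothetical stand-in for your video source:

import time

MIN_INTERVAL = 0.5  # seconds between frames, i.e. at most 2 fps
last_sent = 0.0

while True:
    frame_jpeg = capture_frame()  # hypothetical: returns one JPEG-encoded frame as bytes
    now = time.time()
    if now - last_sent >= MIN_INTERVAL:
        append_image(ws, frame_jpeg)  # helper from the VAD-mode sketch above
        last_sent = now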

Error codes

If a model call fails and returns an error message, see Error messages for solutions.

Voice list

Qwen3-Omni-Flash-Realtime

| Voice name | voice parameter | Description | Supported languages |
| --- | --- | --- | --- |
| 芊悦 | Cherry | A sunny, upbeat, warm and natural young female voice. | Chinese, English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 晨煦 | Ethan | Standard Mandarin with a slight northern accent; sunny, warm, energetic, and vibrant. | Chinese, English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 不吃鱼 | Nofish | A designer who cannot pronounce retroflex sounds. | Chinese, English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 詹妮弗 | Jennifer | A polished, cinematic American English female voice. | Chinese, English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 甜茶 | Ryan | Full of rhythm and dramatic flair, weaving realism together with tension. | Chinese, English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 卡捷琳娜 | Katerina | A mature, commanding female voice with a rich, lingering cadence. | Chinese, English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 墨讲师 | Elias | Keeps academic rigor while using storytelling to turn complex knowledge into digestible modules. | Chinese, English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 上海-阿珍 | Jada | A brisk, no-nonsense Shanghai lady. | Chinese (Shanghainese), English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 北京-晓东 | Dylan | A young man who grew up in Beijing's hutongs. | Chinese (Beijing dialect), English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 四川-晴儿 | Sunny | A Sichuan girl sweet enough to melt your heart. | Chinese (Sichuanese), English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 南京-老李 | Li | A patient yoga teacher. | Chinese (Nanjing dialect), English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 陕西-秦川 | Marcus | Plain-spoken and steady, with an honest heart and a deep voice: the flavor of old Shaanxi. | Chinese (Shaanxi dialect), English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 闽南-阿杰 | Roy | A humorous, straightforward, street-smart Taiwanese guy. | Chinese (Hokkien), English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 天津-李彼得 | Peter | Tianjin crosstalk style; a professional straight man. | Chinese (Tianjin dialect), English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 粤语-阿强 | Rocky | Humorous and witty A-Qiang, always up for a chat. | Chinese (Cantonese), English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 粤语-阿清 | Kiki | A sweet Hong Kong bestie. | Chinese (Cantonese), English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |
| 四川-程川 | Eric | A lively, street-smart man from Chengdu, Sichuan. | Chinese (Sichuanese), English, French, German, Russian, Italian, Spanish, Portuguese, Japanese, Korean |

Qwen-Omni-Turbo-Realtime

| Voice name | voice parameter | Description | Supported languages |
| --- | --- | --- | --- |
| 芊悦 | Cherry | A sunny, upbeat, warm and natural young female voice. | Chinese, English |
| 苏瑶 | Serena | A gentle, soft-spoken young female voice. | Chinese, English |
| 晨煦 | Ethan | Standard Mandarin with a slight northern accent; sunny, warm, energetic, and vibrant. | Chinese, English |
| 千雪 | Chelsie | An anime-style virtual girlfriend. | Chinese, English |