Realtime Multimodal (Qwen-Omni-Realtime)

Last updated: 2025-08-14 19:52:15

The Qwen-Omni realtime API provides low-latency multimodal interaction: it accepts streaming audio input and streams back text and audio output.

For an online demo, see "How do I try the Qwen-Omni-Realtime model online?"

Compared with the Qwen-Omni model, the Qwen-Omni realtime model adds:

  • Streaming audio input: the Qwen-Omni model only accepts audio files as input, whereas the Qwen-Omni realtime model can ingest a live audio stream;

  • Voice activity detection: the Qwen-Omni realtime model has built-in VAD (Voice Activity Detection) and automatically detects when the user starts and stops speaking.

Supported voices: Chelsie, Serena, Ethan, and Cherry.

Supported models

  • qwen-omni-turbo-realtime (stable version; capabilities currently identical to qwen-omni-turbo-realtime-2025-05-08)
  • qwen-omni-turbo-realtime-latest (latest version; capabilities always identical to the newest snapshot)
  • qwen-omni-turbo-realtime-2025-05-08 (snapshot version)

All three share the same limits: a context length of 32,768 tokens, a maximum input of 30,720 tokens, and a maximum output of 2,048 tokens. Free quota: 1 million tokens per model (regardless of modality), valid for 180 days after Alibaba Cloud Model Studio is activated.

After the free quota is exhausted, input and output are billed as follows:

Input (per 1,000 tokens):

  • Text input: CNY 0.0016
  • Audio input: CNY 0.025
  • Image input: CNY 0.006

Output (per 1,000 tokens):

  • Text output: CNY 0.0064 when the input contains only text; CNY 0.018 when the input contains image or audio
  • Text + audio output: CNY 0.05 for the audio; the text portion of the output is not billed
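
To make these rates concrete, here is a small illustrative calculation (a minimal sketch; the token counts are invented for the example):

# Hypothetical single turn: 2,000 text and 10,000 audio input tokens,
# answered with 500 text tokens plus 4,000 audio tokens of speech.
text_in, audio_in, audio_out = 2_000, 10_000, 4_000

cost = (
    text_in / 1000 * 0.0016    # text input
    + audio_in / 1000 * 0.025  # audio input
    + audio_out / 1000 * 0.05  # audio output; the accompanying text is free
)
print(f"{cost:.4f} CNY")  # 0.4532 CNY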

Access

The Qwen-Omni realtime API is based on the WebSocket protocol. You can access it through the DashScope Python or Java SDK, or implement the communication yourself with any language's WebSocket library.

Use the following code to establish a WebSocket connection to the Qwen-Omni realtime API service.

Establish a WebSocket connection

DashScope Python SDK

# Requires dashscope SDK version 1.23.9 or later
import os
import json
from dashscope.audio.qwen_omni import OmniRealtimeConversation, OmniRealtimeCallback
import dashscope
# If the environment variable is not set, replace the next line with: dashscope.api_key = "sk-xxx"
dashscope.api_key = os.getenv("DASHSCOPE_API_KEY")

class PrintCallback(OmniRealtimeCallback):
    def on_open(self) -> None:
        print("Connected Successfully")
    def on_event(self, response: dict) -> None:
        print("Received event:")
        print(json.dumps(response, indent=2, ensure_ascii=False))
    def on_close(self, close_status_code: int, close_msg: str) -> None:
        print(f"Connection closed (code={close_status_code}, msg={close_msg}).")

callback = PrintCallback()
conversation = OmniRealtimeConversation(
    model="qwen-omni-turbo-realtime-latest",
    callback=callback,
)
conversation.connect()
conversation.thread.join()

DashScope Java SDK

// Requires DashScope Java SDK version 2.20.9 or later

import com.alibaba.dashscope.audio.omni.*;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.google.gson.JsonObject;

public class Main {
    public static void main(String[] args) throws InterruptedException, NoApiKeyException {
        OmniRealtimeParam param = OmniRealtimeParam.builder()
                .model("qwen-omni-turbo-realtime-latest")
                // If the environment variable is not set, replace the next line with .apikey("sk-xxx") using your API Key
                .apikey(System.getenv("DASHSCOPE_API_KEY"))
                .build();

        OmniRealtimeConversation conversation = new OmniRealtimeConversation(param, new OmniRealtimeCallback() {
            @Override
            public void onOpen() {
                System.out.println("Connected Successfully");
            }
            @Override
            public void onEvent(JsonObject message) {
                System.out.println(message);
            }
            @Override
            public void onClose(int code, String reason) {
                System.out.println("connection closed code: " + code + ", reason: " + reason);
            }
        });

        conversation.connect();
        Thread.sleep(Long.MAX_VALUE);
        conversation.close(1000, "bye");
        System.exit(0);
    }
}

WebSocket(Python)

Configuration items:

  • Endpoint URL: wss://dashscope.aliyuncs.com/api-ws/v1/realtime
  • Query parameter: model, set to the name of the model to access (see Supported models)
  • Headers: Bearer token authentication, i.e. Authorization: Bearer DASHSCOPE_API_KEY, where DASHSCOPE_API_KEY is the API key you created in Model Studio.

# pip install websocket-client
import json
import websocket
import os

API_KEY = os.getenv("DASHSCOPE_API_KEY")
API_URL = "wss://dashscope.aliyuncs.com/api-ws/v1/realtime?model=qwen-omni-turbo-realtime"

headers = [
    "Authorization: Bearer " + API_KEY
]

def on_open(ws):
    print(f"Connected to server: {API_URL}")
def on_message(ws, message):
    data = json.loads(message)
    print("Received event:", json.dumps(data, indent=2))
def on_error(ws, error):
    print("Error:", error)

ws = websocket.WebSocketApp(
    API_URL,
    header=headers,
    on_open=on_open,
    on_message=on_message,
    on_error=on_error
)

ws.run_forever()

After the connection is established, you will receive a callback event like the following:

{
    "event_id": "event_Bj1ixvmDpFda3ahThkOSz",
    "type": "session.created",
    "session": {
        "object": "realtime.session",
        "model": "qwen-omni-turbo-realtime",
        "modalities": [
            "text",
            "audio"
        ],
        "instructions": "...model instructions here...",
        "voice": "Chelsie",
        "input_audio_format": "pcm16",
        "output_audio_format": "pcm16",
        "input_audio_transcription": {
            "model": "gummy-realtime-v1"
        },
        "turn_detection": {
            "type": "server_vad",
            "threshold": 0.5,
            "prefix_padding_ms": 300,
            "silence_duration_ms": 800,
            "create_response": true,
            "interrupt_response": true
        },
        "tools": [],
        "tool_choice": "auto",
        "temperature": 0.8,
        "max_response_output_tokens": "inf",
        "id": "sess_JSqGSEx9RRfReAIWNegSp"
    }
}
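
The session.created payload above shows the session defaults. To change them (for example the server-side VAD thresholds), send a session.update event over the same connection. A minimal sketch to send from on_open of the WebSocket example above; the threshold and silence values are illustrative, not recommendations:

import json

# ws is the connected WebSocketApp from the example above
session_update = {
    "type": "session.update",
    "session": {
        "modalities": ["text", "audio"],
        "voice": "Chelsie",
        "input_audio_format": "pcm16",
        "output_audio_format": "pcm16",
        "turn_detection": {
            "type": "server_vad",
            "threshold": 0.5,
            "prefix_padding_ms": 300,
            "silence_duration_ms": 800,
        },
    },
}
ws.send(json.dumps(session_update))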

Quick start

You need to obtain an API key and configure the API key as an environment variable.

Pick the programming language you are familiar with and follow the steps below to try realtime conversation with the Realtime model.

DashScope Python SDK

  • Prepare the environment

Your Python version must be 3.10 or later.

First, install pyaudio for your operating system.

macOS

brew install portaudio && pip install pyaudio

Debian/Ubuntu

sudo apt-get install python3-pyaudio

or

pip install pyaudio

CentOS

sudo yum install -y portaudio portaudio-devel && pip install pyaudio

Windows

pip install pyaudio

After installation, install the dependencies with pip:

pip install websocket-client==1.8.0 websockets dashscope==1.23.9
  • Choose an interaction mode

    • VAD mode (Voice Activity Detection: speech start and end detected automatically)

      The Realtime API automatically detects when the user starts and stops speaking and responds accordingly.

    • Manual mode (push-to-talk: press to speak, release to send)

      The client controls when speech starts and ends. After the user finishes speaking, the client must explicitly send the buffered audio to the server.

    VAD mode

    Create a Python file named vad_dash.py and copy the following code into it:

    vad_dash.py

    # Requires dashscope SDK version 1.23.9 or later
    import os
    import base64
    import signal
    import sys
    import time
    import pyaudio
    import contextlib
    import threading
    import queue
    from dashscope.audio.qwen_omni import *
    import dashscope
    # If the environment variable is not set, replace the next line with: dashscope.api_key = "sk-xxx"
    dashscope.api_key = os.getenv('DASHSCOPE_API_KEY')
    voice = 'Chelsie'
    conversation = None
    
    class B64PCMPlayer:
      def __init__(self, pya: pyaudio.PyAudio, sample_rate=24000, chunk_size_ms=100):
        self.pya = pya
        self.sample_rate = sample_rate
        self.chunk_size_bytes = chunk_size_ms * sample_rate *2 // 1000
        self.player_stream = pya.open(format=pyaudio.paInt16,
                                      channels=1,
                                      rate=sample_rate,
                                      output=True)
    
        self.raw_audio_buffer: queue.Queue = queue.Queue()
        self.b64_audio_buffer: queue.Queue = queue.Queue()
        self.status_lock = threading.Lock()
        self.status = 'playing'
        self.decoder_thread = threading.Thread(target=self.decoder_loop)
        self.player_thread = threading.Thread(target=self.player_loop)
        self.decoder_thread.start()
        self.player_thread.start()
        self.complete_event: threading.Event = None
    
      def decoder_loop(self):
        while self.status != 'stop':
          recv_audio_b64 = None
          with contextlib.suppress(queue.Empty):
            recv_audio_b64 = self.b64_audio_buffer.get(timeout=0.1)
          if recv_audio_b64 is None:
            continue
          recv_audio_raw = base64.b64decode(recv_audio_b64)
          # push the raw audio into the queue chunk by chunk
          for i in range(0, len(recv_audio_raw), self.chunk_size_bytes):
            chunk = recv_audio_raw[i:i + self.chunk_size_bytes]
            self.raw_audio_buffer.put(chunk)
    
      def player_loop(self):
        while self.status != 'stop':
          recv_audio_raw = None
          with contextlib.suppress(queue.Empty):
            recv_audio_raw = self.raw_audio_buffer.get(timeout=0.1)
          if recv_audio_raw is None:
            if self.complete_event:
              self.complete_event.set()
            continue
          # write the chunk to the pyaudio player and wait until it finishes playing
          self.player_stream.write(recv_audio_raw)
    
      def cancel_playing(self):
        self.b64_audio_buffer.queue.clear()
        self.raw_audio_buffer.queue.clear()
    
      def add_data(self, data):
        self.b64_audio_buffer.put(data)
    
      def wait_for_complete(self):
        self.complete_event = threading.Event()
        self.complete_event.wait()
        self.complete_event = None
    
      def shutdown(self):
        self.status = 'stop'
        self.decoder_thread.join()
        self.player_thread.join()
        self.player_stream.close()
    
    
    
    class MyCallback(OmniRealtimeCallback):
      def on_open(self) -> None:
        global pya
        global mic_stream
        global b64_player
        print('connection opened, init microphone')
        pya = pyaudio.PyAudio()
        mic_stream = pya.open(format=pyaudio.paInt16,
                            channels=1,
                            rate=16000,
                            input=True)
        b64_player = B64PCMPlayer(pya)

      def on_close(self, close_status_code, close_msg) -> None:
        print('connection closed with code: {}, msg: {}, destroy microphone'.format(close_status_code, close_msg))
        sys.exit(0)

      def on_event(self, response: dict) -> None:
        try:
          global conversation
          global b64_player
          type = response['type']
          if 'session.created' == type:
            print('start session: {}'.format(response['session']['id']))
          if 'conversation.item.input_audio_transcription.completed' == type:
            print('question: {}'.format(response['transcript']))
          if 'response.audio_transcript.delta' == type:
            text = response['delta']
            print("got llm response delta: {}".format(text))
          if 'response.audio.delta' == type:
            recv_audio_b64 = response['delta']
            b64_player.add_data(recv_audio_b64)
          if 'input_audio_buffer.speech_started' == type:
            print('======VAD Speech Start======')
            b64_player.cancel_playing()
          if 'response.done' == type:
            print('======RESPONSE DONE======')
            print('[Metric] response: {}, first text delay: {}, first audio delay: {}'.format(
                conversation.get_last_response_id(),
                conversation.get_last_first_text_delay(),
                conversation.get_last_first_audio_delay(),
                ))
        except Exception as e:
          print('[Error] {}'.format(e))
          return
    
    if __name__ == '__main__':
        print('Initializing ...')
        callback = MyCallback()
        conversation = OmniRealtimeConversation(
            model='qwen-omni-turbo-realtime-latest',
            callback=callback, 
            )
        conversation.connect()
        conversation.update_session(
            output_modalities=[MultiModality.AUDIO, MultiModality.TEXT],
            voice=voice,
            input_audio_format=AudioFormat.PCM_16000HZ_MONO_16BIT,
            output_audio_format=AudioFormat.PCM_24000HZ_MONO_16BIT,
            enable_input_audio_transcription=True,
            input_audio_transcription_model='gummy-realtime-v1',
            enable_turn_detection=True,
            turn_detection_type='server_vad',
        )
        def signal_handler(sig, frame):
            print('Ctrl+C pressed, stop recognition ...')
            conversation.close()
            b64_player.shutdown()
            print('omni realtime stopped.')
            sys.exit(0)
        signal.signal(signal.SIGINT, signal_handler)
        print("Press 'Ctrl+C' to stop conversation...")
        while True:
            if mic_stream:
                audio_data = mic_stream.read(3200, exception_on_overflow=False)
                audio_b64 = base64.b64encode(audio_data).decode('ascii')
                conversation.append_audio(audio_b64)
            else:
                break

    Run vad_dash.py to talk to the Realtime model through your microphone. The service detects where your speech starts and the audio is streamed to the server automatically; you do not need to send it manually.
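
    If you want to keep the assistant's speech instead of only playing it, the base64 PCM carried by response.audio.delta events can be written to a WAV file. A minimal sketch, assuming the 24 kHz mono PCM16 output format configured above (the file name reply.wav is a placeholder):

    import base64
    import wave
    from dashscope.audio.qwen_omni import OmniRealtimeCallback

    class WavDumpCallback(OmniRealtimeCallback):
        """Collects response.audio.delta chunks and writes them to a WAV file."""
        def __init__(self, path='reply.wav'):
            super().__init__()
            self.path = path
            self.pcm = bytearray()

        def on_event(self, response: dict) -> None:
            if response['type'] == 'response.audio.delta':
                # Each delta is base64-encoded raw PCM16 at 24 kHz mono
                self.pcm.extend(base64.b64decode(response['delta']))
            elif response['type'] == 'response.done':
                with wave.open(self.path, 'wb') as f:
                    f.setnchannels(1)      # mono
                    f.setsampwidth(2)      # 16-bit samples
                    f.setframerate(24000)  # matches PCM_24000HZ_MONO_16BIT
                    f.writeframes(bytes(self.pcm))
                print('saved', self.path)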

    Manual mode

    Create a Python file named manual_dash.py and copy the following code into it:

    manual_dash.py

    # Requires dashscope SDK version 1.23.9 or later
    import os
    import base64
    import contextlib
    import queue
    import sys
    import threading
    import pyaudio
    from dashscope.audio.qwen_omni import *
    import dashscope
    # If the environment variable is not set, replace the next line with: dashscope.api_key = "sk-xxx"
    dashscope.api_key = os.getenv('DASHSCOPE_API_KEY')
    
    voice = 'Chelsie'
    conversation = None
    
    class B64PCMPlayer:
        def __init__(self, pya: pyaudio.PyAudio, sample_rate=24000, chunk_size_ms=100):
            self.pya = pya
            self.sample_rate = sample_rate
            self.chunk_size_bytes = chunk_size_ms * sample_rate *2 // 1000
            self.player_stream = pya.open(format=pyaudio.paInt16,
                    channels=1,
                    rate=sample_rate,
                    output=True)
    
            self.raw_audio_buffer: queue.Queue = queue.Queue()
            self.b64_audio_buffer: queue.Queue = queue.Queue()
            self.status_lock = threading.Lock()
            self.status = 'playing'
            self.decoder_thread = threading.Thread(target=self.decoder_loop)
            self.player_thread = threading.Thread(target=self.player_loop)
            self.decoder_thread.start()
            self.player_thread.start()
            self.complete_event: threading.Event = None
    
        def decoder_loop(self):
            while self.status != 'stop':
                recv_audio_b64 = None
                with contextlib.suppress(queue.Empty):
                    recv_audio_b64 = self.b64_audio_buffer.get(timeout=0.1)
                if recv_audio_b64 is None:
                    continue
                recv_audio_raw = base64.b64decode(recv_audio_b64)
                # push raw audio data into queue by chunk
                for i in range(0, len(recv_audio_raw), self.chunk_size_bytes):
                    chunk = recv_audio_raw[i:i + self.chunk_size_bytes]
                    self.raw_audio_buffer.put(chunk)
    
        def player_loop(self):
            while self.status != 'stop':
                recv_audio_raw = None
                with contextlib.suppress(queue.Empty):
                    recv_audio_raw = self.raw_audio_buffer.get(timeout=0.1)
                if recv_audio_raw is None:
                    if self.complete_event:
                        self.complete_event.set()
                    continue
                # write chunk to pyaudio audio player, wait until finish playing this chunk.
                self.player_stream.write(recv_audio_raw)
    
        def cancel_playing(self):
            self.b64_audio_buffer.queue.clear()
            self.raw_audio_buffer.queue.clear()
    
        def add_data(self, data):
            self.b64_audio_buffer.put(data)
    
        def wait_for_complete(self):
            self.complete_event = threading.Event()
            self.complete_event.wait()
            self.complete_event = None
    
        def shutdown(self):
            self.status = 'stop'
            self.decoder_thread.join()
            self.player_thread.join()
            self.player_stream.close()
    
    class MyCallback(OmniRealtimeCallback):
        def __init__(self):
            super().__init__()
            self.complete_event: threading.Event = threading.Event()
        
        def wait_for_complete(self):
            self.complete_event.wait()
            self.complete_event = threading.Event()
    
        def on_open(self) -> None:
            global b64_player
            print('connection opened, init microphone')
            self.pya = pyaudio.PyAudio()
            b64_player = B64PCMPlayer(self.pya)
    
        def on_close(self, close_status_code, close_msg) -> None:
            print('connection closed with code: {}, msg: {}, destroy microphone'.format(close_status_code, close_msg))
            sys.exit(0)
    
        def on_event(self, response: dict) -> None:
            global conversation
            global b64_player
            try:
                type = response['type']
                if 'session.created' == type:
                    print('start session: {}'.format(response['session']['id']))
                if 'conversation.item.input_audio_transcription.completed' == type:
                    print('question: {}'.format(response['transcript']))
                if 'response.audio_transcript.delta' == type:
                    text = response['delta']
                    print("got llm response delta: {}".format(text))
                if 'response.audio.delta' == type:
                    recv_audio_b64 = response['delta']
                    b64_player.add_data(recv_audio_b64)
                if 'response.done' == type:
                    print('======RESPONSE DONE======')
                    print('[Metric] response: {}, first text delay: {}, first audio delay: {}'.format(
                                    conversation.get_last_response_id(), 
                                    conversation.get_last_first_text_delay(), 
                                    conversation.get_last_first_audio_delay(),
                                    ))
                    if self.complete_event:
                        self.complete_event.set()
            except Exception as e:
                print('[Error] {}'.format(e))
                return
    
    class MicrophoneRecorder:
        """Realtime microphone recorder."""
        def __init__(self, sample_rate=16000, channels=1, chunk_size=1024):
            self.sample_rate = sample_rate
            self.channels = channels
            self.chunk_size = chunk_size
            self.pyaudio_instance = None
            self.stream = None
            self.frames = []
            self._is_recording = False
            self._record_thread = None
    
        def _recording_thread(self):
            """Recording worker thread."""
            # Keep reading from the audio stream while _is_recording is True
            while self._is_recording:
                try:
                    # exception_on_overflow=False avoids crashing on buffer overruns
                    data = self.stream.read(self.chunk_size, exception_on_overflow=False)
                    self.frames.append(data)
                except (IOError, OSError) as e:
                    # Reads may fail once the stream has been closed
                    print(f"Audio stream read error; the stream may be closed: {e}")
                    break
    
        def start(self):
            """Start recording."""
            if self._is_recording:
                print("Recording is already in progress.")
                return
            self.frames = []
            self._is_recording = True

            try:
                self.pyaudio_instance = pyaudio.PyAudio()
                self.stream = self.pyaudio_instance.open(
                    format=pyaudio.paInt16,
                    channels=self.channels,
                    rate=self.sample_rate,
                    input=True,
                    frames_per_buffer=self.chunk_size
                )
                self._record_thread = threading.Thread(target=self._recording_thread)
                self._record_thread.daemon = True
                self._record_thread.start()
                print("Microphone recording started...")
            except Exception as e:
                print(f"Failed to start the microphone: {e}")
                self._is_recording = False
                self._cleanup()
                raise
    
        def stop(self):
            """Stop recording and return the captured audio."""
            if not self._is_recording:
                return None
            self._is_recording = False
            # Wait for the recording thread to exit safely
            if self._record_thread:
                self._record_thread.join(timeout=1.0)
            self._cleanup()
            print("Microphone recording stopped.")
            return b''.join(self.frames)
    
        def _cleanup(self):
            """Safely release PyAudio resources."""
            if self.stream:
                try:
                    if self.stream.is_active():
                        self.stream.stop_stream()
                    self.stream.close()
                except Exception as e:
                    print(f"Error closing the audio stream: {e}")

            if self.pyaudio_instance:
                try:
                    self.pyaudio_instance.terminate()
                except Exception as e:
                    print(f"Error terminating the PyAudio instance: {e}")
            self.stream = None
            self.pyaudio_instance = None
    
    
    if __name__ == '__main__':
        print('Initializing ...')
        callback = MyCallback()
        conversation = OmniRealtimeConversation(
            model='qwen-omni-turbo-realtime-latest',
            callback=callback, 
            )
        conversation.connect()
        conversation.update_session(
            output_modalities=[MultiModality.AUDIO, MultiModality.TEXT],
            voice=voice,
            input_audio_format=AudioFormat.PCM_16000HZ_MONO_16BIT,
            output_audio_format=AudioFormat.PCM_24000HZ_MONO_16BIT,
            enable_input_audio_transcription=True,
            input_audio_transcription_model='gummy-realtime-v1',
            enable_turn_detection=False,
        )
    try:
        turn_counter = 1
        while True:
            print(f"\n--- Turn {turn_counter} ---")
            # Set up the microphone recorder
            recorder = MicrophoneRecorder(sample_rate=16000)  # 16 kHz sampling for speech recognition
            print("Ready. Press Enter to start recording (or type 'q' to quit)...")
            user_input = input()
            if user_input.strip().lower() in ['q', 'quit']:
                print("Exiting at user request...")
                break
            try:
                recorder.start()
            except Exception as e:
                print(f"Could not start recording; check your microphone permissions and device: {e}")
                continue
            print("Recording... Press Enter again to stop.")
            input()
            recorded_audio = recorder.stop()
            if not recorded_audio or len(recorded_audio) == 0:
                print("No valid audio captured; please restart this turn.")
                continue
            print(f"Captured {len(recorded_audio)} bytes of audio")
            # Send the recorded audio in chunks
            chunk_size = 3200  # same chunk size as the VAD example
            for i in range(0, len(recorded_audio), chunk_size):
                audio_chunk = recorded_audio[i:i+chunk_size]
                if audio_chunk:
                    conversation.append_audio(base64.b64encode(audio_chunk).decode('ascii'))
            print("Audio sent; waiting for the response...")
            conversation.commit()
            conversation.create_response()
            callback.wait_for_complete()
            b64_player.wait_for_complete()
            print('Playback finished')
            turn_counter += 1
    except KeyboardInterrupt:
        print("\nInterrupted by user")
    finally:
        print("Exiting")

    Run manual_dash.py, press Enter to start speaking, then press Enter again to stop and receive the model's audio response.
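
    A manual-mode turn can also be driven from a prerecorded file instead of the microphone. A minimal sketch, assuming a local 16 kHz mono PCM16 WAV file (input.wav is a placeholder) and a conversation configured exactly as in manual_dash.py:

    import base64
    import wave

    def send_wav_turn(conversation, path='input.wav', chunk_size=3200):
        """Streams a local 16 kHz mono PCM16 WAV file as one manual-mode turn."""
        with wave.open(path, 'rb') as f:
            # The session above expects PCM_16000HZ_MONO_16BIT input
            assert f.getframerate() == 16000 and f.getnchannels() == 1 and f.getsampwidth() == 2
            pcm = f.readframes(f.getnframes())
        for i in range(0, len(pcm), chunk_size):
            conversation.append_audio(base64.b64encode(pcm[i:i + chunk_size]).decode('ascii'))
        conversation.commit()           # close this turn's input audio buffer
        conversation.create_response()  # request a reply (manual mode only)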

DashScope Java SDK

Choose an interaction mode

  • VAD mode (Voice Activity Detection: speech start and end detected automatically)

    The Realtime API automatically detects when the user starts and stops speaking and responds accordingly.

  • Manual mode (push-to-talk: press to speak, release to send)

    The client controls when speech starts and ends. After the user finishes speaking, the client must explicitly send the buffered audio to the server.

VAD mode

OmniServerVad.java

// Requires DashScope Java SDK version 2.20.9 or later

import com.alibaba.dashscope.audio.omni.*;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.google.gson.JsonObject;
import javax.sound.sampled.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Base64;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class OmniServerVad {
    // RealtimePcmPlayer class definition begins
    public static class RealtimePcmPlayer {
        private int sampleRate;
        private SourceDataLine line;
        private AudioFormat audioFormat;
        private Thread decoderThread;
        private Thread playerThread;
        private AtomicBoolean stopped = new AtomicBoolean(false);
        private Queue<String> b64AudioBuffer = new ConcurrentLinkedQueue<>();
        private Queue<byte[]> RawAudioBuffer = new ConcurrentLinkedQueue<>();
    
        // The constructor initializes the audio format and output line
        public RealtimePcmPlayer(int sampleRate) throws LineUnavailableException {
            this.sampleRate = sampleRate;
            this.audioFormat = new AudioFormat(this.sampleRate, 16, 1, true, false);
            DataLine.Info info = new DataLine.Info(SourceDataLine.class, audioFormat);
            line = (SourceDataLine) AudioSystem.getLine(info);
            line.open(audioFormat);
            line.start();
            decoderThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!stopped.get()) {
                        String b64Audio = b64AudioBuffer.poll();
                        if (b64Audio != null) {
                            byte[] rawAudio = Base64.getDecoder().decode(b64Audio);
                            RawAudioBuffer.add(rawAudio);
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            });
            playerThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!stopped.get()) {
                        byte[] rawAudio = RawAudioBuffer.poll();
                        if (rawAudio != null) {
                            try {
                                playChunk(rawAudio);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            });
            decoderThread.start();
            playerThread.start();
        }
    
        // Play one audio chunk and block until it finishes
        private void playChunk(byte[] chunk) throws IOException, InterruptedException {
            if (chunk == null || chunk.length == 0) return;
    
            int bytesWritten = 0;
            while (bytesWritten < chunk.length) {
                bytesWritten += line.write(chunk, bytesWritten, chunk.length - bytesWritten);
            }
            // Chunk duration in ms: bytes divided by bytes-per-millisecond (16-bit mono)
            int audioLength = chunk.length / (this.sampleRate*2/1000);
            // Wait for the buffered audio to finish playing
            Thread.sleep(audioLength - 10);
        }
    
        public void write(String b64Audio) {
            b64AudioBuffer.add(b64Audio);
        }
    
        public void cancel() {
            b64AudioBuffer.clear();
            RawAudioBuffer.clear();
        }
    
        public void waitForComplete() throws InterruptedException {
            while (!b64AudioBuffer.isEmpty() || !RawAudioBuffer.isEmpty()) {
                Thread.sleep(100);
            }
            line.drain();
        }
    
        public void shutdown() throws InterruptedException {
            stopped.set(true);
            decoderThread.join();
            playerThread.join();
            if (line != null && line.isRunning()) {
                line.drain();
                line.close();
            }
        }
    } // RealtimePcmPlayer class definition ends
    
    public static void main(String[] args) throws InterruptedException, LineUnavailableException {

        OmniRealtimeParam param = OmniRealtimeParam.builder()
                .model("qwen-omni-turbo-realtime-latest")
                // If the environment variable is not set, replace the next line with .apikey("sk-xxx") using your API Key
                .apikey(System.getenv("DASHSCOPE_API_KEY"))
                .build();

        RealtimePcmPlayer audioPlayer = new RealtimePcmPlayer(24000);
        OmniRealtimeConversation conversation = null;
        final AtomicReference<OmniRealtimeConversation> conversationRef = new AtomicReference<>(null);
        conversation = new OmniRealtimeConversation(param, new OmniRealtimeCallback() {
            @Override
            public void onOpen() {
                System.out.println("connection opened");
            }
            @Override
            public void onEvent(JsonObject message) {
                String type = message.get("type").getAsString();
                switch(type) {
                    case "session.created":
                        System.out.println("start session: " + message.get("session").getAsJsonObject().get("id").getAsString());
                        break;
                    case "conversation.item.input_audio_transcription.completed":
                        System.out.println("question: " + message.get("transcript").getAsString());
                        break;
                    case "response.audio_transcript.delta":
                        System.out.println("got llm response delta: " + message.get("delta").getAsString());
                        break;
                    case "response.audio.delta":
                        String recvAudioB64 = message.get("delta").getAsString();
                        audioPlayer.write(recvAudioB64);
                        break;
                    case "input_audio_buffer.speech_started":
                        System.out.println("======VAD Speech Start======");
                        audioPlayer.cancel();
                        break;
                    case "response.done":
                        System.out.println("======RESPONSE DONE======");
                        if (conversationRef.get() != null) {
                            System.out.println("[Metric] response: " + conversationRef.get().getResponseId() +
                                    ", first text delay: " + conversationRef.get().getFirstTextDelay() +
                                    " ms, first audio delay: " + conversationRef.get().getFirstAudioDelay() + " ms");
                        }
                        break;
                    default:
                        break;
                }
            }
            @Override
            public void onClose(int code, String reason) {
                System.out.println("connection closed code: " + code + ", reason: " + reason);
            }
        });
        conversationRef.set(conversation);
        try {
            conversation.connect();
        } catch (NoApiKeyException e) {
            throw new RuntimeException(e);
        }
        OmniRealtimeConfig config = OmniRealtimeConfig.builder()
                .modalities(Arrays.asList(OmniRealtimeModality.AUDIO, OmniRealtimeModality.TEXT))
                .voice("Chelsie")
                .enableTurnDetection(true)
                .enableInputAudioTranscription(true)
                .InputAudioTranscription("gummy-realtime-v1")
                .build();
        conversation.updateSession(config);
        try {
            // Create the audio format
            AudioFormat audioFormat = new AudioFormat(16000, 16, 1, true, false);
            // Get the default capture device matching this format
            TargetDataLine targetDataLine =
                    AudioSystem.getTargetDataLine(audioFormat);
            targetDataLine.open(audioFormat);
            // Start capturing
            targetDataLine.start();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            long start = System.currentTimeMillis();
            // Record for 50 seconds and stream the audio in real time
            while (System.currentTimeMillis() - start < 50000) {
                int read = targetDataLine.read(buffer.array(), 0, buffer.capacity());
                if (read > 0) {
                    // Encode only the bytes actually read from the microphone
                    String audioB64 = Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.array(), read));
                    // Send the captured audio to the streaming recognition service
                    conversation.appendAudio(audioB64);
                    // Capture is rate-limited; sleep briefly to keep CPU usage low
                    Thread.sleep(20);
                }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        conversation.commit();
        conversation.createResponse(null, null);
        conversation.close(1000, "bye");
        audioPlayer.waitForComplete();
        audioPlayer.shutdown();
        System.exit(0);
    }
}

Run the OmniServerVad.main() method to talk to the Realtime model through your microphone. The service detects where your speech starts and the audio is sent to the server automatically; you do not need to send it manually.

Manual mode

OmniWithoutServerVad.java

// Requires DashScope Java SDK version 2.20.9 or later

import com.alibaba.dashscope.audio.omni.*;
import com.alibaba.dashscope.exception.NoApiKeyException;
import com.google.gson.JsonObject;
import javax.sound.sampled.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Base64;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class OmniWithoutServerVad {
    // RealtimePcmPlayer class definition begins
    public static class RealtimePcmPlayer {
        private int sampleRate;
        private SourceDataLine line;
        private AudioFormat audioFormat;
        private Thread decoderThread;
        private Thread playerThread;
        private AtomicBoolean stopped = new AtomicBoolean(false);
        private Queue<String> b64AudioBuffer = new ConcurrentLinkedQueue<>();
        private Queue<byte[]> RawAudioBuffer = new ConcurrentLinkedQueue<>();

        // The constructor initializes the audio format and output line
        public RealtimePcmPlayer(int sampleRate) throws LineUnavailableException {
            this.sampleRate = sampleRate;
            this.audioFormat = new AudioFormat(this.sampleRate, 16, 1, true, false);
            DataLine.Info info = new DataLine.Info(SourceDataLine.class, audioFormat);
            line = (SourceDataLine) AudioSystem.getLine(info);
            line.open(audioFormat);
            line.start();
            decoderThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!stopped.get()) {
                        String b64Audio = b64AudioBuffer.poll();
                        if (b64Audio != null) {
                            byte[] rawAudio = Base64.getDecoder().decode(b64Audio);
                            RawAudioBuffer.add(rawAudio);
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            });
            playerThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (!stopped.get()) {
                        byte[] rawAudio = RawAudioBuffer.poll();
                        if (rawAudio != null) {
                            try {
                                playChunk(rawAudio);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        } else {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
                }
            });
            decoderThread.start();
            playerThread.start();
        }

        // Play one audio chunk and block until it finishes
        private void playChunk(byte[] chunk) throws IOException, InterruptedException {
            if (chunk == null || chunk.length == 0) return;

            int bytesWritten = 0;
            while (bytesWritten < chunk.length) {
                bytesWritten += line.write(chunk, bytesWritten, chunk.length - bytesWritten);
            }
            // Chunk duration in ms: bytes divided by bytes-per-millisecond (16-bit mono)
            int audioLength = chunk.length / (this.sampleRate*2/1000);
            // Wait for the buffered audio to finish playing
            Thread.sleep(audioLength - 10);
        }

        public void write(String b64Audio) {
            b64AudioBuffer.add(b64Audio);
        }

        public void cancel() {
            b64AudioBuffer.clear();
            RawAudioBuffer.clear();
        }

        public void waitForComplete() throws InterruptedException {
            while (!b64AudioBuffer.isEmpty() || !RawAudioBuffer.isEmpty()) {
                Thread.sleep(100);
            }
            line.drain();
        }

        public void shutdown() throws InterruptedException {
            stopped.set(true);
            decoderThread.join();
            playerThread.join();
            if (line != null && line.isRunning()) {
                line.drain();
                line.close();
            }
        }
    } // RealtimePcmPlayer class definition ends
    // Microphone recording helper
    private static void recordAndSend(TargetDataLine line, OmniRealtimeConversation conversation) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[3200];
        AtomicBoolean stopRecording = new AtomicBoolean(false);

        // Start a thread that listens for the Enter key
        Thread enterKeyListener = new Thread(() -> {
            try {
                System.in.read();
                stopRecording.set(true);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        enterKeyListener.start();

        // Recording loop
        while (!stopRecording.get()) {
            int count = line.read(buffer, 0, buffer.length);
            if (count > 0) {
                out.write(buffer, 0, count);
            }
        }

        // Send the recorded audio
        byte[] audioData = out.toByteArray();
        String audioB64 = Base64.getEncoder().encodeToString(audioData);
        conversation.appendAudio(audioB64);
        out.close();
    }

    public static void main(String[] args) throws InterruptedException, LineUnavailableException {
        OmniRealtimeParam param = OmniRealtimeParam.builder()
                .model("qwen-omni-turbo-realtime-latest")
                // If the environment variable is not set, replace the next line with .apikey("sk-xxx")
                .apikey(System.getenv("DASHSCOPE_API_KEY"))
                .build();
        AtomicReference<CountDownLatch> responseDoneLatch = new AtomicReference<>(null);
        responseDoneLatch.set(new CountDownLatch(1));

        RealtimePcmPlayer audioPlayer = new RealtimePcmPlayer(24000);
        final AtomicReference<OmniRealtimeConversation> conversationRef = new AtomicReference<>(null);
        OmniRealtimeConversation conversation = new OmniRealtimeConversation(param, new OmniRealtimeCallback() {
            @Override
            public void onOpen() {
                System.out.println("connection opened");
            }
            @Override
            public void onEvent(JsonObject message) {
                String type = message.get("type").getAsString();
                switch(type) {
                    case "session.created":
                        System.out.println("start session: " + message.get("session").getAsJsonObject().get("id").getAsString());
                        break;
                    case "conversation.item.input_audio_transcription.completed":
                        System.out.println("question: " + message.get("transcript").getAsString());
                        break;
                    case "response.audio_transcript.delta":
                        System.out.println("got llm response delta: " + message.get("delta").getAsString());
                        break;
                    case "response.audio.delta":
                        String recvAudioB64 = message.get("delta").getAsString();
                        audioPlayer.write(recvAudioB64);
                        break;
                    case "response.done":
                        System.out.println("======RESPONSE DONE======");
                        if (conversationRef.get() != null) {
                            System.out.println("[Metric] response: " + conversationRef.get().getResponseId() +
                                    ", first text delay: " + conversationRef.get().getFirstTextDelay() +
                                    " ms, first audio delay: " + conversationRef.get().getFirstAudioDelay() + " ms");
                        }
                        responseDoneLatch.get().countDown();
                        break;
                    default:
                        break;
                }
            }
            @Override
            public void onClose(int code, String reason) {
                System.out.println("connection closed code: " + code + ", reason: " + reason);
            }
        });
        conversationRef.set(conversation);
        try {
            conversation.connect();
        } catch (NoApiKeyException e) {
            throw new RuntimeException(e);
        }
        OmniRealtimeConfig config = OmniRealtimeConfig.builder()
                .modalities(Arrays.asList(OmniRealtimeModality.AUDIO, OmniRealtimeModality.TEXT))
                .voice("Chelsie")
                .enableTurnDetection(false)
                .build();
        conversation.updateSession(config);

        // Set up microphone capture
        AudioFormat format = new AudioFormat(16000, 16, 1, true, false);
        DataLine.Info info = new DataLine.Info(TargetDataLine.class, format);
        
        if (!AudioSystem.isLineSupported(info)) {
            System.out.println("Line not supported");
            return;
        }
        
        TargetDataLine line = null;
        try {
            line = (TargetDataLine) AudioSystem.getLine(info);
            line.open(format);
            line.start();
            
            while (true) {
                System.out.println("Press Enter to start recording...");
                System.in.read();
                System.out.println("Recording; speak now... Press Enter again to stop and send");
                recordAndSend(line, conversation);
                conversation.commit();
                conversation.createResponse(null, null);
                // Wait for the response to finish before starting the next turn
                responseDoneLatch.get().await();
                // Reset the latch for the next turn
                responseDoneLatch.set(new CountDownLatch(1));
            }
        } catch (LineUnavailableException | IOException e) {
            e.printStackTrace();
        } finally {
            if (line != null) {
                line.stop();
                line.close();
            }
        }
    }

Run the OmniWithoutServerVad.main() method, press Enter to start speaking, then press Enter again to stop and receive the model's audio response.

WebSocket(Python)

  • Prepare the environment

    Your Python version must be 3.10 or later.

    First, install pyaudio for your operating system.

    macOS

    brew install portaudio && pip install pyaudio

    Debian/Ubuntu

    sudo apt-get install python3-pyaudio
    
    or
    
    pip install pyaudio

    CentOS

    sudo yum install -y portaudio portaudio-devel && pip install pyaudio

    Windows

    pip install pyaudio

    After installation, install the WebSocket dependencies with pip:

    pip install websocket-client==1.8.0 websockets
  • Create the client

    Create a local Python file named omni_realtime_client.py and copy the following code into it:

    omni_realtime_client.py

    # -*- coding: utf-8 -*-
    
    import asyncio
    import websockets
    import json
    import base64
    import time
    from typing import Optional, Callable, List, Dict, Any
    from enum import Enum
    
    class TurnDetectionMode(Enum):
        SERVER_VAD = "server_vad"
        MANUAL = "manual"
    
    class OmniRealtimeClient:
        """
        Demo client for interacting with the Omni Realtime API.

        This class provides methods for connecting to the Realtime API, sending
        text and audio data, handling responses, and managing the WebSocket
        connection.

        Attributes:
            base_url (str):
                Base URL of the Realtime API.
            api_key (str):
                API key used for authentication.
            model (str):
                Name of the Omni model used for chat.
            voice (str):
                Voice used for server-side speech synthesis.
            turn_detection_mode (TurnDetectionMode):
                Turn detection mode.
            on_text_delta (Callable[[str], None]):
                Callback for text deltas.
            on_audio_delta (Callable[[bytes], None]):
                Callback for audio deltas.
            on_input_transcript (Callable[[str], None]):
                Callback for input transcripts.
            on_interrupt (Callable[[], None]):
                Callback fired when the user interrupts; stop audio playback here.
            on_output_transcript (Callable[[str], None]):
                Callback for output transcripts.
            extra_event_handlers (Dict[str, Callable[[Dict[str, Any]], None]]):
                Additional event handlers, mapping event names to handler functions.
        """
        def __init__(
            self,
            base_url,
            api_key: str,
            model: str = "",
            voice: str = "Ethan",
            turn_detection_mode: TurnDetectionMode = TurnDetectionMode.MANUAL,
            on_text_delta: Optional[Callable[[str], None]] = None,
            on_audio_delta: Optional[Callable[[bytes], None]] = None,
            on_interrupt: Optional[Callable[[], None]] = None,
            on_input_transcript: Optional[Callable[[str], None]] = None,
            on_output_transcript: Optional[Callable[[str], None]] = None,
            extra_event_handlers: Optional[Dict[str, Callable[[Dict[str, Any]], None]]] = None
        ):
            self.base_url = base_url
            self.api_key = api_key
            self.model = model
            self.voice = voice
            self.ws = None
            self.on_text_delta = on_text_delta
            self.on_audio_delta = on_audio_delta
            self.on_interrupt = on_interrupt
            self.on_input_transcript = on_input_transcript
            self.on_output_transcript = on_output_transcript
            self.turn_detection_mode = turn_detection_mode
            self.extra_event_handlers = extra_event_handlers or {}
    
            # Current response state
            self._current_response_id = None
            self._current_item_id = None
            self._is_responding = False
            # Input/output transcript printing state
            self._print_input_transcript = False
            self._output_transcript_buffer = ""
    
        async def connect(self) -> None:
            """Establish a WebSocket connection to the Realtime API."""
            url = f"{self.base_url}?model={self.model}"
            headers = {
                "Authorization": f"Bearer {self.api_key}"
            }
    
            self.ws = await websockets.connect(url, additional_headers=headers)
    
            # Apply the default session configuration
            if self.turn_detection_mode == TurnDetectionMode.MANUAL:
                await self.update_session({
                    "modalities": ["text", "audio"],
                    "voice": self.voice,
                    "input_audio_format": "pcm16",
                    "output_audio_format": "pcm16",
                    "input_audio_transcription": {
                        "model": "gummy-realtime-v1"
                    },
                    "turn_detection" : None
                })
            elif self.turn_detection_mode == TurnDetectionMode.SERVER_VAD:
                await self.update_session({
                    "modalities": ["text", "audio"],
                    "voice": self.voice,
                    "input_audio_format": "pcm16",
                    "output_audio_format": "pcm16",
                    "input_audio_transcription": {
                        "model": "gummy-realtime-v1"
                    },
                    "turn_detection": {
                        "type": "server_vad",
                        "threshold": 0.1,
                        "prefix_padding_ms": 500,
                        "silence_duration_ms": 900
                    }
                })
            else:
                raise ValueError(f"Invalid turn detection mode: {self.turn_detection_mode}")
    
        async def send_event(self, event) -> None:
            event['event_id'] = "event_" + str(int(time.time() * 1000))
            print(f" Send event: type={event['type']}, event_id={event['event_id']}")
            await self.ws.send(json.dumps(event))
    
        async def update_session(self, config: Dict[str, Any]) -> None:
            """Update the session configuration."""
            event = {
                "type": "session.update",
                "session": config
            }
            print("update session: ", event)
            await self.send_event(event)
    
        async def stream_audio(self, audio_chunk: bytes) -> None:
            """Stream raw audio data to the API."""
            # Only 16-bit, 16 kHz, mono PCM is supported
            audio_b64 = base64.b64encode(audio_chunk).decode()
    
            append_event = {
                "type": "input_audio_buffer.append",
                "audio": audio_b64
            }
            await self.send_event(append_event)
    
        async def commit_audio_buffer(self) -> None:
            """Commit the audio buffer to trigger processing."""
            event = {
                "type": "input_audio_buffer.commit"
            }
            await self.send_event(event)
    
        async def append_image(self, image_chunk: bytes) -> None:
            """Append image data to the video buffer.
            The image data may come from a local file or from a live video stream.

            Notes:
                - Images must be JPG or JPEG. 480p or 720p resolution is recommended; up to 1080p is supported.
                - A single image must not exceed 500 KB.
                - This method Base64-encodes the image data before sending it.
                - Sending images to the server at about 2 frames per second is recommended.
                - Audio data must be sent before any image data.
            """
            image_b64 = base64.b64encode(image_chunk).decode()
    
            event = {
                "type": "input_image_buffer.append",
                "image": image_b64
            }
            await self.send_event(event)
    
        async def create_response(self) -> None:
            """Ask the API to generate a response (only required in manual mode)."""
            event = {
                "type": "response.create",
                "response": {
                    "instructions": "You are a helpful assistant.",
                    "modalities": ["text", "audio"]
                }
            }
            print("create response: ", event)
            await self.send_event(event)
    
        async def cancel_response(self) -> None:
            """Cancel the current response."""
            event = {
                "type": "response.cancel"
            }
            await self.send_event(event)
    
        async def handle_interruption(self):
            """Handle a user interruption of the current response."""
            if not self._is_responding:
                return
    
            print(" Handling interruption")
    
            # 1. Cancel the current response
            if self._current_response_id:
                await self.cancel_response()
    
            self._is_responding = False
            self._current_response_id = None
            self._current_item_id = None
    
        async def handle_messages(self) -> None:
            try:
                async for message in self.ws:
                    event = json.loads(message)
                    event_type = event.get("type")
                    
                    if event_type != "response.audio.delta":
                        print(" event: ", event)
                    else:
                        print(" event_type: ", event_type)
    
                    if event_type == "error":
                        print(" Error: ", event['error'])
                        continue
                    elif event_type == "response.created":
                        self._current_response_id = event.get("response", {}).get("id")
                        self._is_responding = True
                    elif event_type == "response.output_item.added":
                        self._current_item_id = event.get("item", {}).get("id")
                    elif event_type == "response.done":
                        self._is_responding = False
                        self._current_response_id = None
                        self._current_item_id = None
                    # Handle interruptions
                    elif event_type == "input_audio_buffer.speech_started":
                        print(" Speech detected")
                        if self._is_responding:
                            print(" Handling interruption")
                            await self.handle_interruption()
    
                        if self.on_interrupt:
                            print(" Handling on_interrupt, stop playback")
                            self.on_interrupt()
                    elif event_type == "input_audio_buffer.speech_stopped":
                        print(" Speech ended")
                    # Handle normal response events
                    elif event_type == "response.text.delta":
                        if self.on_text_delta:
                            self.on_text_delta(event["delta"])
                    elif event_type == "response.audio.delta":
                        if self.on_audio_delta:
                            audio_bytes = base64.b64decode(event["delta"])
                            self.on_audio_delta(audio_bytes)
                    elif event_type == "conversation.item.input_audio_transcription.completed":
                        transcript = event.get("transcript", "")
                        if self.on_input_transcript:
                            await asyncio.to_thread(self.on_input_transcript,transcript)
                            self._print_input_transcript = True
                    elif event_type == "response.audio_transcript.delta":
                        if self.on_output_transcript:
                            delta = event.get("delta", "")
                            if not self._print_input_transcript:
                                self._output_transcript_buffer += delta
                            else:
                                if self._output_transcript_buffer:
                                    await asyncio.to_thread(self.on_output_transcript,self._output_transcript_buffer)
                                    self._output_transcript_buffer = ""
                                await asyncio.to_thread(self.on_output_transcript,delta)
                    elif event_type == "response.audio_transcript.done":
                        self._print_input_transcript = False
                    elif event_type in self.extra_event_handlers:
                        self.extra_event_handlers[event_type](event)
    
            except websockets.exceptions.ConnectionClosed:
                print(" Connection closed")
            except Exception as e:
                print(" Error in message handling: ", str(e))
    
        async def close(self) -> None:
            """Close the WebSocket connection."""
            if self.ws:
                await self.ws.close()
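
    The append_image helper above requires that audio has already been sent before any image data. A minimal sketch of attaching one local JPEG frame, assuming a connected OmniRealtimeClient named client and a placeholder file frame.jpg:

    import asyncio

    async def send_frame(client, path="frame.jpg"):
        # Audio must already be flowing before images are appended
        # (see the append_image docstring); pace frames at roughly 2 fps.
        with open(path, "rb") as f:
            await client.append_image(f.read())
        await asyncio.sleep(0.5)  # ~2 frames per second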
  • Choose an interaction mode

    • VAD mode (Voice Activity Detection: speech start and end detected automatically)

      The Realtime API automatically detects when the user starts and stops speaking and responds accordingly.

    • Manual mode (push-to-talk: press to speak, release to send)

      The client controls when speech starts and ends. After the user finishes speaking, the client must explicitly send the buffered audio to the server.

    VAD mode

    In the same directory as omni_realtime_client.py, create another Python file named vad_mode.py and copy the following code into it:

    vad_mode.py

    # -*- coding: utf-8 -*-
    import os, time, base64, asyncio
    from omni_realtime_client import OmniRealtimeClient, TurnDetectionMode
    import pyaudio
    import queue
    import threading
    
    # Global audio queue and playback thread
    audio_queue = queue.Queue()
    audio_player = None
    # Global interrupt flag
    interrupt_flag = threading.Event()

    # Initialize PyAudio
    p = pyaudio.PyAudio()
    RATE = 24000  # sample rate: 24 kHz
    CHUNK = 3200  # size of each audio chunk
    FORMAT = pyaudio.paInt16  # 16-bit PCM
    CHANNELS = 1  # mono
    
    def clear_audio_queue():
        """Clear the audio queue."""
        with audio_queue.mutex:
            audio_queue.queue.clear()
        print("Audio queue cleared")

    def handle_interrupt():
        """Handle an interruption: stop audio playback immediately."""
        print("Speech input detected; stopping audio playback")
        interrupt_flag.set()  # raise the interrupt flag
        clear_audio_queue()  # drain any queued audio
    
    def audio_player_thread():
        """Background thread that plays received audio data."""
        stream = p.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=24000,
            output=True,
            frames_per_buffer=CHUNK,
        )

        try:
            while True:
                try:
                    # Check the interrupt flag
                    if interrupt_flag.is_set():
                        print("Audio playback interrupted")
                        interrupt_flag.clear()  # clear the interrupt flag
                        continue

                    # Fetch audio data from the queue
                    audio_data = audio_queue.get(block=True, timeout=0.5)
                    if audio_data is None:  # shutdown signal
                        break

                    # Check the interrupt flag again just before playing
                    if interrupt_flag.is_set():
                        print("Audio playback interrupted")
                        interrupt_flag.clear()  # clear the interrupt flag
                        audio_queue.task_done()
                        continue

                    # Play the audio data
                    stream.write(audio_data)
                    audio_queue.task_done()
                except queue.Empty:
                    # Queue is empty; keep waiting
                    continue
        finally:
            # Clean up
            stream.stop_stream()
            stream.close()
    
    
    def start_audio_player():
        """启动音频播放线程"""
        global audio_player
        if audio_player is None or not audio_player.is_alive():
            audio_player = threading.Thread(target=audio_player_thread, daemon=True)
            audio_player.start()
    
    
    def handle_audio_data(audio_data):
        """处理接收到的音频数据"""
        # 打印接收到的音频数据长度(调试用)
        print(f"\n接收到音频数据: {len(audio_data)} 字节")
        # 将音频数据放入队列
        audio_queue.put(audio_data)
    
    
    async def start_microphone_streaming(client: OmniRealtimeClient):
        CHUNK = 3200
        FORMAT = pyaudio.paInt16
        CHANNELS = 1
        RATE = 24000

        p = pyaudio.PyAudio()
        stream = p.open(
            format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK
        )

        try:
            print("Recording started, please speak...")
            while True:
                audio_data = stream.read(CHUNK)
                encoded_data = base64.b64encode(audio_data).decode("utf-8")

                event = {
                    "event_id": "event_" + str(int(time.time() * 1000)),
                    "type": "input_audio_buffer.append",
                    "audio": encoded_data,
                }
                await client.send_event(event)

                # Keep the delay short to approximate real-time interaction
                await asyncio.sleep(0.05)
        finally:
            stream.stop_stream()
            stream.close()
            p.terminate()
    
    
    async def main():
        # Start the audio playback thread
        start_audio_player()

        realtime_client = OmniRealtimeClient(
            base_url="wss://dashscope.aliyuncs.com/api-ws/v1/realtime",
            api_key=os.environ.get("DASHSCOPE_API_KEY"),
            model="qwen-omni-turbo-realtime",
            voice="Chelsie",
            on_text_delta=lambda text: print(f"\nAssistant: {text}", end="", flush=True),
            on_audio_delta=handle_audio_data,
            on_interrupt=handle_interrupt,  # Interrupt callback
            turn_detection_mode=TurnDetectionMode.SERVER_VAD,
        )

        try:
            await realtime_client.connect()
            # Start message handling and microphone streaming, then run until interrupted
            message_handler = asyncio.create_task(realtime_client.handle_messages())
            streaming_task = asyncio.create_task(
                start_microphone_streaming(realtime_client)
            )
            await asyncio.gather(message_handler, streaming_task)
        except Exception as e:
            print(f"Error: {e}")
        finally:
            # Stop the audio playback thread
            audio_queue.put(None)
            if audio_player:
                audio_player.join(timeout=1)
            await realtime_client.close()
            p.terminate()

    if __name__ == "__main__":
        asyncio.run(main())

    Run vad_mode.py to talk with the Realtime model through your microphone in real time. The server detects where your speech starts and stops, and the audio is committed automatically; no manual send is required.

    Manual mode

    In the same directory as omni_realtime_client.py, create another Python file named manual_mode.py and copy the following code into it:

    manual_mode.py

    # -*- coding: utf-8 -*-
    import os
    import asyncio
    import time
    import threading
    import queue
    import pyaudio
    from omni_realtime_client import OmniRealtimeClient, TurnDetectionMode

    class AudioPlayer:
        """Real-time audio player."""
        def __init__(self, sample_rate=24000, channels=1, sample_width=2):
            self.sample_rate = sample_rate
            self.channels = channels
            self.sample_width = sample_width  # 2 bytes per 16-bit sample
            self.audio_queue = queue.Queue()
            self.is_playing = False
            self.play_thread = None
            self.pyaudio_instance = None
            self.stream = None
            self._lock = threading.Lock()  # Lock to synchronize access
            self._last_data_time = time.time()  # Time the last audio data was received
            self._response_done = False  # Whether the server marked the response as done
            self._waiting_for_response = False  # Whether we are waiting for a server response
            # Track when data was last written to the stream and the duration of the most
            # recent chunk, to judge more precisely when playback has finished
            self._last_play_time = time.time()
            self._last_chunk_duration = 0.0
            
        def start(self):
            """Start the audio player."""
            with self._lock:
                if self.is_playing:
                    return

                self.is_playing = True

                try:
                    self.pyaudio_instance = pyaudio.PyAudio()

                    # Create the audio output stream
                    self.stream = self.pyaudio_instance.open(
                        format=pyaudio.paInt16,  # 16-bit
                        channels=self.channels,
                        rate=self.sample_rate,
                        output=True,
                        frames_per_buffer=1024
                    )

                    # Start the playback thread
                    self.play_thread = threading.Thread(target=self._play_audio)
                    self.play_thread.daemon = True
                    self.play_thread.start()

                    print("Audio player started")
                except Exception as e:
                    print(f"Failed to start audio player: {e}")
                    self._cleanup_resources()
                    raise

        def stop(self):
            """Stop the audio player."""
            with self._lock:
                if not self.is_playing:
                    return

                self.is_playing = False

            # Drain the queue
            while not self.audio_queue.empty():
                try:
                    self.audio_queue.get_nowait()
                except queue.Empty:
                    break

            # Wait for the playback thread to finish (outside the lock, to avoid deadlock)
            if self.play_thread and self.play_thread.is_alive():
                self.play_thread.join(timeout=2.0)

            # Re-acquire the lock to clean up resources
            with self._lock:
                self._cleanup_resources()

            print("Audio player stopped")
        
        def _cleanup_resources(self):
            """Clean up audio resources (must be called while holding the lock)."""
            try:
                # Close the audio stream
                if self.stream:
                    if not self.stream.is_stopped():
                        self.stream.stop_stream()
                    self.stream.close()
                    self.stream = None
            except Exception as e:
                print(f"Error closing audio stream: {e}")

            try:
                if self.pyaudio_instance:
                    self.pyaudio_instance.terminate()
                    self.pyaudio_instance = None
            except Exception as e:
                print(f"Error terminating PyAudio: {e}")

        def add_audio_data(self, audio_data):
            """Add audio data to the playback queue."""
            if self.is_playing and audio_data:
                self.audio_queue.put(audio_data)
                with self._lock:
                    self._last_data_time = time.time()  # Update the last-received time
                    self._waiting_for_response = False  # Data arrived; no longer waiting

        def stop_receiving_data(self):
            """Mark that no more audio data will arrive for this response."""
            with self._lock:
                self._response_done = True
                self._waiting_for_response = False  # Response finished; no longer waiting

        def prepare_for_next_turn(self):
            """Reset the player state for the next conversation turn."""
            with self._lock:
                self._response_done = False
                self._last_data_time = time.time()
                self._last_play_time = time.time()
                self._last_chunk_duration = 0.0
                self._waiting_for_response = True  # Start waiting for the next response

            # Drain any audio left over from the previous turn
            while not self.audio_queue.empty():
                try:
                    self.audio_queue.get_nowait()
                except queue.Empty:
                    break
    
        def is_finished_playing(self):
            """Check whether all queued audio has been played."""
            with self._lock:
                queue_size = self.audio_queue.qsize()
                time_since_last_data = time.time() - self._last_data_time
                time_since_last_play = time.time() - self._last_play_time

                # ---------------------- End-of-playback heuristics ----------------------
                # 1. Preferred: the server marked the response done and the queue is empty.
                #    Additionally wait for the most recent chunk to finish playing
                #    (chunk duration + 0.1 s of slack).
                if self._response_done and queue_size == 0:
                    min_wait = max(self._last_chunk_duration + 0.1, 0.5)  # Wait at least 0.5 s
                    if time_since_last_play >= min_wait:
                        return True

                # 2. Fallback: no new data for a while and the queue is empty.
                #    Covers the case where the server never sends an explicit `response.done`.
                if not self._waiting_for_response and queue_size == 0 and time_since_last_data > 1.0:
                    print("\n(No new audio before timeout; treating playback as finished)")
                    return True

                return False

        def _play_audio(self):
            """Worker thread that plays queued audio data."""
            while True:
                # Check whether we should stop
                with self._lock:
                    if not self.is_playing:
                        break
                    stream_ref = self.stream  # Grab a reference to the stream

                try:
                    # Fetch audio data from the queue with a 0.1 s timeout
                    audio_data = self.audio_queue.get(timeout=0.1)

                    # Re-check state and stream validity
                    with self._lock:
                        if self.is_playing and stream_ref and not stream_ref.is_stopped():
                            try:
                                # Play the audio data
                                stream_ref.write(audio_data)
                                # Record the most recent playback info
                                self._last_play_time = time.time()
                                self._last_chunk_duration = len(audio_data) / (self.channels * self.sample_width) / self.sample_rate
                            except Exception as e:
                                print(f"Error writing to audio stream: {e}")
                                break

                    # Mark this chunk as processed
                    self.audio_queue.task_done()

                except queue.Empty:
                    # Queue is empty; keep waiting
                    continue
                except Exception as e:
                    print(f"Error during audio playback: {e}")
                    break
    
    class MicrophoneRecorder:
        """Real-time microphone recorder."""
        def __init__(self, sample_rate=16000, channels=1, chunk_size=1024):
            self.sample_rate = sample_rate
            self.channels = channels
            self.chunk_size = chunk_size
            self.pyaudio_instance = None
            self.stream = None
            self.frames = []
            self._is_recording = False
            self._record_thread = None

        def _recording_thread(self):
            """Recording worker thread."""
            # Keep reading from the audio stream while _is_recording is True
            while self._is_recording:
                try:
                    # exception_on_overflow=False avoids crashing on buffer overflows
                    data = self.stream.read(self.chunk_size, exception_on_overflow=False)
                    self.frames.append(data)
                except (IOError, OSError) as e:
                    # Reads may raise once the stream has been closed
                    print(f"Recording stream read error; it may have been closed: {e}")
                    break
    
        def start(self):
            """Start recording."""
            if self._is_recording:
                print("Recording is already in progress.")
                return

            self.frames = []
            self._is_recording = True

            try:
                self.pyaudio_instance = pyaudio.PyAudio()
                self.stream = self.pyaudio_instance.open(
                    format=pyaudio.paInt16,
                    channels=self.channels,
                    rate=self.sample_rate,
                    input=True,
                    frames_per_buffer=self.chunk_size
                )

                self._record_thread = threading.Thread(target=self._recording_thread)
                self._record_thread.daemon = True
                self._record_thread.start()
                print("Microphone recording started...")
            except Exception as e:
                print(f"Failed to start microphone: {e}")
                self._is_recording = False
                self._cleanup()
                raise

        def stop(self):
            """Stop recording and return the recorded audio data."""
            if not self._is_recording:
                return None

            self._is_recording = False

            # Wait for the recording thread to exit safely
            if self._record_thread:
                self._record_thread.join(timeout=1.0)

            self._cleanup()

            print("Microphone recording stopped.")
            return b''.join(self.frames)

        def _cleanup(self):
            """Safely release PyAudio resources."""
            if self.stream:
                try:
                    if self.stream.is_active():
                        self.stream.stop_stream()
                    self.stream.close()
                except Exception as e:
                    print(f"Error closing audio stream: {e}")

            if self.pyaudio_instance:
                try:
                    self.pyaudio_instance.terminate()
                except Exception as e:
                    print(f"Error terminating PyAudio instance: {e}")

            self.stream = None
            self.pyaudio_instance = None
    
    async def interactive_test():
        """
        Interactive test script: supports multiple consecutive conversation turns,
        each of which can send audio and images.
        """
        # ------------------- 1. Initialization and connection (once) -------------------
        api_key = os.environ.get("DASHSCOPE_API_KEY")
        if not api_key:
            print("Please set the DASHSCOPE_API_KEY environment variable")
            return

        print("--- Real-time multi-turn audio/video conversation client ---")
        print("Initializing audio player and client...")

        audio_player = AudioPlayer()
        audio_player.start()

        def on_audio_received(audio_data):
            audio_player.add_audio_data(audio_data)

        def on_response_done(event):
            print("\n(Received end-of-response marker)")
            audio_player.stop_receiving_data()

        realtime_client = OmniRealtimeClient(
            base_url="wss://dashscope.aliyuncs.com/api-ws/v1/realtime",
            api_key=api_key,
            model="qwen-omni-turbo-realtime",
            voice="Ethan",
            on_text_delta=lambda text: print(f"Assistant: {text}", end="", flush=True),
            on_audio_delta=on_audio_received,
            turn_detection_mode=TurnDetectionMode.MANUAL,
            extra_event_handlers={"response.done": on_response_done}
        )

        message_handler_task = None
        try:
            await realtime_client.connect()
            print("Connected to the server. Enter 'q' or 'quit' at any time to exit.")
            message_handler_task = asyncio.create_task(realtime_client.handle_messages())
            await asyncio.sleep(0.5)
    
            turn_counter = 1
            # ------------------- 2. Multi-turn conversation loop -------------------
            while True:
                print(f"\n--- Conversation turn {turn_counter} ---")
                audio_player.prepare_for_next_turn()

                recorded_audio = None
                image_paths = []

                # --- Get user input: record from the microphone ---
                loop = asyncio.get_event_loop()
                recorder = MicrophoneRecorder(sample_rate=16000)  # 16 kHz is recommended for speech recognition

                print("Ready to record. Press Enter to start recording (or enter 'q' to quit)...")
                user_input = await loop.run_in_executor(None, input)
                if user_input.strip().lower() in ['q', 'quit']:
                    print("User requested exit...")
                    return

                try:
                    recorder.start()
                except Exception:
                    print("Could not start recording; check your microphone permissions and device. Skipping this turn.")
                    continue

                print("Recording... Press Enter again to stop.")
                await loop.run_in_executor(None, input)

                recorded_audio = recorder.stop()

                if not recorded_audio or len(recorded_audio) == 0:
                    print("No valid audio was recorded; please start this turn again.")
                    continue
    
                # --- Get image input (optional) ---
                # The image input feature below is commented out and disabled for now.
                # Uncomment the code to enable it.
                # print("\nEnter the absolute path of each image file, one per line (optional). When done, enter 's' or press Enter to send the request.")
                # while True:
                #     path = input("Image path: ").strip()
                #     if path.lower() == 's' or path == '':
                #         break
                #     if path.lower() in ['q', 'quit']:
                #         print("User requested exit...")
                #         return
                #
                #     if not os.path.isabs(path):
                #         print("Error: please enter an absolute path.")
                #         continue
                #     if not os.path.exists(path):
                #         print(f"Error: file does not exist -> {path}")
                #         continue
                #     image_paths.append(path)
                #     print(f"Added image: {os.path.basename(path)}")
                
                # --- 3. Send the data and get the response ---
                print("\n--- Input summary ---")
                print(f"Pending audio: 1 clip (from microphone), images: {len(image_paths)}")
                print("------------------")

                # 3.1 Send the recorded audio
                try:
                    print(f"Sending microphone recording ({len(recorded_audio)} bytes)")
                    await realtime_client.stream_audio(recorded_audio)
                    await asyncio.sleep(0.1)
                except Exception as e:
                    print(f"Failed to send microphone recording: {e}")
                    continue

                # 3.2 Send all image files
                # The image sending code below is commented out and disabled for now.
                # for i, path in enumerate(image_paths):
                #     try:
                #         with open(path, "rb") as f:
                #             data = f.read()
                #         print(f"Sending image {i+1}: {os.path.basename(path)} ({len(data)} bytes)")
                #         await realtime_client.append_image(data)
                #         await asyncio.sleep(0.1)
                #     except Exception as e:
                #         print(f"Failed to send image {os.path.basename(path)}: {e}")

                # 3.3 Commit and wait for the response
                print("Committing all input and requesting a server response...")
                await realtime_client.commit_audio_buffer()
                await realtime_client.create_response()

                print("Waiting for and playing the server's audio response...")
                start_time = time.time()
                max_wait_time = 60
                while not audio_player.is_finished_playing():
                    if time.time() - start_time > max_wait_time:
                        print(f"\nTimed out after {max_wait_time} s; moving on to the next turn.")
                        break
                    await asyncio.sleep(0.2)

                print("\nAudio playback for this turn is complete!")
                turn_counter += 1
    
        except (asyncio.CancelledError, KeyboardInterrupt):
            print("\nProgram interrupted.")
        except Exception as e:
            print(f"Unhandled error: {e}")
        finally:
            # ------------------- 4. Clean up resources -------------------
            print("\nClosing the connection and cleaning up resources...")
            if message_handler_task and not message_handler_task.done():
                message_handler_task.cancel()

            if 'realtime_client' in locals() and realtime_client.ws:
                await realtime_client.close()
                print("Connection closed.")

            audio_player.stop()
            print("Program exited.")

    if __name__ == "__main__":
        try:
            asyncio.run(interactive_test())
        except KeyboardInterrupt:
            print("\nProgram force-quit by user.")

    Run manual_mode.py; press Enter to start speaking, then press Enter again to stop recording and receive the model's audio response.

Interaction flow

VAD mode

Set session.turn_detection to "server_vad" in the session.update event to enable VAD mode. In this mode, the server automatically detects the start and end of speech and responds on its own. It is suited to voice-call scenarios. A minimal session.update sketch follows this paragraph.

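As a sketch, enabling VAD mode over a raw WebSocket connection might look like the following; only turn_detection.type is prescribed by this section, and the other session fields shown are illustrative assumptions:

# A minimal sketch: send session.update right after the connection opens to
# enable server-side VAD. Fields other than turn_detection.type are assumptions.
import json

session_update = {
    "event_id": "event_1",
    "type": "session.update",
    "session": {
        "modalities": ["text", "audio"],   # assumed field
        "voice": "Chelsie",                # assumed field; see the supported voices
        "turn_detection": {"type": "server_vad"},
    },
}
# ws.send(json.dumps(session_update))  # with a websocket-client connection `ws`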
The interaction flow is as follows:

  1. The server detects the start of speech and sends an input_audio_buffer.speech_started event.

  2. The client sends input_audio_buffer.append events at any time to append audio to the buffer.

  3. The server detects the end of speech and sends an input_audio_buffer.speech_stopped event.

  4. The server sends an input_audio_buffer.committed event, committing the audio buffer.

  5. The server sends a conversation.item.created event containing the user message item created from the buffer.

Lifecycle

| Stage | Event | Sent by | Description |
| --- | --- | --- | --- |
| Session initialization | session.update | Client | Configure the session |
| Session initialization | session.created | Server | Session created |
| Session initialization | session.updated | Server | Session configuration updated |
| User audio input | input_audio_buffer.append | Client | Append audio to the buffer |
| User audio input | input_image_buffer.append | Client | Append an image to the buffer |
| User audio input | input_audio_buffer.speech_started | Server | Speech start detected |
| User audio input | input_audio_buffer.speech_stopped | Server | Speech end detected |
| User audio input | input_audio_buffer.committed | Server | Server received the committed audio |
| Server audio output | response.created | Server | Server starts generating a response |
| Server audio output | response.output_item.added | Server | A new output item is added to the response |
| Server audio output | conversation.item.created | Server | A conversation item is created |
| Server audio output | response.content_part.added | Server | A new content part is added to the assistant message |
| Server audio output | response.audio_transcript.delta | Server | Incrementally generated transcript text |
| Server audio output | response.audio.delta | Server | Incrementally generated model audio |
| Server audio output | response.audio_transcript.done | Server | Transcript complete |
| Server audio output | response.audio.done | Server | Audio generation complete |
| Server audio output | response.content_part.done | Server | The assistant message's text or audio content has finished streaming |
| Server audio output | response.output_item.done | Server | The assistant message's entire output item has finished streaming |
| Server audio output | response.done | Server | Response complete |

Manual mode

Set session.turn_detection to None in the session.update event to enable Manual mode. In this mode, the client explicitly sends input_audio_buffer.commit and response.create events to request a server response. It is suited to push-to-talk scenarios, such as sending a voice message in a chat app. A minimal sketch of these events follows this paragraph.

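As a sketch, one Manual-mode turn over a raw WebSocket connection `ws` (websocket-client style, as in the connection example earlier) might look like this; `pcm_audio_b64` is assumed to be base64-encoded PCM audio:

import json

def manual_turn(ws, pcm_audio_b64: str) -> None:
    # 1. Append audio to the input buffer (repeat per chunk while recording)
    ws.send(json.dumps({"type": "input_audio_buffer.append", "audio": pcm_audio_b64}))
    # 2. Commit the buffer; the server creates a new user message item
    ws.send(json.dumps({"type": "input_audio_buffer.commit"}))
    # 3. Explicitly request a model response
    ws.send(json.dumps({"type": "response.create"}))
    # The server then replies with input_audio_buffer.committed and
    # conversation.item.created, followed by streamed response.* events.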
The interaction flow is as follows:

  1. The client sends input_audio_buffer.append events to append audio to the buffer.

    It can also send input_image_buffer.append events to append images to the buffer.
  2. The client sends an input_audio_buffer.commit event to commit the audio and image buffers, creating a new user message item.

  3. The server replies with an input_audio_buffer.committed event.

  4. The client sends a response.create event and waits for the server to return the model's output.

  5. The server replies with a conversation.item.created event.

Lifecycle

| Stage | Event | Sent by | Description |
| --- | --- | --- | --- |
| Session initialization | session.update | Client | Configure the session |
| Session initialization | session.created | Server | Session created |
| Session initialization | session.updated | Server | Session configuration updated |
| User audio input | input_audio_buffer.append | Client | Append audio to the buffer |
| User audio input | input_image_buffer.append | Client | Append an image to the buffer |
| User audio input | input_audio_buffer.commit | Client | Commit the audio and images to the server |
| User audio input | response.create | Client | Request a model response |
| User audio input | input_audio_buffer.committed | Server | Server received the committed audio |
| Server audio output | input_audio_buffer.clear | Client | Clear the audio in the buffer |
| Server audio output | response.created | Server | Server starts generating a response |
| Server audio output | response.output_item.added | Server | A new output item is added to the response |
| Server audio output | conversation.item.created | Server | A conversation item is created |
| Server audio output | response.content_part.added | Server | A new content part is added to the assistant message |
| Server audio output | response.audio_transcript.delta | Server | Incrementally generated transcript text |
| Server audio output | response.audio.delta | Server | Incrementally generated model audio |
| Server audio output | response.audio_transcript.done | Server | Transcript complete |
| Server audio output | response.audio.done | Server | Audio generation complete |
| Server audio output | response.content_part.done | Server | The assistant message's text or audio content has finished streaming |
| Server audio output | response.output_item.done | Server | The assistant message's entire output item has finished streaming |
| Server audio output | response.done | Server | Response complete |

API reference

FAQ

Q1: How can I try the Qwen-Omni-Realtime model online?

A: You can deploy a demo with one click:

  1. Open the Function Compute template, set the deployment type to Direct deployment, fill in your API Key in the 百炼 API-KEY field, and click Create and deploy default environment.

  2. Wait about one minute, then find the access domain under Environment details > Environment information. Change the domain's http to https (for example: https://omni-realtime.fcv3.xxxx.cn-hangzhou.fc.devsapp.net) and open that HTTPS link to start a video or voice call with the model.

You can view the project's source code under Resources > Function resources.
Both Function Compute and Alibaba Cloud Bailian (百炼) provide free quotas for new users that cover the cost of simple testing; once the quotas are used up, billing is pay-as-you-go. Charges are incurred only while the deployment is being accessed.

Q2: How do I send images to the model?

A: Have the client send input_image_buffer.append events.

For video-call scenarios, you can sample frames from the video and send input_image_buffer.append events at a rate of no more than two frames per second. For DashScope SDK code, see the Omni-Realtime sample code. A throttling sketch follows this paragraph.
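As a sketch, frame-rate-limited sending over a raw WebSocket connection `ws` might look like this; the payload field name "image" is an assumption by analogy with input_audio_buffer.append, and grab_frame_jpeg is a hypothetical helper returning one JPEG frame:

import base64
import json
import time

MAX_FPS = 2  # the recommended upper bound from this answer

def stream_frames(ws, grab_frame_jpeg, duration_s: float = 10.0) -> None:
    """Send video frames as input_image_buffer.append events at <= MAX_FPS."""
    interval = 1.0 / MAX_FPS
    deadline = time.time() + duration_s
    while time.time() < deadline:
        frame: bytes = grab_frame_jpeg()  # hypothetical frame source
        ws.send(json.dumps({
            "type": "input_image_buffer.append",
            "image": base64.b64encode(frame).decode("utf-8"),  # field name assumed
        }))
        time.sleep(interval)  # throttle to at most two frames per second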
