具身智能场景的AI实时互动接入

本文介绍如何将阿里云实时音视频(ARTC)服务集成到运行 Linux 系统的具身智能设备(如机器人)中。

前置准备

创建一个音视频通话智能体,详细步骤请参见音视频通话快速入门

阿里云提供了Python版的Linux Demo示例供参考代码实现。

核心概念与限制

在深入代码之前,请重点关注以下 Linux 平台集成的关键概念与限制:

  • ARTC SDK:Linux 平台当前没有独立的 AI 实时互动SDK。所有功能均通过通用的 Linux ARTC SDK 实现。

  • 自定义音视频 I/O:Linux SDK 仅支持自定义音频输入和播放模式。您需要自行负责从麦克风采集音频数据并推送给 SDK,同时从 SDK 获取音频数据并交由扬声器播放。SDK 不会直接操作声卡设备,这为嵌入式系统提供了最大的灵活性。

  • 平台架构:

    • x86 架构:官方直接提供的 Linux ARTC SDK 为 x86 架构。

    • ARM 架构:若您的设备为 ARM 架构,建议先使用 x86 版本在开发环境中完成流程验证。验证通过后,请联系阿里云商务并提供ARM平台工具链,以获取定制的 ARM 版本 SDK,再做替换。

集成步骤

下图清晰地展示了具身智能客户端与 AI 智能体之间通过 ARTC 服务的完整交互流程,Linux SDK集成步骤请参见Linux端实现音视频通信

image

1. 引擎初始化与参数配置

正确初始化 RTC 引擎并设置关键参数是实现低延迟 AI 互动的第一步。

# 引擎初始化
eventHandler = EngineEventListener() 
h5mode = False # 与Web端互通请设置为True,开启H5兼容模式
currentPath = os.getcwd()
coreServicePath = os.path.abspath(os.path.join(currentPath, "Release", "lib", "AliRtcCoreService"))
# 设置AI实时互动低延时配置
extra_jobj = {"user_specified_disable_audio_ranking": "true",
                            "user_specified_client_biz":1,
"neteq_maximum_delay":50,                            "user_specified_use_external_audio_record":"TRUE"}
extra = json.dumps(extra_jobj)
self.rtc_engine = AliRTCEngine.CreateAliRTCEngine(eventHandler, 42000, 45000, "/tmp", coreServicePath, h5mode, extra)

# 本地创建token,如实现了app server,可从app server获取token。
authInfo = AuthInfo()
authInfo.appid = '{appid}'
appkey = '{appkey}'
authInfo.userid = 'test'
authInfo.username = authInfo.userid
authInfo.channel = chid
expire = datetime.datetime.now() + datetime.timedelta(days=1)
authInfo.timestamp = int(time.mktime(expire.timetuple()))
authInfo.token = self.rtc_engine.GenerateToken(authInfo, appkey)

# 开启音频dump,调试音频问题使用,user_specified_audio_dump_path的值请修改成本地环境存在的路径
params = {
                "audio": {
                    "user_specified_audio_dump_on_call": "TRUE",
                    "user_specified_audio_dump_path": "/home/rtc/aliyun-audio/"
                }
            }
self.rtc_engine.SetParameter(json.dumps(params))

# 开启dataChannel收发功能
self.rtc_engine.SetParameter('{"data":{"enablePubDataChannel":true,"enableSubDataChannel":true}}')

# 开启音频推流
self.rtc_engine.PublishLocalAudioStream(True)
# 开启视频推流
self.rtc_engine.PublishLocalVideoStream(True)
self.rtc_engine.EnableLocalVideo(False)
# 开启自定义音频输入能力,设置来自业务输入音频的采样率和通道数
self.rtc_engine.SetExternalAudioSource(True, sampleRate=16000, channelsPerFrame=1)
# 设置实时互动角色
self.rtc_engine.SetClientRole(AliEngineClientRole.AliEngineClientRoleInteractive)

# Join Channel
joinConfig = JoinChannelConfig()
joinConfig.channelProfile = ChannelProfile.ChannelProfileInteractiveLive
joinConfig.subscribeAudioFormat = AudioFormat.AudioFormatPcmBeforMixing
joinConfig.isAudioOnly = False
joinConfig.publishAvsyncMode = PublishAvsyncMode.PublishAvsyncNoDelay
joinConfig.subscribeMode = SubscribeMode.SubscribeAutomatically
joinConfig.publishMode = PublishMode.PublishAutomatically
# AI实时互动场景中,具身智能端必须以human的身份入会
self.rtc_engine.JoinChannelWithProperty(authInfo.token, AliEngineUserParam(
    channelId=authInfo.channel,
    userId = authInfo.userid,
    userName = authInfo.username,
    capabilityProfile = AliCapabilityProfile.AliCapabilityProfileAiHuman
), joinConfig)

2. 设置回声消除( AEC)

开启回声消除 (AEC):为避免机器人扬声器播放的声音被麦克风再次采集造成回声,需要在成功入会后开启 AEC 功能。

OnJoinChannelResult 回调函数中执行此操作:

def OnJoinChannelResult(self, result:int, channel:str, userId:str) -> None:
  self.rtc_engine.SetParameter('{"audio":{"user_specified_external_audio_aec":"TRUE"}}')

3.自定义输入与播放

Linux SDK仅支持自定义音频输入和播放,不支持 SDK 直接控制声卡进行采集和播放,参考文档自定义音频采集

  • 发送用户音频数据:您的应用程序需要从麦克风或其他来源采集 PCM 音频数据,然后通过以下接口推送给 SDK 进行编码和发送。

self.rtc_engine.PushExternalAudioFrameRawData(audioSamples, sampleLength, timestamp)
  • 接收 AI 音频数据:AI 智能体生成的语音(PCM 数据)会通过 OnSubscribeAudioFrame 回调函数返回。您可以在此函数中获取音频数据,并通过扬声器播放,或进行其他处理。

def OnSubscribeAudioFrame(self, uid: str, frame: AliRTCEngine.AudioFrame) -> None:
  pass

功能实践

基础篇:和具身智能机器人对话,如何透传lm输出的指令数据?

当用户发出指令(如“向我招招手”)时,需要从大语言模型(LLM)生成指令传递到机器人端执行。以下是三种推荐的实现方式:

  1. LLM Function Calling

    • ASR 识别用户语音并转为文本,发送给 LLM。

    • LLM 理解意图,并调用预先定义好的 Function Call(自定义插件)。

    • 该插件直接触发您业务服务端的相应接口,从而远程操控机器人执行动作。

  2. 过滤播报 + 客户端指令解析

    • ASR -> LLM。LLM 生成回复,其中包含口播文本(如“好的,看我给你表演一个”)和机器指令(如 {"action": "wave_hand"})。

    • 利用 AI 实时互动的“过滤播报”能力,播报时过滤掉指令内容,详细请参见过滤播报内容

    • 机器人客户端接收到消息,解析指令并调用本地 API 执行动作。

  3. 服务端回调 + Data Channel

进阶篇:接收实时字幕与自定义消息

AI 互动过程中的实时字幕、智能体状态、以及上文提到的自定义指令,都是通过 Data Channel 传输的。客户端需要监听 OnDataChannelMsg 回调来接收这些信息。详细请参见实时字幕

def OnDataChannelMsg(self, uid: str, msg: AliRTCEngine.AliEngineDataChannelMsg) -> None:
    dataChannelMsg = msg.data.decode('utf-8')
    try:
        print(dataChannelMsg)
        msg = json.loads(dataChannelMsg)
        if msg["data"] and msg["data"]["text"]:
            # 提取实时字幕文本
            print(msg["data"]["text"])
    except Exception as e:
        pass