Tablestore记忆存储在AgentRun中的实现

更新时间:
复制为 MD 格式

AgentRun 通过集成表格存储(Tablestore),为智能体提供三种持久化记忆能力:在同一对话中维持上下文的会话历史、跨会话保留用户偏好等结构化信息的长期记忆,以及可直接读写的会话状态。本文介绍如何创建并配置记忆存储,并通过可运行的代码示例演示三种记忆类型的使用方式。

记忆类型

类型

作用

触发方式

跨会话

Tablestore 表

会话历史

记录同一 session 内的完整对话轮次,维持多轮对话上下文

每次请求自动写入,传入相同 session_id 即可

session(元数据)+ message(消息)

长期记忆

从对话中提炼用户偏好等结构化信息,以向量形式持久化,让 Agent 随使用逐渐了解用户

Agent 调用 MCP 工具自动提取并存储(需开启 MCP)

mem0

会话状态

存储每次对话的元数据,如任务进度、用户设置等

仅支持通过 SDK 显式读写,AgentRun控制台快速创建的Agent不支持会话状态

session(状态)

快速使用

步骤一:创建记忆存储

  1. 登录函数计算控制台,在左侧菜单栏单击函数智能 > 智能体 AgentRun,进入AgentRun控制台。

    重要

    如果进入AgentRun控制台后,右上角有“体验新版”字样,单击切换到新版控制台。

  2. 在左侧菜单栏单击记忆,然后单击添加记忆存储,填写相关配置项:

    配置项

    说明

    名称

    必填,系统自动生成(如 mem-UiH9),可自定义

    向量数据库

    默认自动配置(推荐),自动创建并关联 Tablestore 向量数据库

    长期记忆

    默认开启,需选择大语言模型服务、向量模型服务和执行角色

    会话历史

    默认开启

    会话状态

    默认开启

    网络配置

    默认允许公网访问;也可配置 VPC 访问;至少选择一种

  3. 单击开始部署,等待记忆存储部署完成。

步骤二:创建 Agent 并体验记忆功能

  1. 在左侧菜单栏单击Agent运行时,然后单击创建Agent,选择快速创建,填写相关配置项:

    配置项

    说明

    Agent名称

    必填,系统自动生成(如 agent-quick-n31r8),可自定义

    模型服务

    选择已配置的模型,如 qwen3.5-plus

    系统提示词

    可选择模板,如“智能问答助手”

    工具与上下文 > 记忆

    添加步骤一创建的记忆存储

    说明

    使用长期记忆功能时,需按页面提示开启MCP,Agent 才能自动提取和读取长期记忆。

    访问凭证

    默认匿名访问,可选择已有凭证或创建新凭证

  2. 单击创建Agent,等待 Agent 启动完成。

  3. 模型对比测试窗口中:

    • 输入“我叫小明,最近在学习向量数据库”,观察 Agent 回复

      说明

      函数计算遵循Serverless(无服务器)架构,只有在请求到达时才创建实例,并能及时释放实例帮助用户节省成本。因此,首次提问时,会有较长的等待时间。

    • 在同一对话中继续提问“我叫什么?”,验证会话历史(Agent 能记住本轮对话内容)

    • 新建会话,提问“我叫什么?”,验证长期记忆(Agent 能跨会话记住用户信息)

代码集成

环境准备

pip3 install alibabacloud_agentrun20250910 openai tablestore agentrun-sdk agentrun-mem0ai google-adk litellm langchain-openai

配置环境变量(请勿将密钥硬编码在代码中):

# 阿里云账号凭据
export ALIBABA_CLOUD_ACCESS_KEY_ID="your-access-key-id"
export ALIBABA_CLOUD_ACCESS_KEY_SECRET="your-access-key-secret"

# Agent调用配置(在控制台Agent运行时 > 详情 > 概览 > 访问信息中获取)
export AGENT_ENDPOINT="https://{ACCOUNT_ID}.agentrun-data.{REGION}.aliyuncs.com/agent-runtimes/{AGENT_NAME}/endpoints/Default/invocations"
# Agent调用凭证(在控制台凭证管理中获取)
export AGENT_API_KEY="your-agent-api-key"

# 记忆存储名称(步骤一创建后获取)
export MEMORY_COLLECTION_NAME="mem-xxxx"

# DashScope API Key(LangChain 和 Google ADK 示例使用)
export DASHSCOPE_API_KEY="your-dashscope-api-key"

# Tablestore 配置(会话状态示例使用)
export TABLESTORE_ENDPOINT="https://{INSTANCE}.{REGION}.ots.aliyuncs.com"
export TABLESTORE_INSTANCE="your-instance-name"

会话历史

三轮对话测试:第 1 轮告知姓名和职业,第 2 轮直接问"我是谁",验证 Agent 是否记住了上文;第 3 轮继续追问,验证完整上下文是否贯穿始终。每次运行生成新的 session_id,互不干扰。

import os
import re
import threading
import time
import uuid

from openai import OpenAI

# ── 配置 ──────────────────────────────────────────────────────────────────────

AGENT_ENDPOINT = os.environ["AGENT_ENDPOINT"]
AGENT_API_KEY = os.environ["AGENT_API_KEY"]
MODEL = os.getenv("MODEL", "qwen3.5-plus")

# 从 Endpoint URL 解析账号 ID 和 Agent 名称。
# Endpoint 格式:https://{ACCOUNT_ID}.agentrun-data.{REGION}.aliyuncs.com/agent-runtimes/{AGENT_NAME}/endpoints/Default/invocations
# 注意:这里使用账号 ID 仅适合单用户演示场景。在多用户应用中,
# user_id 应代表终端用户(如应用内的用户 ID),而非开发者账号。
_account_match = re.match(r"https://(\d+)\.agentrun-data\.", AGENT_ENDPOINT)
USER_ID = _account_match.group(1) if _account_match else "default_user"

_agent_match = re.search(r"/agent-runtimes/([^/]+)/endpoints/", AGENT_ENDPOINT)
AGENT_ID = _agent_match.group(1) if _agent_match else "default_agent"
client = OpenAI(
    base_url=f"{AGENT_ENDPOINT}/openai/v1",
    api_key=AGENT_API_KEY,
)


# ── 对话函数 ──────────────────────────────────────────────────────────────────

def _spinner(stop_event: threading.Event):
    """在等待首个 Token 期间显示转圈动画。"""
    frames = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏']
    i = 0
    while not stop_event.is_set():
        print(f'\r助手: 正在思考... {frames[i % len(frames)]}', end='', flush=True)
        time.sleep(0.1)
        i += 1
    print('\r' + ' ' * 35 + '\r', end='', flush=True)


def chat(session_id: str, history: list, message: str) -> str:
    """向 Agent 发送消息,通过 session_id 和消息历史维持会话上下文。

    history 为本轮之前的对话历史,格式为
        [{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}, ...]
    只保留 user/assistant 的文本内容(不含 tool_calls),历史消息与本轮消息
    一起发送,Agent 可从上下文中直接读取之前的对话内容。

    session_id 通过请求头 X-AgentRun-Conversation-ID 传递,AgentRun 用其
    关联 Tablestore 中的会话记录(用 extra_body 传递无效,服务端不读请求体)。
    """
    print(f"用户: {message}")

    # 启动转圈动画(覆盖冷启动和 MCP 工具调用的等待时间)
    stop_event = threading.Event()
    spinner = threading.Thread(target=_spinner, args=(stop_event,), daemon=True)
    spinner.start()

    stream = client.chat.completions.create(
        model=MODEL,
        messages=history + [{"role": "user", "content": message}],
        stream=True,
        extra_headers={
            "X-AgentRun-Conversation-ID": session_id,
            "X-AgentRun-User-ID": USER_ID,
            "X-AgentRun-Agent-ID": AGENT_ID,
        },
    )

    full_reply = []
    first_token = True
    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            token = chunk.choices[0].delta.content
            if first_token:
                stop_event.set()
                spinner.join()
                print("助手: ", end="", flush=True)
                first_token = False
            print(token, end="", flush=True)
            full_reply.append(token)

    stop_event.set()
    spinner.join()
    print("\n")
    return "".join(full_reply)


# ── 主程序 ────────────────────────────────────────────────────────────────────

def main():
    # 每次运行生成新的 session_id,避免与上次运行的历史混淆。
    # 在实际应用中,session_id 通常由应用层管理(如绑定用户的对话 ID)。
    session_id = str(uuid.uuid4())
    history = []  # 维护对话历史(只含 user/assistant 文本消息)

    print("═" * 50)
    print("示例:多轮对话(会话历史)")
    print("═" * 50)
    print("提示:首次请求函数计算需要创建实例(冷启动),可能需要较长等待时间,请耐心等待。")

    # 第一轮:自我介绍
    print("\n[第 1 轮]")
    q1 = "我叫小明,我是一名数据工程师,最近在研究 Tablestore"
    r1 = chat(session_id, history, q1)
    history.extend([
        {"role": "user", "content": q1},
        {"role": "assistant", "content": r1},
    ])

    # 第二轮:测试 Agent 是否记住了第一轮的内容
    print("[第 2 轮]")
    q2 = "我叫什么名字?我从事什么工作?我最近在研究什么?"
    r2 = chat(session_id, history, q2)
    history.extend([
        {"role": "user", "content": q2},
        {"role": "assistant", "content": r2},
    ])

    # 第三轮:继续深入话题
    print("[第 3 轮]")
    q3 = "针对我的工作,你有什么 Tablestore 使用建议?"
    chat(session_id, history, q3)


if __name__ == "__main__":
    main()
说明

推荐使用流式模式(stream=True):Agent 在处理消息时会调用 MCP 记忆工具(读写 Tablestore),流式模式可避免客户端因等待 MCP 工具调用而超时。

将上述代码保存为 conversation_history.py 并运行:

python3 conversation_history.py

长期记忆

方式一:通过MCP集成

在记忆详情的长期记忆 > MCP集成启动服务配置,启动后获取 MCP 服务 URL(SSE),配置到支持 MCP 工具调用的 Agent 中,即可自动提取、更新、删除长期记忆。

说明

通过控制台使用Agent运行时,可在创建Agent配置记忆时开启MCP(未提示表示已开启),Agent会在每轮对话中自动调用 MCP 工具检索和更新长期记忆。对话中出现可提取的用户信息时(如:“我喜欢喝咖啡”),Agent 会自动存入向量库供后续跨 session 使用。

方式二:直接操作 mem0 向量存储

与方式一不同,方式二需要由代码显式调用 memory.add() 写入记忆——mem0 不会自动监听对话,也不会自动判断"哪些内容值得记录"。这种方式适合在自定义 Agent 框架或数据管道中,由业务逻辑精确控制记忆的写入时机和内容。

本示例写入一段用户描述,mem0 内部调用 LLM 将其拆解为多条结构化记忆(如“名叫 Alice”、“喜欢喝咖啡”),然后用三个不同角度的语义查询验证检索效果。

import os
import time

from agentrun.memory_collection import MemoryCollection

# ── 配置 ──────────────────────────────────────────────────────────────────────

MEMORY_COLLECTION_NAME = os.environ["MEMORY_COLLECTION_NAME"]

# agentrun-sdk 自动从以下环境变量读取凭证(无需在代码中显式传入):
#   ALIBABA_CLOUD_ACCESS_KEY_ID / ALIBABA_CLOUD_ACCESS_KEY_SECRET
#   AGENTRUN_REGION(默认 cn-hangzhou)


# ── 初始化 mem0 客户端 ────────────────────────────────────────────────────────

# to_mem0_memory() 自动完成两件事:
#   1. 从 AgentRun 控制平面读取记忆存储配置(Tablestore 实例、集合、向量维度等)
#   2. 使用读取到的配置初始化 agentrun-mem0ai Memory 客户端
memory = MemoryCollection.to_mem0_memory(MEMORY_COLLECTION_NAME)


# ── 核心操作 ──────────────────────────────────────────────────────────────────

def add_memories(user_id: str):
    """向记忆存储中添加用户信息。

    memory.add() 支持两种输入:
      - str:直接写入一段描述文本,mem0 内部会自动提炼为结构化记忆
      - list[dict]:完整的对话消息列表(role/content),mem0 从中提取用户偏好
    本示例使用文本输入演示基本用法。
    """
    print(f"\n[添加记忆] user_id={user_id}")

    result = memory.add(
        "我叫 Alice,是一名 Python 工程师,平时喜欢喝咖啡,最近在研究向量数据库。",
        user_id=user_id,
        metadata={"source_app": "openmemory", "mcp_client": "python_sdk"},
    )
    results = result.get("results", [])
    print(f"  写入 {len(results)} 条记忆:")
    for i, res in enumerate(results, 1):
        print(f"    {i}. [{res.get('event')}] {res.get('memory', '')}")


def search_memories(user_id: str, query: str):
    """从记忆存储中检索与查询语义最相关的记忆。"""
    print(f"\n[检索记忆] 查询: {query!r}")

    results = memory.search(query, user_id=user_id)
    hits = results.get("results", [])
    if not hits:
        print("  (未找到相关记忆)")
        return

    for i, hit in enumerate(hits, 1):
        score = hit.get("score", 0)
        content = hit.get("memory", "")
        print(f"  {i}. [{score:.4f}] {content}")


# ── 主程序 ────────────────────────────────────────────────────────────────────

def main():
    print("═" * 55)
    print("长期记忆示例 —— 直接操作 mem0 向量存储")
    print("═" * 55)
    print("提示:首次请求函数计算需要创建实例(冷启动),可能需要较长等待时间,请耐心等待。")

    user_id = "alice-demo"

    # 写入记忆
    add_memories(user_id)

    # 等待 Tablestore 向量索引刷新(首次写入后搜索需要短暂延迟)
    print("\n等待向量索引刷新...")
    time.sleep(3)

    # 语义检索:不同的查询角度
    print()
    print("──" * 27)
    search_memories(user_id, "她喜欢什么饮料?")
    search_memories(user_id, "她是做什么工作的?")
    search_memories(user_id, "她最近在研究什么技术?")


if __name__ == "__main__":
    main()

将上述代码保存为 long_term_memory.py 并运行:

python3 long_term_memory.py

方式三:LangChain 生态集成

以旅行规划助手为例,演示在 LangChain 对话循环中集成 mem0 长期记忆的完整模式:每轮对话开始时先检索与用户问题相关的历史记忆并注入 prompt,生成回复后再将本轮对话内容写回 mem0,使记忆在对话过程中持续积累。多轮后可观察到 Agent 能主动引用早先提到的旅行偏好。

import os
import threading
import time
from typing import List

from langchain_core.messages import SystemMessage, HumanMessage, BaseMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI

from agentrun.memory_collection import MemoryCollection

# ── 配置 ──────────────────────────────────────────────────────────────────────

MEMORY_COLLECTION_NAME = os.environ["MEMORY_COLLECTION_NAME"]
DASHSCOPE_API_KEY = os.environ["DASHSCOPE_API_KEY"]

# ── 初始化 LLM 和 mem0 客户端 ─────────────────────────────────────────────────

# 通过 DashScope OpenAI 兼容接口调用大模型
llm = ChatOpenAI(
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    api_key=DASHSCOPE_API_KEY,
    model="qwen-max",
)

# 初始化 mem0 客户端(自动从 AgentRun 控制平面读取 Tablestore 配置)
mem0 = MemoryCollection.to_mem0_memory(MEMORY_COLLECTION_NAME)

# ── Prompt 模板 ───────────────────────────────────────────────────────────────

prompt = ChatPromptTemplate.from_messages([
    SystemMessage(content="""你是一位专业的旅行规划助手,能够根据用户的偏好和历史对话提供个性化的旅行建议。
如果有相关记忆上下文,请在回答中加以参考,让建议更符合用户需求。"""),
    MessagesPlaceholder(variable_name="context"),
    ("human", "{input}"),
])


# ── 核心函数 ──────────────────────────────────────────────────────────────────

def _spinner(stop_event: threading.Event):
    """在等待 LLM 推理期间显示转圈动画。"""
    frames = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏']
    i = 0
    while not stop_event.is_set():
        print(f'\r助手: 正在思考... {frames[i % len(frames)]}', end='', flush=True)
        time.sleep(0.1)
        i += 1
    print('\r' + ' ' * 35 + '\r', end='', flush=True)


def retrieve_context(query: str, user_id: str) -> List[BaseMessage]:
    """从 mem0 检索与用户问题相关的记忆,作为 LangChain 消息列表注入 prompt。"""
    try:
        memories = mem0.search(query, user_id=user_id)
        memory_list = memories.get("results", [])
        if not memory_list:
            return []
        serialized = " ".join(m["memory"] for m in memory_list)
        print(f"  [mem0 检索到 {len(memory_list)} 条相关记忆]")
        return [SystemMessage(content=f"用户相关信息:{serialized}")]
    except Exception as e:
        print(f"  [mem0 检索失败: {e}]")
        return []


def generate_response(user_input: str, context: List[BaseMessage]) -> str:
    """携带记忆上下文调用 LLM 生成回复。"""
    chain = prompt | llm
    response = chain.invoke({"context": context, "input": user_input})
    return response.content


def save_interaction(user_id: str, user_input: str, assistant_response: str) -> int:
    """将本轮对话写回 mem0,供后续对话检索使用。返回新增记忆条数。"""
    interaction = [
        {"role": "user", "content": user_input},
        {"role": "assistant", "content": assistant_response},
    ]
    try:
        result = mem0.add(interaction, user_id=user_id, metadata={"source_app": "openmemory", "mcp_client": "python_sdk"})
        return len(result.get("results", []))
    except Exception as e:
        print(f"  [mem0 保存失败: {e}]")
        return 0


def chat_turn(user_input: str, user_id: str) -> str:
    """单轮对话:检索记忆 → 生成回复 → 保存记忆。"""
    context = retrieve_context(user_input, user_id)

    # 用一个 spinner 同时覆盖 LLM 推理和 mem0 保存,全部完成后再打印回复
    stop_event = threading.Event()
    spinner = threading.Thread(target=_spinner, args=(stop_event,), daemon=True)
    spinner.start()

    response = generate_response(user_input, context)
    count = save_interaction(user_id, user_input, response)

    stop_event.set()
    spinner.join()
    print(f"助手: {response}\n")
    if count:
        print(f"  [mem0 新增 {count} 条记忆]\n")

    return response


# ── 主程序 ────────────────────────────────────────────────────────────────────

def main():
    print("═" * 60)
    print("旅行规划助手(LangChain + mem0 长期记忆)")
    print("═" * 60)
    print("提示:首次请求函数计算需要创建实例(冷启动),可能需要较长等待时间,请耐心等待。")
    print("输入 /quit 退出\n")

    user_id = "travel-user-demo"

    while True:
        try:
            user_input = input("你: ").strip()
        except (EOFError, KeyboardInterrupt):
            break
        if not user_input or user_input == "/quit":
            break
        response = chat_turn(user_input, user_id)


if __name__ == "__main__":
    main()

将上述代码保存为 langchain_memory.py 并运行:

python3 langchain_memory.py

会话状态

方式一:通过 Tablestore SDK 查询会话数据

AgentRun 将每次对话的会话元数据写入 session 表,消息内容写入 message 表。本示例直接扫描这两张表,列出最近的会话记录,并读取其中一个会话的完整对话内容。适用于数据分析、会话审计等只读场景。

import json
import os
from datetime import datetime

import tablestore as ots

# ── 配置 ──────────────────────────────────────────────────────────────────────

ENDPOINT = os.environ["TABLESTORE_ENDPOINT"]
INSTANCE = os.environ["TABLESTORE_INSTANCE"]
ACCESS_KEY_ID = os.environ["ALIBABA_CLOUD_ACCESS_KEY_ID"]
ACCESS_KEY_SECRET = os.environ["ALIBABA_CLOUD_ACCESS_KEY_SECRET"]

client = ots.OTSClient(ENDPOINT, ACCESS_KEY_ID, ACCESS_KEY_SECRET, INSTANCE)


# ── 查询函数 ──────────────────────────────────────────────────────────────────

def list_sessions(user_id: str = None, limit: int = 10) -> list[dict]:
    """查询 session 表中的会话元数据列表。"""
    if user_id:
        min_pk = [("user_id", user_id), ("session_id", ots.INF_MIN)]
        max_pk = [("user_id", user_id), ("session_id", ots.INF_MAX)]
    else:
        min_pk = [("user_id", ots.INF_MIN), ("session_id", ots.INF_MIN)]
        max_pk = [("user_id", ots.INF_MAX), ("session_id", ots.INF_MAX)]

    _, _, rows, _ = client.get_range(
        "session", ots.Direction.FORWARD,
        min_pk, max_pk,
        columns_to_get=[], max_version=1, limit=limit,
    )
    sessions = []
    for row in rows:
        pks = dict(row.primary_key)
        attrs = {c[0]: c[1] for c in row.attribute_columns}
        # update_time 单位为微秒
        update_time_us = int(attrs.get("update_time", 0))
        sessions.append({
            "user_id": pks["user_id"],
            "session_id": pks["session_id"],
            "agent_id": attrs.get("agent_id", ""),
            "update_time": datetime.fromtimestamp(update_time_us / 1e6).strftime("%Y-%m-%d %H:%M:%S"),
        })
    return sessions


def get_session_messages(session_id: str) -> list[dict]:
    """查询 message 表中指定会话的所有对话消息。"""
    min_pk = [("session_id", session_id), ("create_time", ots.INF_MIN), ("message_id", ots.INF_MIN)]
    max_pk = [("session_id", session_id), ("create_time", ots.INF_MAX), ("message_id", ots.INF_MAX)]

    _, _, rows, _ = client.get_range(
        "message", ots.Direction.FORWARD,
        min_pk, max_pk,
        columns_to_get=[], max_version=1,
    )
    messages = []
    for row in rows:
        attrs = {c[0]: c[1] for c in row.attribute_columns}
        if "content" in attrs:
            try:
                messages.extend(json.loads(attrs["content"]))
            except json.JSONDecodeError:
                pass
    return messages


# ── 主程序 ────────────────────────────────────────────────────────────────────

def main():
    print("═" * 55)
    print("AgentRun 会话状态 —— Tablestore 数据查询示例")
    print("═" * 55)

    # ── 1. 列出所有会话 ───────────────────────────────────────────────────────
    print("\n[会话列表]")
    sessions = list_sessions(limit=5)
    if not sessions:
        print("  (暂无会话记录)")
        return

    for s in sessions:
        print(f"  session_id : {s['session_id']}")
        print(f"  agent_id   : {s['agent_id']}")
        print(f"  更新时间   : {s['update_time']}")
        print()

    # ── 2. 读取最近一个会话的消息 ─────────────────────────────────────────────
    latest = sessions[-1]
    sid = latest["session_id"]
    print(f"\n[会话详情] session_id: {sid}")
    messages = get_session_messages(sid)

    if not messages:
        print("  (该会话暂无消息)")
    else:
        for msg in messages:
            role = msg.get("role", "?")
            content = msg.get("content", "")[:100]
            ellipsis = "..." if len(msg.get("content", "")) > 100 else ""
            print(f"  [{role}] {content}{ellipsis}")


if __name__ == "__main__":
    main()

将上述代码保存为 session_state.py 并运行:

python3 session_state.py

方式二:Google ADK 集成(OTSSessionService)

演示如何将 Tablestore 作为 Google ADK Agent 的会话状态后端。通过 OTSSessionService,ADK Agent 的每次对话都会自动持久化到 Tablestore,进程重启后可继续之前的会话,无需重新建立上下文。示例包含一个能查询天气的工具,用于验证 Agent 的工具调用和会话记录是否正常持久化。

from __future__ import annotations

import asyncio
import os
import sys
import threading
import time
from typing import Any

from google.adk.agents import Agent
from google.adk.models.lite_llm import LiteLlm
from google.adk.runners import Runner
from google.genai import types

from agentrun.conversation_service import SessionStore
from agentrun.conversation_service.adapters import OTSSessionService

# ── 配置 ──────────────────────────────────────────────────────────────────────

MEMORY_COLLECTION_NAME = os.environ.get("MEMORY_COLLECTION_NAME", "")
DASHSCOPE_API_KEY = os.environ.get("DASHSCOPE_API_KEY", "")

if not MEMORY_COLLECTION_NAME:
    print("ERROR: 请设置环境变量 MEMORY_COLLECTION_NAME")
    sys.exit(1)

APP_NAME = "adk-chat-demo"
USER_ID = "demo_user"
SESSION_FILE = ".adk_session_id"  # 本地持久化 session ID,删除此文件可开始新会话


# ── 工具定义 ──────────────────────────────────────────────────────────────────

def _spinner(stop_event: threading.Event):
    """在等待 Agent 推理期间显示转圈动画。"""
    frames = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏']
    i = 0
    while not stop_event.is_set():
        print(f'\rAgent: 正在思考... {frames[i % len(frames)]}', end='', flush=True)
        time.sleep(0.1)
        i += 1
    print('\r' + ' ' * 35 + '\r', end='', flush=True)

def get_weather(city: str) -> dict[str, Any]:
    """查询指定城市的天气信息。"""
    data = {
        "北京": {"weather": "晴", "temperature": "5~15°C"},
        "上海": {"weather": "多云", "temperature": "12~20°C"},
        "杭州": {"weather": "阴", "temperature": "10~18°C"},
    }
    return data.get(city, {"error": "暂无该城市数据"})


# ── Step 1: 初始化 SessionStore ───────────────────────────────────────────────

# SessionStore.from_memory_collection() 从 AgentRun 控制平面读取记忆存储配置,
# 自动获取 Tablestore 实例和 endpoint,无需手动配置 OTS 连接参数。
store = SessionStore.from_memory_collection(MEMORY_COLLECTION_NAME)
store.init_tables()
store.init_search_index()

# ── Step 2: 创建 OTSSessionService ───────────────────────────────────────────

session_service = OTSSessionService(session_store=store)

# ── Step 3: 创建 Agent + Runner ───────────────────────────────────────────────

# 通过 LiteLLM 调用 DashScope OpenAI 兼容接口
custom_model = LiteLlm(
    model="openai/qwen-max",
    api_key=DASHSCOPE_API_KEY,
    api_base="https://dashscope.aliyuncs.com/compatible-mode/v1",
)

agent = Agent(
    name="smart_assistant",
    model=custom_model,
    instruction="你是一个友好的中文智能助手,用户问天气时调用 get_weather 工具。",
    tools=[get_weather],
)

runner = Runner(
    agent=agent,
    app_name=APP_NAME,
    session_service=session_service,
)


# ── Step 4: 对话(自动持久化到 Tablestore) ───────────────────────────────────

async def chat(session_id: str, text: str) -> str:
    """发送消息并返回 Agent 回复。"""
    content = types.Content(
        role="user",
        parts=[types.Part(text=text)],
    )
    reply_parts: list[str] = []
    async for event in runner.run_async(
        user_id=USER_ID,
        session_id=session_id,
        new_message=content,
    ):
        if event.is_final_response() and event.content and event.content.parts:
            for part in event.content.parts:
                if part.text:
                    reply_parts.append(part.text)
    return "\n".join(reply_parts)


# ── 主程序 ────────────────────────────────────────────────────────────────────

async def main() -> None:
    # 从本地文件读取上次的 session ID,尝试恢复会话;找不到则创建新会话
    session = None
    if os.path.exists(SESSION_FILE):
        saved_id = open(SESSION_FILE).read().strip()
        session = await session_service.get_session(
            app_name=APP_NAME, user_id=USER_ID, session_id=saved_id
        )
        if session:
            print(f"继续之前的会话: {session.id}")

    if session is None:
        session = await session_service.create_session(
            app_name=APP_NAME,
            user_id=USER_ID,
            state={"user:language": "zh-CN"},
        )
        open(SESSION_FILE, "w").write(session.id)
        print(f"新会话已创建: {session.id}")

    print("输入 /quit 退出,/new 开始新会话\n")

    while True:
        try:
            user_input = input("你: ").strip()
        except (EOFError, KeyboardInterrupt):
            break
        if not user_input:
            continue
        if user_input == "/quit":
            break
        if user_input == "/new":
            session = await session_service.create_session(
                app_name=APP_NAME,
                user_id=USER_ID,
                state={"user:language": "zh-CN"},
            )
            open(SESSION_FILE, "w").write(session.id)
            print(f"新会话已创建: {session.id}\n")
            continue

        stop_event = threading.Event()
        spinner = threading.Thread(target=_spinner, args=(stop_event,), daemon=True)
        spinner.start()

        reply = await chat(session.id, user_input)

        stop_event.set()
        spinner.join()
        print(f"Agent: {reply}\n")


if __name__ == "__main__":
    asyncio.run(main())

将上述代码保存为 adk_session.py 并运行:

python3 adk_session.py

管理记忆存储

查询记忆存储的配置详情,包括关联的 Tablestore 实例名称、向量集合、向量维度、嵌入模型和长期记忆提取模型,以及当前账号下的所有记忆存储列表。适用于排查配置问题或了解记忆存储的底层资源信息。

import os

from alibabacloud_agentrun20250910.client import Client
from alibabacloud_tea_openapi import models as open_api_models

# ── 配置 ──────────────────────────────────────────────────────────────────────

ACCESS_KEY_ID = os.environ["ALIBABA_CLOUD_ACCESS_KEY_ID"]
ACCESS_KEY_SECRET = os.environ["ALIBABA_CLOUD_ACCESS_KEY_SECRET"]
MEMORY_COLLECTION_NAME = os.environ["MEMORY_COLLECTION_NAME"]
REGION = os.getenv("AGENTRUN_REGION", "cn-hangzhou")

config = open_api_models.Config(
    access_key_id=ACCESS_KEY_ID,
    access_key_secret=ACCESS_KEY_SECRET,
    endpoint=f"agentrun.{REGION}.aliyuncs.com",
)
client = Client(config)


# ── 工具函数 ──────────────────────────────────────────────────────────────────

def get_memory_collection(name: str) -> dict:
    """查询记忆存储的详细配置。"""
    resp = client.get_memory_collection(name)
    return resp.body.data.to_map()


def list_memory_collections() -> list:
    """列出当前账号下的所有记忆存储。"""
    from alibabacloud_agentrun20250910 import models as ar_models
    req = ar_models.ListMemoryCollectionsRequest()
    resp = client.list_memory_collections(req)

    return [item.to_map() for item in (resp.body.data.items or [])]



# ── 主程序 ────────────────────────────────────────────────────────────────────

def main():
    # ── 查询单个记忆存储 ──────────────────────────────────────────────────────
    print("═" * 55)
    print(f"查询记忆存储:{MEMORY_COLLECTION_NAME}")
    print("═" * 55)

    info = get_memory_collection(MEMORY_COLLECTION_NAME)

    print(f"名称:{info.get('memoryCollectionName')}")
    print(f"ID:{info.get('memoryCollectionId')}")
    print(f"会话历史:{'已启用' if info.get('enableConversationHistory') else '未启用'}")
    print(f"会话状态:{'已启用' if info.get('enableConversationState') else '未启用'}")

    vector_cfg = info.get("vectorStoreConfig", {})
    print(f"\n向量数据库:{vector_cfg.get('provider')}")
    vector_detail = vector_cfg.get("config", {})
    print(f"  实例:{vector_detail.get('instanceName')}")
    print(f"  集合:{vector_detail.get('collectionName')}")
    print(f"  向量维度:{vector_detail.get('vectorDimension')}")

    embedder_cfg = info.get("embedderConfig", {})
    embedder_detail = embedder_cfg.get("config", {})
    print(f"\n嵌入模型:{embedder_detail.get('model')}")
    print(f"模型服务:{embedder_cfg.get('modelServiceName')}")

    llm_cfg = info.get("llmConfig", {})
    llm_detail = llm_cfg.get("config", {})
    print(f"\n长期记忆提取模型:{llm_detail.get('model')}")
    print(f"模型服务:{llm_cfg.get('modelServiceName')}")

    # ── 列出所有记忆存储 ──────────────────────────────────────────────────────
    print("\n" + "═" * 55)
    print("当前账号下的所有记忆存储")
    print("═" * 55)

    collections = list_memory_collections()
    if not collections:
        print("(无)")
    for c in collections:
        status = c.get("status", "")
        print(f"  - {c.get('memoryCollectionName')}  [{status}]")


if __name__ == "__main__":
    main()

将上述代码保存为 manage_memory.py 并运行:

python3 manage_memory.py

相关文档