AgentRun 通过集成表格存储(Tablestore),为智能体提供三种持久化记忆能力:在同一对话中维持上下文的会话历史、跨会话保留用户偏好等结构化信息的长期记忆,以及可直接读写的会话状态。本文介绍如何创建并配置记忆存储,并通过可运行的代码示例演示三种记忆类型的使用方式。
记忆类型
类型 | 作用 | 触发方式 | 跨会话 | Tablestore 表 |
会话历史 | 记录同一 session 内的完整对话轮次,维持多轮对话上下文 | 每次请求自动写入,传入相同 | 否 |
|
长期记忆 | 从对话中提炼用户偏好等结构化信息,以向量形式持久化,让 Agent 随使用逐渐了解用户 | Agent 调用 MCP 工具自动提取并存储(需开启 MCP) | 是 |
|
会话状态 | 存储每次对话的元数据,如任务进度、用户设置等 | 仅支持通过 SDK 显式读写,AgentRun控制台快速创建的Agent不支持会话状态 | 否 |
|
快速使用
步骤一:创建记忆存储
登录函数计算控制台,在左侧菜单栏单击,进入AgentRun控制台。
重要如果进入AgentRun控制台后,右上角有“体验新版”字样,单击切换到新版控制台。
在左侧菜单栏单击记忆,然后单击添加记忆存储,填写相关配置项:
配置项
说明
名称
必填,系统自动生成(如
mem-UiH9),可自定义向量数据库
默认自动配置(推荐),自动创建并关联 Tablestore 向量数据库
长期记忆
默认开启,需选择大语言模型服务、向量模型服务和执行角色
会话历史
默认开启
会话状态
默认开启
网络配置
默认允许公网访问;也可配置 VPC 访问;至少选择一种
单击开始部署,等待记忆存储部署完成。
步骤二:创建 Agent 并体验记忆功能
在左侧菜单栏单击Agent运行时,然后单击创建Agent,选择快速创建,填写相关配置项:
配置项
说明
Agent名称
必填,系统自动生成(如
agent-quick-n31r8),可自定义模型服务
选择已配置的模型,如
qwen3.5-plus系统提示词
可选择模板,如“智能问答助手”
添加步骤一创建的记忆存储
说明使用长期记忆功能时,需按页面提示开启MCP,Agent 才能自动提取和读取长期记忆。
访问凭证
默认匿名访问,可选择已有凭证或创建新凭证
单击创建Agent,等待 Agent 启动完成。
在模型对比测试窗口中:
输入“我叫小明,最近在学习向量数据库”,观察 Agent 回复
说明函数计算遵循Serverless(无服务器)架构,只有在请求到达时才创建实例,并能及时释放实例帮助用户节省成本。因此,首次提问时,会有较长的等待时间。
在同一对话中继续提问“我叫什么?”,验证会话历史(Agent 能记住本轮对话内容)
新建会话,提问“我叫什么?”,验证长期记忆(Agent 能跨会话记住用户信息)
代码集成
环境准备
pip3 install alibabacloud_agentrun20250910 openai tablestore agentrun-sdk agentrun-mem0ai google-adk litellm langchain-openai配置环境变量(请勿将密钥硬编码在代码中):
# 阿里云账号凭据
export ALIBABA_CLOUD_ACCESS_KEY_ID="your-access-key-id"
export ALIBABA_CLOUD_ACCESS_KEY_SECRET="your-access-key-secret"
# Agent调用配置(在控制台Agent运行时 > 详情 > 概览 > 访问信息中获取)
export AGENT_ENDPOINT="https://{ACCOUNT_ID}.agentrun-data.{REGION}.aliyuncs.com/agent-runtimes/{AGENT_NAME}/endpoints/Default/invocations"
# Agent调用凭证(在控制台凭证管理中获取)
export AGENT_API_KEY="your-agent-api-key"
# 记忆存储名称(步骤一创建后获取)
export MEMORY_COLLECTION_NAME="mem-xxxx"
# DashScope API Key(LangChain 和 Google ADK 示例使用)
export DASHSCOPE_API_KEY="your-dashscope-api-key"
# Tablestore 配置(会话状态示例使用)
export TABLESTORE_ENDPOINT="https://{INSTANCE}.{REGION}.ots.aliyuncs.com"
export TABLESTORE_INSTANCE="your-instance-name"会话历史
三轮对话测试:第 1 轮告知姓名和职业,第 2 轮直接问"我是谁",验证 Agent 是否记住了上文;第 3 轮继续追问,验证完整上下文是否贯穿始终。每次运行生成新的 session_id,互不干扰。
import os
import re
import threading
import time
import uuid
from openai import OpenAI
# ── 配置 ──────────────────────────────────────────────────────────────────────
AGENT_ENDPOINT = os.environ["AGENT_ENDPOINT"]
AGENT_API_KEY = os.environ["AGENT_API_KEY"]
MODEL = os.getenv("MODEL", "qwen3.5-plus")
# 从 Endpoint URL 解析账号 ID 和 Agent 名称。
# Endpoint 格式:https://{ACCOUNT_ID}.agentrun-data.{REGION}.aliyuncs.com/agent-runtimes/{AGENT_NAME}/endpoints/Default/invocations
# 注意:这里使用账号 ID 仅适合单用户演示场景。在多用户应用中,
# user_id 应代表终端用户(如应用内的用户 ID),而非开发者账号。
_account_match = re.match(r"https://(\d+)\.agentrun-data\.", AGENT_ENDPOINT)
USER_ID = _account_match.group(1) if _account_match else "default_user"
_agent_match = re.search(r"/agent-runtimes/([^/]+)/endpoints/", AGENT_ENDPOINT)
AGENT_ID = _agent_match.group(1) if _agent_match else "default_agent"
client = OpenAI(
base_url=f"{AGENT_ENDPOINT}/openai/v1",
api_key=AGENT_API_KEY,
)
# ── 对话函数 ──────────────────────────────────────────────────────────────────
def _spinner(stop_event: threading.Event):
"""在等待首个 Token 期间显示转圈动画。"""
frames = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏']
i = 0
while not stop_event.is_set():
print(f'\r助手: 正在思考... {frames[i % len(frames)]}', end='', flush=True)
time.sleep(0.1)
i += 1
print('\r' + ' ' * 35 + '\r', end='', flush=True)
def chat(session_id: str, history: list, message: str) -> str:
"""向 Agent 发送消息,通过 session_id 和消息历史维持会话上下文。
history 为本轮之前的对话历史,格式为
[{"role": "user", "content": "..."}, {"role": "assistant", "content": "..."}, ...]
只保留 user/assistant 的文本内容(不含 tool_calls),历史消息与本轮消息
一起发送,Agent 可从上下文中直接读取之前的对话内容。
session_id 通过请求头 X-AgentRun-Conversation-ID 传递,AgentRun 用其
关联 Tablestore 中的会话记录(用 extra_body 传递无效,服务端不读请求体)。
"""
print(f"用户: {message}")
# 启动转圈动画(覆盖冷启动和 MCP 工具调用的等待时间)
stop_event = threading.Event()
spinner = threading.Thread(target=_spinner, args=(stop_event,), daemon=True)
spinner.start()
stream = client.chat.completions.create(
model=MODEL,
messages=history + [{"role": "user", "content": message}],
stream=True,
extra_headers={
"X-AgentRun-Conversation-ID": session_id,
"X-AgentRun-User-ID": USER_ID,
"X-AgentRun-Agent-ID": AGENT_ID,
},
)
full_reply = []
first_token = True
for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
token = chunk.choices[0].delta.content
if first_token:
stop_event.set()
spinner.join()
print("助手: ", end="", flush=True)
first_token = False
print(token, end="", flush=True)
full_reply.append(token)
stop_event.set()
spinner.join()
print("\n")
return "".join(full_reply)
# ── 主程序 ────────────────────────────────────────────────────────────────────
def main():
# 每次运行生成新的 session_id,避免与上次运行的历史混淆。
# 在实际应用中,session_id 通常由应用层管理(如绑定用户的对话 ID)。
session_id = str(uuid.uuid4())
history = [] # 维护对话历史(只含 user/assistant 文本消息)
print("═" * 50)
print("示例:多轮对话(会话历史)")
print("═" * 50)
print("提示:首次请求函数计算需要创建实例(冷启动),可能需要较长等待时间,请耐心等待。")
# 第一轮:自我介绍
print("\n[第 1 轮]")
q1 = "我叫小明,我是一名数据工程师,最近在研究 Tablestore"
r1 = chat(session_id, history, q1)
history.extend([
{"role": "user", "content": q1},
{"role": "assistant", "content": r1},
])
# 第二轮:测试 Agent 是否记住了第一轮的内容
print("[第 2 轮]")
q2 = "我叫什么名字?我从事什么工作?我最近在研究什么?"
r2 = chat(session_id, history, q2)
history.extend([
{"role": "user", "content": q2},
{"role": "assistant", "content": r2},
])
# 第三轮:继续深入话题
print("[第 3 轮]")
q3 = "针对我的工作,你有什么 Tablestore 使用建议?"
chat(session_id, history, q3)
if __name__ == "__main__":
main()推荐使用流式模式(stream=True):Agent 在处理消息时会调用 MCP 记忆工具(读写 Tablestore),流式模式可避免客户端因等待 MCP 工具调用而超时。
将上述代码保存为 conversation_history.py 并运行:
python3 conversation_history.py长期记忆
方式一:通过MCP集成
在记忆详情的中启动服务配置,启动后获取 MCP 服务 URL(SSE),配置到支持 MCP 工具调用的 Agent 中,即可自动提取、更新、删除长期记忆。
通过控制台使用Agent运行时,可在创建Agent配置记忆时开启MCP(未提示表示已开启),Agent会在每轮对话中自动调用 MCP 工具检索和更新长期记忆。对话中出现可提取的用户信息时(如:“我喜欢喝咖啡”),Agent 会自动存入向量库供后续跨 session 使用。
方式二:直接操作 mem0 向量存储
与方式一不同,方式二需要由代码显式调用 memory.add() 写入记忆——mem0 不会自动监听对话,也不会自动判断"哪些内容值得记录"。这种方式适合在自定义 Agent 框架或数据管道中,由业务逻辑精确控制记忆的写入时机和内容。
本示例写入一段用户描述,mem0 内部调用 LLM 将其拆解为多条结构化记忆(如“名叫 Alice”、“喜欢喝咖啡”),然后用三个不同角度的语义查询验证检索效果。
import os
import time
from agentrun.memory_collection import MemoryCollection
# ── 配置 ──────────────────────────────────────────────────────────────────────
MEMORY_COLLECTION_NAME = os.environ["MEMORY_COLLECTION_NAME"]
# agentrun-sdk 自动从以下环境变量读取凭证(无需在代码中显式传入):
# ALIBABA_CLOUD_ACCESS_KEY_ID / ALIBABA_CLOUD_ACCESS_KEY_SECRET
# AGENTRUN_REGION(默认 cn-hangzhou)
# ── 初始化 mem0 客户端 ────────────────────────────────────────────────────────
# to_mem0_memory() 自动完成两件事:
# 1. 从 AgentRun 控制平面读取记忆存储配置(Tablestore 实例、集合、向量维度等)
# 2. 使用读取到的配置初始化 agentrun-mem0ai Memory 客户端
memory = MemoryCollection.to_mem0_memory(MEMORY_COLLECTION_NAME)
# ── 核心操作 ──────────────────────────────────────────────────────────────────
def add_memories(user_id: str):
"""向记忆存储中添加用户信息。
memory.add() 支持两种输入:
- str:直接写入一段描述文本,mem0 内部会自动提炼为结构化记忆
- list[dict]:完整的对话消息列表(role/content),mem0 从中提取用户偏好
本示例使用文本输入演示基本用法。
"""
print(f"\n[添加记忆] user_id={user_id}")
result = memory.add(
"我叫 Alice,是一名 Python 工程师,平时喜欢喝咖啡,最近在研究向量数据库。",
user_id=user_id,
metadata={"source_app": "openmemory", "mcp_client": "python_sdk"},
)
results = result.get("results", [])
print(f" 写入 {len(results)} 条记忆:")
for i, res in enumerate(results, 1):
print(f" {i}. [{res.get('event')}] {res.get('memory', '')}")
def search_memories(user_id: str, query: str):
"""从记忆存储中检索与查询语义最相关的记忆。"""
print(f"\n[检索记忆] 查询: {query!r}")
results = memory.search(query, user_id=user_id)
hits = results.get("results", [])
if not hits:
print(" (未找到相关记忆)")
return
for i, hit in enumerate(hits, 1):
score = hit.get("score", 0)
content = hit.get("memory", "")
print(f" {i}. [{score:.4f}] {content}")
# ── 主程序 ────────────────────────────────────────────────────────────────────
def main():
print("═" * 55)
print("长期记忆示例 —— 直接操作 mem0 向量存储")
print("═" * 55)
print("提示:首次请求函数计算需要创建实例(冷启动),可能需要较长等待时间,请耐心等待。")
user_id = "alice-demo"
# 写入记忆
add_memories(user_id)
# 等待 Tablestore 向量索引刷新(首次写入后搜索需要短暂延迟)
print("\n等待向量索引刷新...")
time.sleep(3)
# 语义检索:不同的查询角度
print()
print("──" * 27)
search_memories(user_id, "她喜欢什么饮料?")
search_memories(user_id, "她是做什么工作的?")
search_memories(user_id, "她最近在研究什么技术?")
if __name__ == "__main__":
main()将上述代码保存为 long_term_memory.py 并运行:
python3 long_term_memory.py方式三:LangChain 生态集成
以旅行规划助手为例,演示在 LangChain 对话循环中集成 mem0 长期记忆的完整模式:每轮对话开始时先检索与用户问题相关的历史记忆并注入 prompt,生成回复后再将本轮对话内容写回 mem0,使记忆在对话过程中持续积累。多轮后可观察到 Agent 能主动引用早先提到的旅行偏好。
import os
import threading
import time
from typing import List
from langchain_core.messages import SystemMessage, HumanMessage, BaseMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from agentrun.memory_collection import MemoryCollection
# ── 配置 ──────────────────────────────────────────────────────────────────────
MEMORY_COLLECTION_NAME = os.environ["MEMORY_COLLECTION_NAME"]
DASHSCOPE_API_KEY = os.environ["DASHSCOPE_API_KEY"]
# ── 初始化 LLM 和 mem0 客户端 ─────────────────────────────────────────────────
# 通过 DashScope OpenAI 兼容接口调用大模型
llm = ChatOpenAI(
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
api_key=DASHSCOPE_API_KEY,
model="qwen-max",
)
# 初始化 mem0 客户端(自动从 AgentRun 控制平面读取 Tablestore 配置)
mem0 = MemoryCollection.to_mem0_memory(MEMORY_COLLECTION_NAME)
# ── Prompt 模板 ───────────────────────────────────────────────────────────────
prompt = ChatPromptTemplate.from_messages([
SystemMessage(content="""你是一位专业的旅行规划助手,能够根据用户的偏好和历史对话提供个性化的旅行建议。
如果有相关记忆上下文,请在回答中加以参考,让建议更符合用户需求。"""),
MessagesPlaceholder(variable_name="context"),
("human", "{input}"),
])
# ── 核心函数 ──────────────────────────────────────────────────────────────────
def _spinner(stop_event: threading.Event):
"""在等待 LLM 推理期间显示转圈动画。"""
frames = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏']
i = 0
while not stop_event.is_set():
print(f'\r助手: 正在思考... {frames[i % len(frames)]}', end='', flush=True)
time.sleep(0.1)
i += 1
print('\r' + ' ' * 35 + '\r', end='', flush=True)
def retrieve_context(query: str, user_id: str) -> List[BaseMessage]:
"""从 mem0 检索与用户问题相关的记忆,作为 LangChain 消息列表注入 prompt。"""
try:
memories = mem0.search(query, user_id=user_id)
memory_list = memories.get("results", [])
if not memory_list:
return []
serialized = " ".join(m["memory"] for m in memory_list)
print(f" [mem0 检索到 {len(memory_list)} 条相关记忆]")
return [SystemMessage(content=f"用户相关信息:{serialized}")]
except Exception as e:
print(f" [mem0 检索失败: {e}]")
return []
def generate_response(user_input: str, context: List[BaseMessage]) -> str:
"""携带记忆上下文调用 LLM 生成回复。"""
chain = prompt | llm
response = chain.invoke({"context": context, "input": user_input})
return response.content
def save_interaction(user_id: str, user_input: str, assistant_response: str) -> int:
"""将本轮对话写回 mem0,供后续对话检索使用。返回新增记忆条数。"""
interaction = [
{"role": "user", "content": user_input},
{"role": "assistant", "content": assistant_response},
]
try:
result = mem0.add(interaction, user_id=user_id, metadata={"source_app": "openmemory", "mcp_client": "python_sdk"})
return len(result.get("results", []))
except Exception as e:
print(f" [mem0 保存失败: {e}]")
return 0
def chat_turn(user_input: str, user_id: str) -> str:
"""单轮对话:检索记忆 → 生成回复 → 保存记忆。"""
context = retrieve_context(user_input, user_id)
# 用一个 spinner 同时覆盖 LLM 推理和 mem0 保存,全部完成后再打印回复
stop_event = threading.Event()
spinner = threading.Thread(target=_spinner, args=(stop_event,), daemon=True)
spinner.start()
response = generate_response(user_input, context)
count = save_interaction(user_id, user_input, response)
stop_event.set()
spinner.join()
print(f"助手: {response}\n")
if count:
print(f" [mem0 新增 {count} 条记忆]\n")
return response
# ── 主程序 ────────────────────────────────────────────────────────────────────
def main():
print("═" * 60)
print("旅行规划助手(LangChain + mem0 长期记忆)")
print("═" * 60)
print("提示:首次请求函数计算需要创建实例(冷启动),可能需要较长等待时间,请耐心等待。")
print("输入 /quit 退出\n")
user_id = "travel-user-demo"
while True:
try:
user_input = input("你: ").strip()
except (EOFError, KeyboardInterrupt):
break
if not user_input or user_input == "/quit":
break
response = chat_turn(user_input, user_id)
if __name__ == "__main__":
main()将上述代码保存为 langchain_memory.py 并运行:
python3 langchain_memory.py会话状态
方式一:通过 Tablestore SDK 查询会话数据
AgentRun 将每次对话的会话元数据写入 session 表,消息内容写入 message 表。本示例直接扫描这两张表,列出最近的会话记录,并读取其中一个会话的完整对话内容。适用于数据分析、会话审计等只读场景。
import json
import os
from datetime import datetime
import tablestore as ots
# ── 配置 ──────────────────────────────────────────────────────────────────────
ENDPOINT = os.environ["TABLESTORE_ENDPOINT"]
INSTANCE = os.environ["TABLESTORE_INSTANCE"]
ACCESS_KEY_ID = os.environ["ALIBABA_CLOUD_ACCESS_KEY_ID"]
ACCESS_KEY_SECRET = os.environ["ALIBABA_CLOUD_ACCESS_KEY_SECRET"]
client = ots.OTSClient(ENDPOINT, ACCESS_KEY_ID, ACCESS_KEY_SECRET, INSTANCE)
# ── 查询函数 ──────────────────────────────────────────────────────────────────
def list_sessions(user_id: str = None, limit: int = 10) -> list[dict]:
"""查询 session 表中的会话元数据列表。"""
if user_id:
min_pk = [("user_id", user_id), ("session_id", ots.INF_MIN)]
max_pk = [("user_id", user_id), ("session_id", ots.INF_MAX)]
else:
min_pk = [("user_id", ots.INF_MIN), ("session_id", ots.INF_MIN)]
max_pk = [("user_id", ots.INF_MAX), ("session_id", ots.INF_MAX)]
_, _, rows, _ = client.get_range(
"session", ots.Direction.FORWARD,
min_pk, max_pk,
columns_to_get=[], max_version=1, limit=limit,
)
sessions = []
for row in rows:
pks = dict(row.primary_key)
attrs = {c[0]: c[1] for c in row.attribute_columns}
# update_time 单位为微秒
update_time_us = int(attrs.get("update_time", 0))
sessions.append({
"user_id": pks["user_id"],
"session_id": pks["session_id"],
"agent_id": attrs.get("agent_id", ""),
"update_time": datetime.fromtimestamp(update_time_us / 1e6).strftime("%Y-%m-%d %H:%M:%S"),
})
return sessions
def get_session_messages(session_id: str) -> list[dict]:
"""查询 message 表中指定会话的所有对话消息。"""
min_pk = [("session_id", session_id), ("create_time", ots.INF_MIN), ("message_id", ots.INF_MIN)]
max_pk = [("session_id", session_id), ("create_time", ots.INF_MAX), ("message_id", ots.INF_MAX)]
_, _, rows, _ = client.get_range(
"message", ots.Direction.FORWARD,
min_pk, max_pk,
columns_to_get=[], max_version=1,
)
messages = []
for row in rows:
attrs = {c[0]: c[1] for c in row.attribute_columns}
if "content" in attrs:
try:
messages.extend(json.loads(attrs["content"]))
except json.JSONDecodeError:
pass
return messages
# ── 主程序 ────────────────────────────────────────────────────────────────────
def main():
print("═" * 55)
print("AgentRun 会话状态 —— Tablestore 数据查询示例")
print("═" * 55)
# ── 1. 列出所有会话 ───────────────────────────────────────────────────────
print("\n[会话列表]")
sessions = list_sessions(limit=5)
if not sessions:
print(" (暂无会话记录)")
return
for s in sessions:
print(f" session_id : {s['session_id']}")
print(f" agent_id : {s['agent_id']}")
print(f" 更新时间 : {s['update_time']}")
print()
# ── 2. 读取最近一个会话的消息 ─────────────────────────────────────────────
latest = sessions[-1]
sid = latest["session_id"]
print(f"\n[会话详情] session_id: {sid}")
messages = get_session_messages(sid)
if not messages:
print(" (该会话暂无消息)")
else:
for msg in messages:
role = msg.get("role", "?")
content = msg.get("content", "")[:100]
ellipsis = "..." if len(msg.get("content", "")) > 100 else ""
print(f" [{role}] {content}{ellipsis}")
if __name__ == "__main__":
main()将上述代码保存为 session_state.py 并运行:
python3 session_state.py方式二:Google ADK 集成(OTSSessionService)
演示如何将 Tablestore 作为 Google ADK Agent 的会话状态后端。通过 OTSSessionService,ADK Agent 的每次对话都会自动持久化到 Tablestore,进程重启后可继续之前的会话,无需重新建立上下文。示例包含一个能查询天气的工具,用于验证 Agent 的工具调用和会话记录是否正常持久化。
from __future__ import annotations
import asyncio
import os
import sys
import threading
import time
from typing import Any
from google.adk.agents import Agent
from google.adk.models.lite_llm import LiteLlm
from google.adk.runners import Runner
from google.genai import types
from agentrun.conversation_service import SessionStore
from agentrun.conversation_service.adapters import OTSSessionService
# ── 配置 ──────────────────────────────────────────────────────────────────────
MEMORY_COLLECTION_NAME = os.environ.get("MEMORY_COLLECTION_NAME", "")
DASHSCOPE_API_KEY = os.environ.get("DASHSCOPE_API_KEY", "")
if not MEMORY_COLLECTION_NAME:
print("ERROR: 请设置环境变量 MEMORY_COLLECTION_NAME")
sys.exit(1)
APP_NAME = "adk-chat-demo"
USER_ID = "demo_user"
SESSION_FILE = ".adk_session_id" # 本地持久化 session ID,删除此文件可开始新会话
# ── 工具定义 ──────────────────────────────────────────────────────────────────
def _spinner(stop_event: threading.Event):
"""在等待 Agent 推理期间显示转圈动画。"""
frames = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏']
i = 0
while not stop_event.is_set():
print(f'\rAgent: 正在思考... {frames[i % len(frames)]}', end='', flush=True)
time.sleep(0.1)
i += 1
print('\r' + ' ' * 35 + '\r', end='', flush=True)
def get_weather(city: str) -> dict[str, Any]:
"""查询指定城市的天气信息。"""
data = {
"北京": {"weather": "晴", "temperature": "5~15°C"},
"上海": {"weather": "多云", "temperature": "12~20°C"},
"杭州": {"weather": "阴", "temperature": "10~18°C"},
}
return data.get(city, {"error": "暂无该城市数据"})
# ── Step 1: 初始化 SessionStore ───────────────────────────────────────────────
# SessionStore.from_memory_collection() 从 AgentRun 控制平面读取记忆存储配置,
# 自动获取 Tablestore 实例和 endpoint,无需手动配置 OTS 连接参数。
store = SessionStore.from_memory_collection(MEMORY_COLLECTION_NAME)
store.init_tables()
store.init_search_index()
# ── Step 2: 创建 OTSSessionService ───────────────────────────────────────────
session_service = OTSSessionService(session_store=store)
# ── Step 3: 创建 Agent + Runner ───────────────────────────────────────────────
# 通过 LiteLLM 调用 DashScope OpenAI 兼容接口
custom_model = LiteLlm(
model="openai/qwen-max",
api_key=DASHSCOPE_API_KEY,
api_base="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
agent = Agent(
name="smart_assistant",
model=custom_model,
instruction="你是一个友好的中文智能助手,用户问天气时调用 get_weather 工具。",
tools=[get_weather],
)
runner = Runner(
agent=agent,
app_name=APP_NAME,
session_service=session_service,
)
# ── Step 4: 对话(自动持久化到 Tablestore) ───────────────────────────────────
async def chat(session_id: str, text: str) -> str:
"""发送消息并返回 Agent 回复。"""
content = types.Content(
role="user",
parts=[types.Part(text=text)],
)
reply_parts: list[str] = []
async for event in runner.run_async(
user_id=USER_ID,
session_id=session_id,
new_message=content,
):
if event.is_final_response() and event.content and event.content.parts:
for part in event.content.parts:
if part.text:
reply_parts.append(part.text)
return "\n".join(reply_parts)
# ── 主程序 ────────────────────────────────────────────────────────────────────
async def main() -> None:
# 从本地文件读取上次的 session ID,尝试恢复会话;找不到则创建新会话
session = None
if os.path.exists(SESSION_FILE):
saved_id = open(SESSION_FILE).read().strip()
session = await session_service.get_session(
app_name=APP_NAME, user_id=USER_ID, session_id=saved_id
)
if session:
print(f"继续之前的会话: {session.id}")
if session is None:
session = await session_service.create_session(
app_name=APP_NAME,
user_id=USER_ID,
state={"user:language": "zh-CN"},
)
open(SESSION_FILE, "w").write(session.id)
print(f"新会话已创建: {session.id}")
print("输入 /quit 退出,/new 开始新会话\n")
while True:
try:
user_input = input("你: ").strip()
except (EOFError, KeyboardInterrupt):
break
if not user_input:
continue
if user_input == "/quit":
break
if user_input == "/new":
session = await session_service.create_session(
app_name=APP_NAME,
user_id=USER_ID,
state={"user:language": "zh-CN"},
)
open(SESSION_FILE, "w").write(session.id)
print(f"新会话已创建: {session.id}\n")
continue
stop_event = threading.Event()
spinner = threading.Thread(target=_spinner, args=(stop_event,), daemon=True)
spinner.start()
reply = await chat(session.id, user_input)
stop_event.set()
spinner.join()
print(f"Agent: {reply}\n")
if __name__ == "__main__":
asyncio.run(main())将上述代码保存为 adk_session.py 并运行:
python3 adk_session.py管理记忆存储
查询记忆存储的配置详情,包括关联的 Tablestore 实例名称、向量集合、向量维度、嵌入模型和长期记忆提取模型,以及当前账号下的所有记忆存储列表。适用于排查配置问题或了解记忆存储的底层资源信息。
import os
from alibabacloud_agentrun20250910.client import Client
from alibabacloud_tea_openapi import models as open_api_models
# ── 配置 ──────────────────────────────────────────────────────────────────────
ACCESS_KEY_ID = os.environ["ALIBABA_CLOUD_ACCESS_KEY_ID"]
ACCESS_KEY_SECRET = os.environ["ALIBABA_CLOUD_ACCESS_KEY_SECRET"]
MEMORY_COLLECTION_NAME = os.environ["MEMORY_COLLECTION_NAME"]
REGION = os.getenv("AGENTRUN_REGION", "cn-hangzhou")
config = open_api_models.Config(
access_key_id=ACCESS_KEY_ID,
access_key_secret=ACCESS_KEY_SECRET,
endpoint=f"agentrun.{REGION}.aliyuncs.com",
)
client = Client(config)
# ── 工具函数 ──────────────────────────────────────────────────────────────────
def get_memory_collection(name: str) -> dict:
"""查询记忆存储的详细配置。"""
resp = client.get_memory_collection(name)
return resp.body.data.to_map()
def list_memory_collections() -> list:
"""列出当前账号下的所有记忆存储。"""
from alibabacloud_agentrun20250910 import models as ar_models
req = ar_models.ListMemoryCollectionsRequest()
resp = client.list_memory_collections(req)
return [item.to_map() for item in (resp.body.data.items or [])]
# ── 主程序 ────────────────────────────────────────────────────────────────────
def main():
# ── 查询单个记忆存储 ──────────────────────────────────────────────────────
print("═" * 55)
print(f"查询记忆存储:{MEMORY_COLLECTION_NAME}")
print("═" * 55)
info = get_memory_collection(MEMORY_COLLECTION_NAME)
print(f"名称:{info.get('memoryCollectionName')}")
print(f"ID:{info.get('memoryCollectionId')}")
print(f"会话历史:{'已启用' if info.get('enableConversationHistory') else '未启用'}")
print(f"会话状态:{'已启用' if info.get('enableConversationState') else '未启用'}")
vector_cfg = info.get("vectorStoreConfig", {})
print(f"\n向量数据库:{vector_cfg.get('provider')}")
vector_detail = vector_cfg.get("config", {})
print(f" 实例:{vector_detail.get('instanceName')}")
print(f" 集合:{vector_detail.get('collectionName')}")
print(f" 向量维度:{vector_detail.get('vectorDimension')}")
embedder_cfg = info.get("embedderConfig", {})
embedder_detail = embedder_cfg.get("config", {})
print(f"\n嵌入模型:{embedder_detail.get('model')}")
print(f"模型服务:{embedder_cfg.get('modelServiceName')}")
llm_cfg = info.get("llmConfig", {})
llm_detail = llm_cfg.get("config", {})
print(f"\n长期记忆提取模型:{llm_detail.get('model')}")
print(f"模型服务:{llm_cfg.get('modelServiceName')}")
# ── 列出所有记忆存储 ──────────────────────────────────────────────────────
print("\n" + "═" * 55)
print("当前账号下的所有记忆存储")
print("═" * 55)
collections = list_memory_collections()
if not collections:
print("(无)")
for c in collections:
status = c.get("status", "")
print(f" - {c.get('memoryCollectionName')} [{status}]")
if __name__ == "__main__":
main()将上述代码保存为 manage_memory.py 并运行:
python3 manage_memory.py