AgentScope集成长记忆服务
AgentScope是阿里巴巴开源的多智能体平台,提供了灵活的Agent构建能力;AnalyticDB for PostgreSQL长记忆引擎,能够从对话中自动提取用户的关键事实并通过向量相似度进行语义检索。
本文介绍如何将AgentScope的ReActAgent与AnalyticDB for PostgreSQL长记忆服务对接,构建具有跨会话长期记忆能力的AI Agent。
应用场景
个性化助手:Agent自动记住用户偏好(如旅行习惯、饮食喜好),后续对话中提供个性化建议。
客服系统:跨会话记住客户历史问题和处理结果,避免用户重复描述。
教育辅导:记录学生的知识薄弱点和学习进度,持续提供针对性辅导。
前提条件
已安装Python 3.10及以上版本。
已获取阿里云百炼API Key,用于Agent对话推理。
已获取AnalyticDB for PostgreSQL长记忆服务的URL和API Key。
产品架构
组件说明
组件 | 作用 |
AgentScope ReActAgent | AI Agent推理框架,通过“思考→行动→观察”循环自主决策。 |
阿里云百炼 qwen-plus | 大语言模型,用于Agent对话推理。 |
AnalyticDB for PostgreSQL长记忆服务 | 远程记忆管理,自动完成事实提取、向量化、存储与检索。 |
本方案无需在客户端部署向量数据库或Embedding模型。事实提取、向量化、语义检索均由AnalyticDB for PostgreSQL长记忆服务完成,客户端仅通过REST API进行交互。
操作步骤
步骤一:安装依赖
创建项目目录和Python虚拟环境。
mkdir -p agentscope-adbpg-demo && cd agentscope-adbpg-demo python3 -m venv .venv source .venv/bin/activate安装依赖包。
pip install agentscope python-dotenv重要agentscope版本应该为2.0以下。
步骤二:配置环境变量
创建.env文件,配置以下环境变量。
DASHSCOPE_API_KEY=<your-dashscope-api-key>
ADBPG_BASE_URL=https://api-longmemory-cn-chengdu.opentrust.net
ADBPG_API_KEY=<your-adbpg-longmemory-api-key>环境变量 | 说明 |
DASHSCOPE_API_KEY | 阿里云百炼API Key,用于Agent对话推理(qwen-plus模型) |
ADBPG_BASE_URL | AnalyticDB for PostgreSQL长记忆服务地址。 |
ADBPG_API_KEY | AnalyticDB for PostgreSQL长记忆服务的API Key。 |
步骤三:编写长期记忆适配模块
创建adbpg_long_term_memory.py文件。该模块继承AgentScope的LongTermMemoryBase基类,通过REST API对接AnalyticDB for PostgreSQL长记忆服务。
# -*- coding: utf-8 -*-
"""对接 AnalyticDB for PostgreSQL 长记忆服务的长期记忆实现。"""
from typing import Any
import aiohttp
from agentscope.memory._long_term_memory._long_term_memory_base import (
LongTermMemoryBase,
)
from agentscope.message import Msg, TextBlock
from agentscope.tool import ToolResponse
class ADBPGLongTermMemory(LongTermMemoryBase):
"""通过 REST API 对接 AnalyticDB for PostgreSQL 长记忆服务。"""
def __init__(
self,
base_url: str,
api_key: str,
user_name: str,
agent_name: str | None = None,
) -> None:
super().__init__()
self.base_url = base_url.rstrip("/")
self.api_key = api_key
self.user_id = user_name
self.agent_id = agent_name
def _headers(self) -> dict:
return {
"Authorization": f"Token {self.api_key}",
"Content-Type": "application/json",
}
async def _post(self, path: str, data: dict) -> dict:
async with aiohttp.ClientSession() as session:
async with session.post(
f"{self.base_url}{path}", json=data, headers=self._headers()
) as resp:
resp.raise_for_status()
return await resp.json()
async def record(self, msgs: list[Msg | None], **kwargs: Any) -> Any:
messages = [
{"role": m.role, "content": str(m.content)} for m in msgs if m
]
data = {"messages": messages, "user_id": self.user_id}
if self.agent_id:
data["agent_id"] = self.agent_id
return await self._post("/v3/memories/add/", data)
async def retrieve(
self, msg: Msg | list[Msg] | None, limit: int = 5, **kwargs: Any
) -> str:
if isinstance(msg, Msg):
msg = [msg]
query = " ".join(m.get_text_content() for m in msg if m)
data = {
"query": query,
"filters": {"user_id": self.user_id},
"top_k": limit,
}
if self.agent_id:
data["filters"]["agent_id"] = self.agent_id
result = await self._post("/v3/memories/search/", data)
return "\n".join(
r.get("memory", "") for r in result.get("results", [])
)
async def record_to_memory(
self, thinking: str, content: list[str], **kwargs: Any
) -> ToolResponse:
text = "\n".join(([thinking] if thinking else []) + content)
data = {
"messages": [{"role": "user", "content": text}],
"user_id": self.user_id,
}
if self.agent_id:
data["agent_id"] = self.agent_id
result = await self._post("/v3/memories/add/", data)
return ToolResponse(
content=[TextBlock(type="text", text=f"已记录: {result}")]
)
async def retrieve_from_memory(
self, keywords: list[str], limit: int = 5, **kwargs: Any
) -> ToolResponse:
results = []
for kw in keywords:
data = {
"query": kw,
"filters": {"user_id": self.user_id},
"top_k": limit,
}
if self.agent_id:
data["filters"]["agent_id"] = self.agent_id
r = await self._post("/v3/memories/search/", data)
results.extend(
item.get("memory", "") for item in r.get("results", [])
)
return ToolResponse(
content=[
TextBlock(
type="text",
text="\n".join(results) or "未找到相关记忆",
)
]
)参数说明
参数 | 说明 |
base_url | AnalyticDB for PostgreSQL长记忆服务地址。 |
api_key | 服务API Key,通过 |
user_name | 用户标识,用于隔离不同用户的记忆。 |
agent_name | Agent标识,用于区分不同Agent的记忆(可选) |
API接口说明
操作 | HTTP接口 | 说明 |
记录记忆 | POST /v3/memories/add/ | 发送对话消息,服务端自动提取事实并存储。 |
检索记忆 | POST /v3/memories/search/ | 基于查询文本进行语义相似度检索。 |
步骤四:编写Demo代码
创建memory_example.py文件。
# -*- coding: utf-8 -*-
"""AgentScope + AnalyticDB for PostgreSQL 长记忆服务 Demo"""
import asyncio
import os
from dotenv import load_dotenv
from adbpg_long_term_memory import ADBPGLongTermMemory
from agentscope.agent import ReActAgent
from agentscope.formatter import DashScopeChatFormatter
from agentscope.memory import InMemoryMemory
from agentscope.message import Msg
from agentscope.model import DashScopeChatModel
from agentscope.tool import Toolkit
load_dotenv()
async def main() -> None:
"""运行记忆示例。"""
# ── 1. 初始化长期记忆 ─────────────────────────────────────
long_term_memory = ADBPGLongTermMemory(
base_url=os.environ["ADBPG_BASE_URL"],
api_key=os.environ["ADBPG_API_KEY"],
user_name="user_123",
agent_name="Friday",
)
print("=== AgentScope + ADBPG 长记忆 Demo ===\n")
# ── 2. 基础记忆操作:记录与检索 ────────────────────────────
print("1. 记录对话到长期记忆")
print("-" * 40)
results = await long_term_memory.record(
msgs=[
Msg(role="user", content="帮我订酒店,最好是民宿", name="user"),
],
)
print(f"记录结果: {results}\n")
print("2. 从长期记忆中检索")
print("-" * 40)
memories = await long_term_memory.retrieve(
msg=[
Msg(role="user", content="我喜欢住什么类型的酒店?", name="user"),
],
)
print(f"检索结果: {memories}\n")
# ── 3. ReActAgent 集成长期记忆 ─────────────────────────────
print("3. ReActAgent + 长期记忆")
print("-" * 40)
agent = ReActAgent(
name="Friday",
sys_prompt=(
"你是一个名叫 Friday 的助手。"
"如果你认为有关于用户偏好的重要信息,"
"请使用 `record_to_memory` 工具记录到长期记忆。"
"如果需要从长期记忆中检索信息,"
"请使用 `retrieve_from_memory` 工具。"
),
model=DashScopeChatModel(
model_name="qwen-plus",
api_key=os.environ["DASHSCOPE_API_KEY"],
stream=False,
),
formatter=DashScopeChatFormatter(),
toolkit=Toolkit(),
memory=InMemoryMemory(),
long_term_memory=long_term_memory,
long_term_memory_mode="both",
)
await agent.memory.clear()
# 对话 1:告知偏好 → Agent 自动记录
msg = Msg(
role="user",
content="我去杭州旅游的时候,喜欢住民宿",
name="user",
)
response = await agent(msg)
print(f"Agent 回复: {response.get_text_content()}\n")
# 对话 2:询问偏好 → Agent 自动检索
msg = Msg(role="user", content="我有什么偏好?", name="user")
response = await agent(msg)
print(f"Agent 回复: {response.get_text_content()}\n")
# 对话 3:补充偏好 → Agent 继续记录
msg = Msg(role="user", content="我喜欢去西湖游玩", name="user")
response = await agent(msg)
print(f"Agent 回复: {response.get_text_content()}\n")
if __name__ == "__main__":
asyncio.run(main())long_term_memory_mode参数说明
模式 | 说明 |
agent_control | Agent通过工具调用自主管理长期记忆,自行决定何时记录和检索。 |
static_control | 每次对话开始时自动从长期记忆检索相关信息,注入到上下文中。 |
both | 同时启用上述两种模式(推荐) |
步骤五:运行Demo
python memory_example.py预期输出:
=== AgentScope + ADBPG 长记忆 Demo ===
1. 记录对话到长期记忆
----------------------------------------
记录结果: {'results': [{'message': 'Memory processing has been queued for background execution', 'status': 'PENDING', ...}]}
2. 从长期记忆中检索
----------------------------------------
检索结果: 喜欢住民宿
3. ReActAgent + 长期记忆
----------------------------------------
Agent 回复: 已记录:您去杭州旅游时喜欢住民宿。如有其他偏好或需要规划行程,欢迎随时告诉我!
Agent 回复: 根据您的长期记忆,您有一个明确的偏好:
去杭州旅游时喜欢住民宿。
Agent 回复: 已记录:您喜欢去西湖游玩
结合之前的信息,您的杭州旅行偏好已有两条:
- 喜欢住民宿
- 喜欢去西湖游玩AnalyticDB for PostgreSQL长记忆服务采用异步处理机制,记录操作返回PENDING状态表示记忆已入队等待后台处理(通常几秒内完成)。首次运行时步骤2的检索可能为空,第二次运行即可检索到已记录的记忆。
工作原理
当用户发送消息时,ReActAgent执行以下推理循环:
记录流程
Agent接收用户消息,进入ReAct推理循环。
Agent分析用户意图,判断消息中包含偏好等关键信息。
Agent调用
record_to_memory工具,发送HTTP请求到长记忆服务。服务端自动从文本中提取结构化事实,向量化后存入数据库。
Agent收到确认结果,生成最终回复。
检索流程
Agent接收用户提问,进入ReAct推理循环。
Agent判断需要从长期记忆中获取信息。
Agent调用
retrieve_from_memory工具,发送语义检索请求。服务端将查询向量化,执行近似最近邻搜索,返回相关记忆。
Agent根据检索结果生成回复。
项目文件结构
agentscope-adbpg-demo/
├── .venv/ # Python 虚拟环境
├── .env # 环境变量配置
├── adbpg_long_term_memory.py # 长记忆适配模块
└── memory_example.py # Demo 主程序