AgentScope Runtime 支持使用阿里云表格存储(Tablestore)作为 Memory Service 和 Session History Service 的持久化后端,为 AI Agent 应用提供高性能、高可用的会话历史和长期记忆存储能力。

核心功能
服务 | 功能 | 适用场景 |
TablestoreMemoryService | 长期记忆存储,支持全文检索 | 用户偏好、知识库、跨会话记忆 |
TablestoreSessionHistoryService | 会话历史存储 | 对话上下文、消息记录 |
准备工作
在开始使用 Tablestore 存储服务之前,需要完成以下环境配置和依赖安装。
环境准备:确保已安装Python运行环境,可通过
python3 --version命令查看版本信息。下载项目:下载 AgentScope Runtime 1.0.5 Release 包agentscope-runtime.zip并解压,解压后进入项目目录。
重要Tablestore 集成功能仅在 AgentScope Runtime 1.0.5 及之前的版本中可用,请确保使用指定版本的 Release 包。
cd agentscope-runtime创建虚拟环境:创建 Python 虚拟环境并激活。
python3 -m venv .venv && source .venv/bin/activate安装依赖:安装项目核心依赖 + Tablestore 扩展。
pip3 install -e . && pip3 install tablestore-for-agent-memory langchain-community配置环境变量:设置以下必需的环境变量,用于连接 Tablestore 和调用大模型。
环境变量
说明
TABLESTORE_ENDPOINTTablestore实例访问地址,可在表格存储控制台获取
TABLESTORE_INSTANCE_NAMETablestore实例名称
TABLESTORE_ACCESS_KEY_ID阿里云账号或RAM用户的AccessKey ID
TABLESTORE_ACCESS_KEY_SECRET阿里云账号或RAM用户的AccessKey Secret
DASHSCOPE_API_KEY百炼平台API Key(用于调用大模型)
export TABLESTORE_ENDPOINT="https://<instance>.<region-id>.ots.aliyuncs.com" export TABLESTORE_INSTANCE_NAME="<instance-name>" export TABLESTORE_ACCESS_KEY_ID="<your-access-key-id>" export TABLESTORE_ACCESS_KEY_SECRET="<your-access-key-secret>" export DASHSCOPE_API_KEY="<your-dashscope-api-key>"
示例代码:Memory
Memory Service 用于管理 AI Agent 的长期记忆,支持跨会话的信息存储和语义检索。
添加长期记忆
以下示例演示如何将用户信息作为长期记忆写入 Tablestore。将代码保存到 examples/tablestore_memory_add.py 文件。
import os
import asyncio
from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.memory import TablestoreMemoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
Message,
MessageType,
Role,
TextContent,
ContentType,
)
async def main():
endpoint = os.getenv("TABLESTORE_ENDPOINT")
instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")
client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)
memory_service = TablestoreMemoryService(
tablestore_client=client,
table_name="agentscope_runtime_memory",
)
await memory_service.start()
print("Memory Service 启动成功")
user_id = "user_001"
session_id = "session_001"
messages = [
Message(
type=MessageType.MESSAGE,
role=Role.USER,
content=[TextContent(type=ContentType.TEXT, text="我喜欢吃川菜")],
),
Message(
type=MessageType.MESSAGE,
role=Role.USER,
content=[TextContent(type=ContentType.TEXT, text="我的生日是 3 月 15 日")],
),
]
print(f"准备写入 {len(messages)} 条记忆:")
for msg in messages:
print(f" - [{msg.role}] {msg.get_text_content()}")
await memory_service.add_memory(
user_id=user_id,
messages=messages,
session_id=session_id,
)
print(f"写入成功, user_id={user_id}, session_id={session_id}")
await memory_service.stop()
if __name__ == "__main__":
asyncio.run(main())运行示例:
python examples/tablestore_memory_add.py搜索长期记忆
以下示例演示如何通过语义检索查询用户的长期记忆。将代码保存到 examples/tablestore_memory_search.py 文件。
数据写入后,多元索引需要几秒钟完成同步。若使用后续示例代码查询不到数据,需等待多元索引同步完成。
import os
import asyncio
from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.memory import TablestoreMemoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
Message,
MessageType,
Role,
TextContent,
ContentType,
)
async def main():
endpoint = os.getenv("TABLESTORE_ENDPOINT")
instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")
client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)
memory_service = TablestoreMemoryService(
tablestore_client=client,
table_name="agentscope_runtime_memory",
)
await memory_service.start()
print("Memory Service 启动成功")
user_id = "user_001"
query = [
Message(
type=MessageType.MESSAGE,
role=Role.USER,
content=[TextContent(type=ContentType.TEXT, text="我喜欢什么口味的菜")],
)
]
print(f"搜索查询: {query[0].get_text_content()}")
results = await memory_service.search_memory(user_id=user_id, messages=query)
print(f"搜索结果, 找到 {len(results)} 条相关记忆:")
for mem in results:
print(f" - [{mem.role}] {mem.get_text_content()}")
all_memories = await memory_service.list_memory(user_id=user_id)
print(f"用户 {user_id} 共有 {len(all_memories)} 条长期记忆")
await memory_service.stop()
if __name__ == "__main__":
asyncio.run(main())运行示例:
python examples/tablestore_memory_search.py示例代码:Session
Session History Service 用于管理会话和消息记录,保存对话上下文。
创建会话并添加消息
以下示例演示如何创建会话并写入对话消息。将代码保存到 examples/tablestore_session_create.py 文件。
import os
import asyncio
from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.session_history import TablestoreSessionHistoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
Message,
MessageType,
Role,
TextContent,
ContentType,
)
async def main():
endpoint = os.getenv("TABLESTORE_ENDPOINT")
instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")
client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)
session_service = TablestoreSessionHistoryService(
tablestore_client=client,
session_table_name="agentscope_runtime_session",
message_table_name="agentscope_runtime_message",
)
await session_service.start()
print("Session History Service 启动成功")
user_id = "user_001"
session_id = "session_003"
session = await session_service.create_session(
user_id=user_id,
session_id=session_id,
)
print(f"创建会话成功: session_id={session.id}, user_id={user_id}")
user_msg = Message(
type=MessageType.MESSAGE,
role=Role.USER,
content=[TextContent(type=ContentType.TEXT, text="你好,我叫张三")],
)
await session_service.append_message(session, user_msg)
print(f"写入消息: [{user_msg.role}] {user_msg.get_text_content()}")
assistant_msg = Message(
type=MessageType.MESSAGE,
role=Role.ASSISTANT,
content=[TextContent(type=ContentType.TEXT, text="你好张三!有什么可以帮助你的?")],
)
await session_service.append_message(session, assistant_msg)
print(f"写入消息: [{assistant_msg.role}] {assistant_msg.get_text_content()}")
print(f"会话 {session_id} 已写入 2 条消息")
await session_service.stop()
if __name__ == "__main__":
asyncio.run(main())运行示例:
python examples/tablestore_session_create.py获取会话历史
以下示例演示如何获取指定会话的历史消息记录。将代码保存到 examples/tablestore_session_get.py 文件。
数据写入后,多元索引需要几秒钟完成同步。若使用后续示例代码查询不到数据,需等待多元索引同步完成。
import os
import asyncio
from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.session_history import TablestoreSessionHistoryService
async def main():
endpoint = os.getenv("TABLESTORE_ENDPOINT")
instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")
client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)
session_service = TablestoreSessionHistoryService(
tablestore_client=client,
session_table_name="agentscope_runtime_session",
message_table_name="agentscope_runtime_message",
)
await session_service.start()
print("Session History Service 启动成功")
user_id = "user_001"
session_id = "session_003"
session = await session_service.get_session(user_id, session_id)
print(f"获取会话 {session_id}, 共 {len(session.messages)} 条消息:")
for msg in session.messages:
text = msg.get_text_content() if hasattr(msg, "get_text_content") else str(msg)
role = msg.role if hasattr(msg, "role") else "unknown"
print(f" - [{role}] {text}")
sessions = await session_service.list_sessions(user_id=user_id)
print(f"用户 {user_id} 共有 {len(sessions)} 个会话")
await session_service.stop()
if __name__ == "__main__":
asyncio.run(main())运行示例:
python examples/tablestore_session_get.py示例代码:交互式聊天 Demo
以下示例展示如何构建一个完整的交互式聊天应用,集成大模型调用、会话历史存储和长期记忆检索。将代码保存到 examples/tablestore_chat_demo.py 文件。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 本示例展示如何构建一个交互式聊天 Demo,使用 Tablestore 存储会话历史和长期记忆
# 使用前请配置环境变量:
# TABLESTORE_ENDPOINT, TABLESTORE_INSTANCE_NAME,
# TABLESTORE_ACCESS_KEY_ID, TABLESTORE_ACCESS_KEY_SECRET, DASHSCOPE_API_KEY
import os
import asyncio
import uuid
import sys
import io
import logging
from tablestore import AsyncOTSClient
from dashscope import Generation
from rich.console import Console
from rich.live import Live
from rich.spinner import Spinner
console = Console()
from agentscope_runtime.engine.services.memory import TablestoreMemoryService
from agentscope_runtime.engine.services.session_history import TablestoreSessionHistoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
Message,
MessageType,
Role,
TextContent,
ContentType,
)
memory_service = None
session_service = None
current_session = None
current_session_id = None
user_id = "cli_user"
chat_history = []
# Table and index names
MEMORY_TABLE = "chat_demo_memory"
SESSION_TABLE = "chat_demo_session"
MESSAGE_TABLE = "chat_demo_message"
# Memory service uses a fixed index name in SDK
MEMORY_SEARCH_INDEX = "agentscope_runtime_knowledge_search_index_name"
# Session service uses custom index names to avoid conflicts with other demos
SESSION_SECONDARY_INDEX = f"{SESSION_TABLE}_secondary_index"
SESSION_SEARCH_INDEX = f"{SESSION_TABLE}_search_index"
MESSAGE_SECONDARY_INDEX = f"{MESSAGE_TABLE}_secondary_index"
MESSAGE_SEARCH_INDEX = f"{MESSAGE_TABLE}_search_index"
async def delete_table_with_indexes(client, table_name):
"""Delete a table and all its indexes using native Tablestore SDK."""
import tablestore
try:
# Delete search indexes first
try:
search_indexes = await client.list_search_index(table_name)
for index_tuple in search_indexes:
await client.delete_search_index(index_tuple[0], index_tuple[1])
except Exception:
pass
# Delete the table
await client.delete_table(table_name)
print(f" Deleted table: {table_name}")
except tablestore.OTSServiceError as e:
if "OTSObjectNotExist" not in str(e):
print(f" Warning: Failed to delete {table_name}: {e}")
async def init_services():
global memory_service, session_service
client = AsyncOTSClient(
os.getenv("TABLESTORE_ENDPOINT"),
os.getenv("TABLESTORE_ACCESS_KEY_ID"),
os.getenv("TABLESTORE_ACCESS_KEY_SECRET"),
os.getenv("TABLESTORE_INSTANCE_NAME"),
)
# Check existing tables using native SDK
existing_tables = await client.list_table()
memory_exists = MEMORY_TABLE in existing_tables
session_existing = [t for t in [SESSION_TABLE, MESSAGE_TABLE] if t in existing_tables]
session_all_exist = len(session_existing) == 2
session_none_exist = len(session_existing) == 0
# Handle incomplete session tables - delete existing ones and recreate
if not session_all_exist and not session_none_exist:
print(f"Incomplete session tables detected: {session_existing}")
print("Cleaning up...")
for t in session_existing:
await delete_table_with_indexes(client, t)
# Wait for deletion to complete
await asyncio.sleep(3)
session_all_exist = False
# Create service instances
memory_service = TablestoreMemoryService(
tablestore_client=client,
table_name=MEMORY_TABLE,
)
session_service = TablestoreSessionHistoryService(
tablestore_client=client,
session_table_name=SESSION_TABLE,
message_table_name=MESSAGE_TABLE,
)
# Suppress SDK logs during init
logging.getLogger("tablestore_for_agent_memory").setLevel(logging.CRITICAL)
original_stderr = sys.stderr
sys.stderr = io.StringIO()
try:
# Init memory service
if memory_exists:
_init_memory_service_without_create(memory_service)
else:
await memory_service.start()
# Init session service - always use custom init to avoid index name conflicts
if session_all_exist:
_init_session_service_without_create(session_service)
else:
# Create tables and indexes using SDK with custom index names
await _create_session_tables(client)
_init_session_service_without_create(session_service)
finally:
sys.stderr = original_stderr
logging.getLogger("tablestore_for_agent_memory").setLevel(logging.WARNING)
print("Services initialized")
async def _create_session_tables(client):
"""Create session and message tables with custom index names."""
from tablestore_for_agent_memory.memory.async_memory_store import AsyncMemoryStore
# Create the memory store with custom index names
store = AsyncMemoryStore(
tablestore_client=client,
session_table_name=SESSION_TABLE,
message_table_name=MESSAGE_TABLE,
session_secondary_index_name=SESSION_SECONDARY_INDEX,
session_search_index_name=SESSION_SEARCH_INDEX,
message_secondary_index_name=MESSAGE_SECONDARY_INDEX,
message_search_index_name=MESSAGE_SEARCH_INDEX,
)
# Init tables and indexes
await store.init_table()
# Wait for table creation
await asyncio.sleep(1)
def _init_memory_service_without_create(service):
"""Initialize memory service store without creating tables."""
from tablestore_for_agent_memory.knowledge.async_knowledge_store import AsyncKnowledgeStore
service._knowledge_store = AsyncKnowledgeStore(
tablestore_client=service._tablestore_client,
vector_dimension=service._vector_dimension,
enable_multi_tenant=False,
table_name=service._table_name,
search_index_name=MEMORY_SEARCH_INDEX,
search_index_schema=service._search_index_schema,
text_field=service._text_field,
embedding_field=service._embedding_field,
vector_metric_type=service._vector_metric_type,
)
def _init_session_service_without_create(service):
"""Initialize session service store without creating tables."""
from tablestore_for_agent_memory.memory.async_memory_store import AsyncMemoryStore
service._memory_store = AsyncMemoryStore(
tablestore_client=service._tablestore_client,
session_table_name=SESSION_TABLE,
message_table_name=MESSAGE_TABLE,
session_secondary_index_name=SESSION_SECONDARY_INDEX,
session_search_index_name=SESSION_SEARCH_INDEX,
message_secondary_index_name=MESSAGE_SECONDARY_INDEX,
message_search_index_name=MESSAGE_SEARCH_INDEX,
session_secondary_index_meta=service._session_secondary_index_meta,
session_search_index_schema=service._session_search_index_schema,
message_search_index_schema=service._message_search_index_schema,
)
async def create_new_session():
global current_session, current_session_id, chat_history
current_session_id = f"session_{uuid.uuid4().hex[:8]}"
current_session = await session_service.create_session(
user_id=user_id,
session_id=current_session_id,
)
chat_history = []
print(f"New session created: {current_session_id}")
def create_message(role: str, text: str) -> Message:
return Message(
type=MessageType.MESSAGE,
role=role,
content=[TextContent(type=ContentType.TEXT, text=text)],
)
async def save_message(role: str, text: str):
if current_session is None:
return
msg = create_message(role, text)
await session_service.append_message(current_session, msg)
async def search_memory(query: str) -> str:
msg = create_message(Role.USER, query)
results = await memory_service.search_memory(user_id=user_id, messages=[msg])
if not results:
return ""
texts = [r.get_text_content() for r in results[:3] if r.get_text_content()]
return "\n".join([f" - {t}" for t in texts])
async def add_memory(text: str):
msg = create_message(Role.USER, text)
await memory_service.add_memory(
user_id=user_id,
messages=[msg],
session_id=current_session_id,
)
print(f"Memory saved: {text}")
def call_llm(messages: list, memory_context: str = "") -> str:
system_prompt = "You are a helpful AI assistant."
if memory_context:
system_prompt += f"\n\nUser's historical memory:\n{memory_context}"
dashscope_messages = [{"role": "system", "content": system_prompt}]
for role, content in messages:
dashscope_messages.append({"role": role, "content": content})
response = Generation.call(
model="qwen-turbo",
messages=dashscope_messages,
api_key=os.getenv("DASHSCOPE_API_KEY"),
result_format="message",
)
if response.status_code == 200:
return response.output.choices[0].message.content
return f"Error: {response.code} - {response.message}"
async def chat(user_input: str):
global chat_history
memory_context = await search_memory(user_input)
if memory_context:
print(f"[Memory] Found related memories:\n{memory_context}")
messages = chat_history + [("user", user_input)]
with Live(Spinner("dots", text="Thinking...", style="cyan"), console=console, refresh_per_second=10):
response = call_llm(messages, memory_context)
await save_message(Role.USER, user_input)
await save_message(Role.ASSISTANT, response)
chat_history.append(("user", user_input))
chat_history.append(("assistant", response))
console.print(f"\n[bold green]Assistant:[/bold green] {response}\n")
async def list_sessions():
sessions = await session_service.list_sessions(user_id=user_id)
print(f"Total {len(sessions)} sessions:")
for s in sessions[:10]:
print(f" - {s.id}")
async def list_memories():
memories = await memory_service.list_memory(user_id=user_id)
print(f"Total {len(memories)} memories:")
for m in memories[:10]:
text = m.get_text_content()
if text:
print(f" - {text[:60]}...")
async def load_session(session_id: str):
global current_session, current_session_id, chat_history
current_session_id = session_id
current_session = await session_service.get_session(user_id, session_id)
chat_history = []
for msg in current_session.messages:
text = msg.get_text_content() if hasattr(msg, "get_text_content") else str(msg)
role = msg.role if hasattr(msg, "role") else "unknown"
chat_history.append((role, text))
print(f"Loaded session: {session_id}, {len(current_session.messages)} messages")
def print_help():
print("""
Commands:
/new - Create new session
/sessions - List all sessions
/load <id> - Load session by ID
/remember <x> - Save memory
/memories - List all memories
/history - Show current chat history
/help - Show this help
/quit - Exit
Otherwise, just type your message to chat.
""")
async def main():
await init_services()
await create_new_session()
print_help()
while True:
try:
user_input = input("You: ").strip()
except (EOFError, KeyboardInterrupt):
print("\nBye!")
break
if not user_input:
continue
if user_input == "/quit":
print("Bye!")
break
elif user_input == "/help":
print_help()
elif user_input == "/new":
await create_new_session()
elif user_input == "/sessions":
await list_sessions()
elif user_input == "/memories":
await list_memories()
elif user_input == "/history":
print(f"Current session: {current_session_id}")
for role, text in chat_history:
print(f" [{role}] {text[:80]}...")
elif user_input.startswith("/load "):
session_id = user_input[6:].strip()
await load_session(session_id)
elif user_input.startswith("/remember "):
text = user_input[10:].strip()
await add_memory(text)
else:
await chat(user_input)
await memory_service.stop()
await session_service.stop()
if __name__ == "__main__":
asyncio.run(main())
运行示例:
python examples/tablestore_chat_demo.py交互命令:
命令
功能
/new创建新会话
/sessions列出所有会话
/load <id>加载指定会话
/remember <内容>保存长期记忆
/memories列出所有记忆
/history显示当前会话历史
/help显示帮助信息
/quit退出