AgentScope Runtime集成Tablestore

更新时间:
复制为 MD 格式

AgentScope Runtime 支持使用阿里云表格存储(Tablestore)作为 Memory Service 和 Session History Service 的持久化后端,为 AI Agent 应用提供高性能、高可用的会话历史和长期记忆存储能力。

2026-01-30_17-03-06 (1)

核心功能

服务

功能

适用场景

TablestoreMemoryService

长期记忆存储,支持全文检索

用户偏好、知识库、跨会话记忆

TablestoreSessionHistoryService

会话历史存储

对话上下文、消息记录

准备工作

在开始使用 Tablestore 存储服务之前,需要完成以下环境配置和依赖安装。

  1. 环境准备:确保已安装Python运行环境,可通过python3 --version命令查看版本信息。

  2. 下载项目:下载 AgentScope Runtime 1.0.5 Release 包agentscope-runtime.zip并解压,解压后进入项目目录。

    重要

    Tablestore 集成功能仅在 AgentScope Runtime 1.0.5 及之前的版本中可用,请确保使用指定版本的 Release 包。

    cd agentscope-runtime
  3. 创建虚拟环境:创建 Python 虚拟环境并激活。

    python3 -m venv .venv && source .venv/bin/activate
  4. 安装依赖:安装项目核心依赖 + Tablestore 扩展。

    pip3 install -e . && pip3 install tablestore-for-agent-memory langchain-community
  5. 配置环境变量:设置以下必需的环境变量,用于连接 Tablestore 和调用大模型。

    环境变量

    说明

    TABLESTORE_ENDPOINT

    Tablestore实例访问地址,可在表格存储控制台获取

    TABLESTORE_INSTANCE_NAME

    Tablestore实例名称

    TABLESTORE_ACCESS_KEY_ID

    阿里云账号或RAM用户的AccessKey ID

    TABLESTORE_ACCESS_KEY_SECRET

    阿里云账号或RAM用户的AccessKey Secret

    DASHSCOPE_API_KEY

    百炼平台API Key(用于调用大模型)

    export TABLESTORE_ENDPOINT="https://<instance>.<region-id>.ots.aliyuncs.com"
    export TABLESTORE_INSTANCE_NAME="<instance-name>"
    export TABLESTORE_ACCESS_KEY_ID="<your-access-key-id>"
    export TABLESTORE_ACCESS_KEY_SECRET="<your-access-key-secret>"
    export DASHSCOPE_API_KEY="<your-dashscope-api-key>"

示例代码:Memory

Memory Service 用于管理 AI Agent 的长期记忆,支持跨会话的信息存储和语义检索。

添加长期记忆

以下示例演示如何将用户信息作为长期记忆写入 Tablestore。将代码保存到 examples/tablestore_memory_add.py 文件。

import os
import asyncio

from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.memory import TablestoreMemoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
    Message,
    MessageType,
    Role,
    TextContent,
    ContentType,
)


async def main():
    endpoint = os.getenv("TABLESTORE_ENDPOINT")
    instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
    access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
    access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")

    client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)

    memory_service = TablestoreMemoryService(
        tablestore_client=client,
        table_name="agentscope_runtime_memory",
    )
    await memory_service.start()
    print("Memory Service 启动成功")

    user_id = "user_001"
    session_id = "session_001"

    messages = [
        Message(
            type=MessageType.MESSAGE,
            role=Role.USER,
            content=[TextContent(type=ContentType.TEXT, text="我喜欢吃川菜")],
        ),
        Message(
            type=MessageType.MESSAGE,
            role=Role.USER,
            content=[TextContent(type=ContentType.TEXT, text="我的生日是 3 月 15 日")],
        ),
    ]

    print(f"准备写入 {len(messages)} 条记忆:")
    for msg in messages:
        print(f"  - [{msg.role}] {msg.get_text_content()}")

    await memory_service.add_memory(
        user_id=user_id,
        messages=messages,
        session_id=session_id,
    )
    print(f"写入成功, user_id={user_id}, session_id={session_id}")

    await memory_service.stop()


if __name__ == "__main__":
    asyncio.run(main())

运行示例:

python examples/tablestore_memory_add.py

搜索长期记忆

以下示例演示如何通过语义检索查询用户的长期记忆。将代码保存到 examples/tablestore_memory_search.py 文件。

说明

数据写入后,多元索引需要几秒钟完成同步。若使用后续示例代码查询不到数据,需等待多元索引同步完成。

import os
import asyncio

from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.memory import TablestoreMemoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
    Message,
    MessageType,
    Role,
    TextContent,
    ContentType,
)


async def main():
    endpoint = os.getenv("TABLESTORE_ENDPOINT")
    instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
    access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
    access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")

    client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)

    memory_service = TablestoreMemoryService(
        tablestore_client=client,
        table_name="agentscope_runtime_memory",
    )
    await memory_service.start()
    print("Memory Service 启动成功")

    user_id = "user_001"

    query = [
        Message(
            type=MessageType.MESSAGE,
            role=Role.USER,
            content=[TextContent(type=ContentType.TEXT, text="我喜欢什么口味的菜")],
        )
    ]
    print(f"搜索查询: {query[0].get_text_content()}")

    results = await memory_service.search_memory(user_id=user_id, messages=query)
    print(f"搜索结果, 找到 {len(results)} 条相关记忆:")
    for mem in results:
        print(f"  - [{mem.role}] {mem.get_text_content()}")

    all_memories = await memory_service.list_memory(user_id=user_id)
    print(f"用户 {user_id} 共有 {len(all_memories)} 条长期记忆")

    await memory_service.stop()


if __name__ == "__main__":
    asyncio.run(main())

运行示例:

python examples/tablestore_memory_search.py

示例代码:Session

Session History Service 用于管理会话和消息记录,保存对话上下文。

创建会话并添加消息

以下示例演示如何创建会话并写入对话消息。将代码保存到 examples/tablestore_session_create.py 文件。

import os
import asyncio

from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.session_history import TablestoreSessionHistoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
    Message,
    MessageType,
    Role,
    TextContent,
    ContentType,
)


async def main():
    endpoint = os.getenv("TABLESTORE_ENDPOINT")
    instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
    access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
    access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")

    client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)

    session_service = TablestoreSessionHistoryService(
        tablestore_client=client,
        session_table_name="agentscope_runtime_session",
        message_table_name="agentscope_runtime_message",
    )
    await session_service.start()
    print("Session History Service 启动成功")

    user_id = "user_001"
    session_id = "session_003"

    session = await session_service.create_session(
        user_id=user_id,
        session_id=session_id,
    )
    print(f"创建会话成功: session_id={session.id}, user_id={user_id}")

    user_msg = Message(
        type=MessageType.MESSAGE,
        role=Role.USER,
        content=[TextContent(type=ContentType.TEXT, text="你好,我叫张三")],
    )
    await session_service.append_message(session, user_msg)
    print(f"写入消息: [{user_msg.role}] {user_msg.get_text_content()}")

    assistant_msg = Message(
        type=MessageType.MESSAGE,
        role=Role.ASSISTANT,
        content=[TextContent(type=ContentType.TEXT, text="你好张三!有什么可以帮助你的?")],
    )
    await session_service.append_message(session, assistant_msg)
    print(f"写入消息: [{assistant_msg.role}] {assistant_msg.get_text_content()}")

    print(f"会话 {session_id} 已写入 2 条消息")

    await session_service.stop()


if __name__ == "__main__":
    asyncio.run(main())

运行示例:

python examples/tablestore_session_create.py

获取会话历史

以下示例演示如何获取指定会话的历史消息记录。将代码保存到 examples/tablestore_session_get.py 文件。

说明

数据写入后,多元索引需要几秒钟完成同步。若使用后续示例代码查询不到数据,需等待多元索引同步完成。

import os
import asyncio

from tablestore import AsyncOTSClient
from agentscope_runtime.engine.services.session_history import TablestoreSessionHistoryService


async def main():
    endpoint = os.getenv("TABLESTORE_ENDPOINT")
    instance_name = os.getenv("TABLESTORE_INSTANCE_NAME")
    access_key_id = os.getenv("TABLESTORE_ACCESS_KEY_ID")
    access_key_secret = os.getenv("TABLESTORE_ACCESS_KEY_SECRET")

    client = AsyncOTSClient(endpoint, access_key_id, access_key_secret, instance_name)

    session_service = TablestoreSessionHistoryService(
        tablestore_client=client,
        session_table_name="agentscope_runtime_session",
        message_table_name="agentscope_runtime_message",
    )
    await session_service.start()
    print("Session History Service 启动成功")

    user_id = "user_001"
    session_id = "session_003"

    session = await session_service.get_session(user_id, session_id)
    print(f"获取会话 {session_id}, 共 {len(session.messages)} 条消息:")
    for msg in session.messages:
        text = msg.get_text_content() if hasattr(msg, "get_text_content") else str(msg)
        role = msg.role if hasattr(msg, "role") else "unknown"
        print(f"  - [{role}] {text}")

    sessions = await session_service.list_sessions(user_id=user_id)
    print(f"用户 {user_id} 共有 {len(sessions)} 个会话")

    await session_service.stop()


if __name__ == "__main__":
    asyncio.run(main())

运行示例:

python examples/tablestore_session_get.py

示例代码:交互式聊天 Demo

以下示例展示如何构建一个完整的交互式聊天应用,集成大模型调用、会话历史存储和长期记忆检索。将代码保存到 examples/tablestore_chat_demo.py 文件。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 本示例展示如何构建一个交互式聊天 Demo,使用 Tablestore 存储会话历史和长期记忆
# 使用前请配置环境变量:
#   TABLESTORE_ENDPOINT, TABLESTORE_INSTANCE_NAME,
#   TABLESTORE_ACCESS_KEY_ID, TABLESTORE_ACCESS_KEY_SECRET, DASHSCOPE_API_KEY

import os
import asyncio
import uuid
import sys
import io
import logging

from tablestore import AsyncOTSClient
from dashscope import Generation
from rich.console import Console
from rich.live import Live
from rich.spinner import Spinner

console = Console()

from agentscope_runtime.engine.services.memory import TablestoreMemoryService
from agentscope_runtime.engine.services.session_history import TablestoreSessionHistoryService
from agentscope_runtime.engine.schemas.agent_schemas import (
    Message,
    MessageType,
    Role,
    TextContent,
    ContentType,
)

memory_service = None
session_service = None
current_session = None
current_session_id = None
user_id = "cli_user"
chat_history = []

# Table and index names
MEMORY_TABLE = "chat_demo_memory"
SESSION_TABLE = "chat_demo_session"
MESSAGE_TABLE = "chat_demo_message"
# Memory service uses a fixed index name in SDK
MEMORY_SEARCH_INDEX = "agentscope_runtime_knowledge_search_index_name"
# Session service uses custom index names to avoid conflicts with other demos
SESSION_SECONDARY_INDEX = f"{SESSION_TABLE}_secondary_index"
SESSION_SEARCH_INDEX = f"{SESSION_TABLE}_search_index"
MESSAGE_SECONDARY_INDEX = f"{MESSAGE_TABLE}_secondary_index"
MESSAGE_SEARCH_INDEX = f"{MESSAGE_TABLE}_search_index"


async def delete_table_with_indexes(client, table_name):
    """Delete a table and all its indexes using native Tablestore SDK."""
    import tablestore

    try:
        # Delete search indexes first
        try:
            search_indexes = await client.list_search_index(table_name)
            for index_tuple in search_indexes:
                await client.delete_search_index(index_tuple[0], index_tuple[1])
        except Exception:
            pass

        # Delete the table
        await client.delete_table(table_name)
        print(f"  Deleted table: {table_name}")
    except tablestore.OTSServiceError as e:
        if "OTSObjectNotExist" not in str(e):
            print(f"  Warning: Failed to delete {table_name}: {e}")


async def init_services():
    global memory_service, session_service

    client = AsyncOTSClient(
        os.getenv("TABLESTORE_ENDPOINT"),
        os.getenv("TABLESTORE_ACCESS_KEY_ID"),
        os.getenv("TABLESTORE_ACCESS_KEY_SECRET"),
        os.getenv("TABLESTORE_INSTANCE_NAME"),
    )

    # Check existing tables using native SDK
    existing_tables = await client.list_table()

    memory_exists = MEMORY_TABLE in existing_tables
    session_existing = [t for t in [SESSION_TABLE, MESSAGE_TABLE] if t in existing_tables]
    session_all_exist = len(session_existing) == 2
    session_none_exist = len(session_existing) == 0

    # Handle incomplete session tables - delete existing ones and recreate
    if not session_all_exist and not session_none_exist:
        print(f"Incomplete session tables detected: {session_existing}")
        print("Cleaning up...")
        for t in session_existing:
            await delete_table_with_indexes(client, t)
        # Wait for deletion to complete
        await asyncio.sleep(3)
        session_all_exist = False

    # Create service instances
    memory_service = TablestoreMemoryService(
        tablestore_client=client,
        table_name=MEMORY_TABLE,
    )
    session_service = TablestoreSessionHistoryService(
        tablestore_client=client,
        session_table_name=SESSION_TABLE,
        message_table_name=MESSAGE_TABLE,
    )

    # Suppress SDK logs during init
    logging.getLogger("tablestore_for_agent_memory").setLevel(logging.CRITICAL)
    original_stderr = sys.stderr
    sys.stderr = io.StringIO()

    try:
        # Init memory service
        if memory_exists:
            _init_memory_service_without_create(memory_service)
        else:
            await memory_service.start()

        # Init session service - always use custom init to avoid index name conflicts
        if session_all_exist:
            _init_session_service_without_create(session_service)
        else:
            # Create tables and indexes using SDK with custom index names
            await _create_session_tables(client)
            _init_session_service_without_create(session_service)

    finally:
        sys.stderr = original_stderr
        logging.getLogger("tablestore_for_agent_memory").setLevel(logging.WARNING)

    print("Services initialized")


async def _create_session_tables(client):
    """Create session and message tables with custom index names."""
    from tablestore_for_agent_memory.memory.async_memory_store import AsyncMemoryStore

    # Create the memory store with custom index names
    store = AsyncMemoryStore(
        tablestore_client=client,
        session_table_name=SESSION_TABLE,
        message_table_name=MESSAGE_TABLE,
        session_secondary_index_name=SESSION_SECONDARY_INDEX,
        session_search_index_name=SESSION_SEARCH_INDEX,
        message_secondary_index_name=MESSAGE_SECONDARY_INDEX,
        message_search_index_name=MESSAGE_SEARCH_INDEX,
    )
    # Init tables and indexes
    await store.init_table()
    # Wait for table creation
    await asyncio.sleep(1)


def _init_memory_service_without_create(service):
    """Initialize memory service store without creating tables."""
    from tablestore_for_agent_memory.knowledge.async_knowledge_store import AsyncKnowledgeStore

    service._knowledge_store = AsyncKnowledgeStore(
        tablestore_client=service._tablestore_client,
        vector_dimension=service._vector_dimension,
        enable_multi_tenant=False,
        table_name=service._table_name,
        search_index_name=MEMORY_SEARCH_INDEX,
        search_index_schema=service._search_index_schema,
        text_field=service._text_field,
        embedding_field=service._embedding_field,
        vector_metric_type=service._vector_metric_type,
    )


def _init_session_service_without_create(service):
    """Initialize session service store without creating tables."""
    from tablestore_for_agent_memory.memory.async_memory_store import AsyncMemoryStore

    service._memory_store = AsyncMemoryStore(
        tablestore_client=service._tablestore_client,
        session_table_name=SESSION_TABLE,
        message_table_name=MESSAGE_TABLE,
        session_secondary_index_name=SESSION_SECONDARY_INDEX,
        session_search_index_name=SESSION_SEARCH_INDEX,
        message_secondary_index_name=MESSAGE_SECONDARY_INDEX,
        message_search_index_name=MESSAGE_SEARCH_INDEX,
        session_secondary_index_meta=service._session_secondary_index_meta,
        session_search_index_schema=service._session_search_index_schema,
        message_search_index_schema=service._message_search_index_schema,
    )


async def create_new_session():
    global current_session, current_session_id, chat_history
    current_session_id = f"session_{uuid.uuid4().hex[:8]}"
    current_session = await session_service.create_session(
        user_id=user_id,
        session_id=current_session_id,
    )
    chat_history = []
    print(f"New session created: {current_session_id}")


def create_message(role: str, text: str) -> Message:
    return Message(
        type=MessageType.MESSAGE,
        role=role,
        content=[TextContent(type=ContentType.TEXT, text=text)],
    )


async def save_message(role: str, text: str):
    if current_session is None:
        return
    msg = create_message(role, text)
    await session_service.append_message(current_session, msg)


async def search_memory(query: str) -> str:
    msg = create_message(Role.USER, query)
    results = await memory_service.search_memory(user_id=user_id, messages=[msg])
    if not results:
        return ""
    texts = [r.get_text_content() for r in results[:3] if r.get_text_content()]
    return "\n".join([f"  - {t}" for t in texts])


async def add_memory(text: str):
    msg = create_message(Role.USER, text)
    await memory_service.add_memory(
        user_id=user_id,
        messages=[msg],
        session_id=current_session_id,
    )
    print(f"Memory saved: {text}")


def call_llm(messages: list, memory_context: str = "") -> str:
    system_prompt = "You are a helpful AI assistant."
    if memory_context:
        system_prompt += f"\n\nUser's historical memory:\n{memory_context}"

    dashscope_messages = [{"role": "system", "content": system_prompt}]
    for role, content in messages:
        dashscope_messages.append({"role": role, "content": content})

    response = Generation.call(
        model="qwen-turbo",
        messages=dashscope_messages,
        api_key=os.getenv("DASHSCOPE_API_KEY"),
        result_format="message",
    )
    if response.status_code == 200:
        return response.output.choices[0].message.content
    return f"Error: {response.code} - {response.message}"


async def chat(user_input: str):
    global chat_history
    memory_context = await search_memory(user_input)
    if memory_context:
        print(f"[Memory] Found related memories:\n{memory_context}")

    messages = chat_history + [("user", user_input)]

    with Live(Spinner("dots", text="Thinking...", style="cyan"), console=console, refresh_per_second=10):
        response = call_llm(messages, memory_context)

    await save_message(Role.USER, user_input)
    await save_message(Role.ASSISTANT, response)
    chat_history.append(("user", user_input))
    chat_history.append(("assistant", response))

    console.print(f"\n[bold green]Assistant:[/bold green] {response}\n")


async def list_sessions():
    sessions = await session_service.list_sessions(user_id=user_id)
    print(f"Total {len(sessions)} sessions:")
    for s in sessions[:10]:
        print(f"  - {s.id}")


async def list_memories():
    memories = await memory_service.list_memory(user_id=user_id)
    print(f"Total {len(memories)} memories:")
    for m in memories[:10]:
        text = m.get_text_content()
        if text:
            print(f"  - {text[:60]}...")


async def load_session(session_id: str):
    global current_session, current_session_id, chat_history
    current_session_id = session_id
    current_session = await session_service.get_session(user_id, session_id)
    chat_history = []
    for msg in current_session.messages:
        text = msg.get_text_content() if hasattr(msg, "get_text_content") else str(msg)
        role = msg.role if hasattr(msg, "role") else "unknown"
        chat_history.append((role, text))
    print(f"Loaded session: {session_id}, {len(current_session.messages)} messages")


def print_help():
    print("""
Commands:
  /new          - Create new session
  /sessions     - List all sessions
  /load <id>    - Load session by ID
  /remember <x> - Save memory
  /memories     - List all memories
  /history      - Show current chat history
  /help         - Show this help
  /quit         - Exit

Otherwise, just type your message to chat.
""")


async def main():
    await init_services()
    await create_new_session()
    print_help()

    while True:
        try:
            user_input = input("You: ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\nBye!")
            break

        if not user_input:
            continue

        if user_input == "/quit":
            print("Bye!")
            break
        elif user_input == "/help":
            print_help()
        elif user_input == "/new":
            await create_new_session()
        elif user_input == "/sessions":
            await list_sessions()
        elif user_input == "/memories":
            await list_memories()
        elif user_input == "/history":
            print(f"Current session: {current_session_id}")
            for role, text in chat_history:
                print(f"  [{role}] {text[:80]}...")
        elif user_input.startswith("/load "):
            session_id = user_input[6:].strip()
            await load_session(session_id)
        elif user_input.startswith("/remember "):
            text = user_input[10:].strip()
            await add_memory(text)
        else:
            await chat(user_input)

    await memory_service.stop()
    await session_service.stop()


if __name__ == "__main__":
    asyncio.run(main())
  • 运行示例:

    python examples/tablestore_chat_demo.py
  • 交互命令:

    命令

    功能

    /new

    创建新会话

    /sessions

    列出所有会话

    /load <id>

    加载指定会话

    /remember <内容>

    保存长期记忆

    /memories

    列出所有记忆

    /history

    显示当前会话历史

    /help

    显示帮助信息

    /quit

    退出

相关文档