Agent Memory SDK

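The Agent Memory SDK is a framework built on Tablestore that mainly supports the Memory and Knowledge scenarios. It gives AI Agent applications persistent, high-performance memory storage and semantic retrieval, helping developers quickly build intelligent applications with contextual understanding and long-term memory.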

Core architecture

[Figure: core architecture diagram]

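Architecture advantages

  • Lightweight design: The SDK abstracts a common storage interface, which reduces the complexity of business development and balances technical depth with ease of use. Developers do not need to call the underlying database APIs directly and can focus on business logic to deliver results quickly.

  • Scenario-driven design: The SDK provides complete solutions for two core scenarios: real-time memory storage (Memory) and long-term semantic retrieval (Knowledge). Beyond basic storage, it integrates business features such as summary recording, fact extraction, and user-profile tag mining, so that storage and application logic work closely together.

  • Business value validation: The SDK builds on established industry best practices, so developers can validate and ship the business value of AI applications in their own scenarios without extensive technical research.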

Quick start

The following Python examples walk through the complete process of integrating and using the SDK. For Java, see the usage instructions.

Prepare the environment

Make sure a Python runtime is installed. You can run python --version to check the version.

Install the SDK

pip install tablestore-for-agent-memory

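Configure environment variables

Set the following required environment variables:

  • TABLESTORE_ACCESS_KEY_ID: the AccessKey ID of an Alibaba Cloud account or RAM user.

  • TABLESTORE_ACCESS_KEY_SECRET: the AccessKey Secret of an Alibaba Cloud account or RAM user.

  • TABLESTORE_INSTANCE_NAME: the instance name, available in the Tablestore console.

  • TABLESTORE_ENDPOINT: the instance endpoint, available in the Tablestore console.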
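Every sample script below repeats the same read-and-check logic for these four variables. As a minimal sketch, that logic could be factored into one helper. The names TablestoreConfig and load_tablestore_config are hypothetical and not part of the SDK; the helper raises an error instead of printing and exiting, which is an illustrative design choice.

```python
import os
from dataclasses import dataclass

# The four variables listed above.
REQUIRED_VARS = (
    "TABLESTORE_ENDPOINT",
    "TABLESTORE_ACCESS_KEY_ID",
    "TABLESTORE_ACCESS_KEY_SECRET",
    "TABLESTORE_INSTANCE_NAME",
)


@dataclass(frozen=True)
class TablestoreConfig:
    """Hypothetical container for the connection settings (not an SDK type)."""
    endpoint: str
    access_key_id: str
    access_key_secret: str
    instance_name: str


def load_tablestore_config(env=None):
    """Read the required variables, raising if any is missing or empty."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(
            "Missing required environment variables: " + ", ".join(missing)
        )
    return TablestoreConfig(
        endpoint=env["TABLESTORE_ENDPOINT"],
        access_key_id=env["TABLESTORE_ACCESS_KEY_ID"],
        access_key_secret=env["TABLESTORE_ACCESS_KEY_SECRET"],
        instance_name=env["TABLESTORE_INSTANCE_NAME"],
    )
```

The returned fields can then be passed to tablestore.OTSClient in the same order the sample scripts use: endpoint, AccessKey ID, AccessKey Secret, instance name.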

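Sample code: Memory scenario

The Memory scenario manages the conversational memory of an AI Agent, covering core functions such as session management and message storage. The following examples show the complete flow of creating a session, recording a conversation, and querying the history.

Create a session and write conversation records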

import tablestore
from tablestore_for_agent_memory.base.common import MetaType, microseconds_timestamp
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
from tablestore_for_agent_memory.base.base_memory_store import Session, Message
import os

# Read configuration from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

# Check for required environment variables
required_env_vars = {
    'TABLESTORE_ENDPOINT': endpoint,
    'TABLESTORE_ACCESS_KEY_ID': access_key_id,
    'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
    'TABLESTORE_INSTANCE_NAME': instance_name
}

missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
    print(f"Error: missing required environment variables: {', '.join(missing_vars)}")
    print("Please set the following environment variables:")
    for var in missing_vars:
        print(f"  export {var}=your_value")
    exit(1)

# Create the Tablestore SDK client
tablestore_client = tablestore.OTSClient(
    endpoint,
    access_key_id,
    access_key_secret,
    instance_name,
    retry_policy=tablestore.WriteRetryPolicy(),
)

# Fields to return when list_recent_sessions lists sessions by update time
session_secondary_index_meta = {
    "meta_string": MetaType.STRING,
    "meta_long": MetaType.INTEGER,
    "meta_double": MetaType.DOUBLE,
    "meta_boolean": MetaType.BOOLEAN,
    "meta_bytes": MetaType.BINARY,
}

# Metadata fields for the search index on the session table
session_search_index_schema = [
    tablestore.FieldSchema(
        "title",
        tablestore.FieldType.TEXT,
        analyzer=tablestore.AnalyzerType.FUZZY,
        analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
    ),
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Metadata fields for the search index on the message table
message_search_index_schema = [
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Initialize MemoryStore
memory_store = MemoryStore(
    tablestore_client=tablestore_client,
    session_secondary_index_meta=session_secondary_index_meta,
    session_search_index_schema=session_search_index_schema,
    message_search_index_schema=message_search_index_schema,
)

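print("Creating tables and indexes...")
# Create the tables (including the secondary index); this only needs to run once
try:
    memory_store.init_table()
    memory_store.init_search_index()
    print("Tables and indexes created")
except Exception as e:
    print(f"Tables and indexes already exist, or creation failed: {e}")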

# ====== Create a new session and run two rounds of conversation ======
print("\n====== Create a new session ======")

# Create a Session
session = Session(user_id="test_user_1", session_id="session_001")
session.update_time = microseconds_timestamp()
session.title = "表格存储咨询"
session.metadata = {
    "meta_string": "web_source",
    "meta_long": 1,
    "meta_double": 1.0,
    "meta_boolean": True,
    "model_name": "qwen-max"
}

# Save the session
memory_store.put_session(session)
print(f"Session created: user_id={session.user_id}, session_id={session.session_id}")

# ====== Round 1 ======
print("\n====== Round 1 ======")

# User question
message_1 = Message(
    session_id="session_001",
    message_id="msg_001",
    create_time=microseconds_timestamp()
)
message_1.content = "你好,请帮我介绍一下表格存储(Tablestore)是什么?"
message_1.metadata = {
    "meta_string": "web",
    "message_type": "user",
    "meta_long": 1
}
memory_store.put_message(message_1)
print(f"User: {message_1.content}")

# Refresh the session's update time
session.update_time = microseconds_timestamp()
memory_store.update_session(session)

# Model reply
message_2 = Message(
    session_id="session_001",
    message_id="msg_002",
    create_time=microseconds_timestamp()
)
message_2.content = "表格存储(Tablestore)是阿里云自研的第一代飞天产品,提供海量结构化数据存储以及快速的查询和分析服务。它支持多种数据模型,包括宽表模型、IM消息模型、时序模型等,可以满足不同场景的数据存储需求。"
message_2.metadata = {
    "message_type": "assistant",
    "model": "qwen-max"
}
memory_store.put_message(message_2)
print(f"Assistant: {message_2.content}")

# ====== Round 2 ======
print("\n====== Round 2 ======")

# Follow-up user question
message_3 = Message(
    session_id="session_001",
    message_id="msg_003",
    create_time=microseconds_timestamp()
)
message_3.content = "表格存储有哪些典型的应用场景?"
message_3.metadata = {
    "meta_string": "web",
    "message_type": "user",
    "meta_long": 2
}
memory_store.put_message(message_3)
print(f"User: {message_3.content}")

# Refresh the session's update time
session.update_time = microseconds_timestamp()
memory_store.update_session(session)

# Model reply
message_4 = Message(
    session_id="session_001",
    message_id="msg_004",
    create_time=microseconds_timestamp()
)
message_4.content = """表格存储的典型应用场景包括:
1. AI Agent 记忆存储:存储知识库、长期记忆、AI会话消息等信息
2. 元数据管理:存储海量文件、视频、图片的元信息
3. 消息数据:存储IM聊天消息、Feed流消息等
4. 轨迹溯源:车联网轨迹、物流轨迹等时序数据
5. 科学大数据:气象数据、基因数据等海量数据存储
6. 推荐系统:用户画像、物品特征等数据存储
7. 风控系统:实时风控规则、历史行为数据存储"""
message_4.metadata = {
    "message_type": "assistant",
    "model": "qwen-max"
}
memory_store.put_message(message_4)
print(f"Assistant: {message_4.content}")

print("\n====== Session created and conversation complete ======")
print(f"Session ID: {session.session_id}")
print(f"User ID: {session.user_id}")
print("Completed 2 rounds of conversation, 4 messages")

Query the list of historical sessions

import tablestore
from tablestore_for_agent_memory.base.common import MetaType
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
import os

# Read configuration from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

# Check for required environment variables
required_env_vars = {
    'TABLESTORE_ENDPOINT': endpoint,
    'TABLESTORE_ACCESS_KEY_ID': access_key_id,
    'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
    'TABLESTORE_INSTANCE_NAME': instance_name
}

missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
    print(f"Error: missing required environment variables: {', '.join(missing_vars)}")
    print("Please set the following environment variables:")
    for var in missing_vars:
        print(f"  export {var}=your_value")
    exit(1)

# Create the Tablestore SDK client
tablestore_client = tablestore.OTSClient(
    endpoint,
    access_key_id,
    access_key_secret,
    instance_name,
    retry_policy=tablestore.WriteRetryPolicy(),
)

# Fields to return when list_recent_sessions lists sessions by update time
session_secondary_index_meta = {
    "meta_string": MetaType.STRING,
    "meta_long": MetaType.INTEGER,
    "meta_double": MetaType.DOUBLE,
    "meta_boolean": MetaType.BOOLEAN,
    "meta_bytes": MetaType.BINARY,
}

# Metadata fields for the search index on the session table
session_search_index_schema = [
    tablestore.FieldSchema(
        "title",
        tablestore.FieldType.TEXT,
        analyzer=tablestore.AnalyzerType.FUZZY,
        analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
    ),
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Metadata fields for the search index on the message table
message_search_index_schema = [
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Initialize MemoryStore
memory_store = MemoryStore(
    tablestore_client=tablestore_client,
    session_secondary_index_meta=session_secondary_index_meta,
    session_search_index_schema=session_search_index_schema,
    message_search_index_schema=message_search_index_schema,
)

print("====== Query the list of historical sessions ======\n")

# Query the most recent sessions of a given user
user_id = "test_user_1"
max_count = 10  # Return at most 10 sessions

print(f"Querying recent sessions for user {user_id}...")

try:
    # Fetch the session list (newest update time first)
    sessions = list(memory_store.list_recent_sessions(user_id=user_id, max_count=max_count))

    if not sessions:
        print(f"\nUser {user_id} has no historical sessions")
    else:
        print(f"\nFound {len(sessions)} session(s):\n")

        for idx, session in enumerate(sessions, 1):
            print(f"Session {idx}:")
            print(f"  - Session ID: {session.session_id}")
            print(f"  - User ID: {session.user_id}")
            print(f"  - Title: {session.title if hasattr(session, 'title') and session.title else 'Untitled'}")
            print(f"  - Created: {session.create_time if hasattr(session, 'create_time') else 'unknown'}")
            print(f"  - Updated: {session.update_time if hasattr(session, 'update_time') else 'unknown'}")

            # Show metadata
            if session.metadata:
                print("  - Metadata:")
                for key, value in session.metadata.items():
                    print(f"      {key}: {value}")
            print()

except Exception as e:
    print(f"Failed to query the session list: {e}")

print("====== Query complete ======")
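The script above prints update_time as a raw number. Assuming the value is a count of microseconds since the Unix epoch, which the name microseconds_timestamp suggests but this page does not state explicitly, a small formatter can make it readable. The helper format_micros is hypothetical and uses only the standard library.

```python
from datetime import datetime, timezone


def format_micros(timestamp_us):
    """Render a microsecond Unix timestamp as a UTC string.

    Assumption: the input counts microseconds since 1970-01-01 UTC.
    """
    # Split into whole seconds and leftover microseconds to avoid float rounding.
    seconds, micros = divmod(int(timestamp_us), 1_000_000)
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    moment = moment.replace(microsecond=micros)
    return moment.strftime("%Y-%m-%d %H:%M:%S.%f UTC")


print(format_micros(1_700_000_000_123_456))
```

In the listing loop, session.update_time could be passed through this helper before printing.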

Query the details of a specific session

import tablestore
from tablestore_for_agent_memory.base.common import MetaType
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
import os

# Read configuration from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

# Check for required environment variables
required_env_vars = {
    'TABLESTORE_ENDPOINT': endpoint,
    'TABLESTORE_ACCESS_KEY_ID': access_key_id,
    'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
    'TABLESTORE_INSTANCE_NAME': instance_name
}

missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
    print(f"Error: missing required environment variables: {', '.join(missing_vars)}")
    print("Please set the following environment variables:")
    for var in missing_vars:
        print(f"  export {var}=your_value")
    exit(1)

# Create the Tablestore SDK client
tablestore_client = tablestore.OTSClient(
    endpoint,
    access_key_id,
    access_key_secret,
    instance_name,
    retry_policy=tablestore.WriteRetryPolicy(),
)

# Fields to return when list_recent_sessions lists sessions by update time
session_secondary_index_meta = {
    "meta_string": MetaType.STRING,
    "meta_long": MetaType.INTEGER,
    "meta_double": MetaType.DOUBLE,
    "meta_boolean": MetaType.BOOLEAN,
    "meta_bytes": MetaType.BINARY,
}

# Metadata fields for the search index on the session table
session_search_index_schema = [
    tablestore.FieldSchema(
        "title",
        tablestore.FieldType.TEXT,
        analyzer=tablestore.AnalyzerType.FUZZY,
        analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
    ),
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Metadata fields for the search index on the message table
message_search_index_schema = [
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Initialize MemoryStore
memory_store = MemoryStore(
    tablestore_client=tablestore_client,
    session_secondary_index_meta=session_secondary_index_meta,
    session_search_index_schema=session_search_index_schema,
    message_search_index_schema=message_search_index_schema,
)

print("====== Query the details of a specific session ======\n")

# The session to query
user_id = "test_user_1"
session_id = "session_001"

print("Querying session details...")
print(f"User ID: {user_id}")
print(f"Session ID: {session_id}\n")

try:
    # Fetch the session details
    session = memory_store.get_session(user_id=user_id, session_id=session_id)

    if session:
        print("Session details:")
        print("=" * 50)
        print(f"User ID: {session.user_id}")
        print(f"Session ID: {session.session_id}")
        print(f"Title: {session.title if hasattr(session, 'title') and session.title else 'Untitled'}")
        print(f"Created: {session.create_time if hasattr(session, 'create_time') else 'unknown'}")
        print(f"Updated: {session.update_time if hasattr(session, 'update_time') else 'unknown'}")

        # Show the full metadata
        if session.metadata:
            print("\nMetadata:")
            print("-" * 50)
            for key, value in session.metadata.items():
                print(f"  {key}: {value}")
        else:
            print("\nMetadata: none")

        print("=" * 50)
    else:
        print(f"Session not found (user_id={user_id}, session_id={session_id})")

except Exception as e:
    print(f"Failed to query session details: {e}")
    import traceback
    traceback.print_exc()

print("\n====== Query complete ======")

Query the complete conversation record of a specific session

import tablestore
from tablestore_for_agent_memory.base.common import MetaType
from tablestore_for_agent_memory.memory.memory_store import MemoryStore
import os

# Read configuration from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

# Check for required environment variables
required_env_vars = {
    'TABLESTORE_ENDPOINT': endpoint,
    'TABLESTORE_ACCESS_KEY_ID': access_key_id,
    'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
    'TABLESTORE_INSTANCE_NAME': instance_name
}

missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
    print(f"Error: missing required environment variables: {', '.join(missing_vars)}")
    print("Please set the following environment variables:")
    for var in missing_vars:
        print(f"  export {var}=your_value")
    exit(1)

# Create the Tablestore SDK client
tablestore_client = tablestore.OTSClient(
    endpoint,
    access_key_id,
    access_key_secret,
    instance_name,
    retry_policy=tablestore.WriteRetryPolicy(),
)

# Fields to return when list_recent_sessions lists sessions by update time
session_secondary_index_meta = {
    "meta_string": MetaType.STRING,
    "meta_long": MetaType.INTEGER,
    "meta_double": MetaType.DOUBLE,
    "meta_boolean": MetaType.BOOLEAN,
    "meta_bytes": MetaType.BINARY,
}

# Metadata fields for the search index on the session table
session_search_index_schema = [
    tablestore.FieldSchema(
        "title",
        tablestore.FieldType.TEXT,
        analyzer=tablestore.AnalyzerType.FUZZY,
        analyzer_parameter=tablestore.FuzzyAnalyzerParameter(1, 4),
    ),
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Metadata fields for the search index on the message table
message_search_index_schema = [
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Initialize MemoryStore
memory_store = MemoryStore(
    tablestore_client=tablestore_client,
    session_secondary_index_meta=session_secondary_index_meta,
    session_search_index_schema=session_search_index_schema,
    message_search_index_schema=message_search_index_schema,
)

print("====== Query the complete conversation record of a specific session ======\n")

# The session to query
session_id = "session_001"

print("Querying the conversation record...")
print(f"Session ID: {session_id}\n")

try:
    # Fetch all messages in the session
    messages = list(memory_store.list_messages(session_id=session_id))

    if not messages:
        print(f"Session {session_id} has no conversation records")
    else:
        # Sort by creation time, oldest first
        messages.sort(key=lambda m: m.create_time)

        print(f"Found {len(messages)} message(s)\n")
        print("=" * 80)

        # Display the conversation round by round
        round_num = 0
        for message in messages:
            # A user message starts a new round
            message_type = message.metadata.get("message_type", "unknown")

            if message_type == "user":
                round_num += 1
                print(f"\nRound {round_num}:")
                print("-" * 80)

            # Show the message details
            role = "User" if message_type == "user" else "Assistant"
            print(f"\n[{role}] (message ID: {message.message_id})")
            print(f"Content: {message.content}")
            print(f"Created: {message.create_time}")

            # Show metadata, if any
            if message.metadata and len(message.metadata) > 1:  # Skip when message_type is the only key
                print("Metadata:")
                for key, value in message.metadata.items():
                    if key != "message_type":  # message_type is already shown
                        print(f"  - {key}: {value}")

        print("\n" + "=" * 80)
        print(f"\nSummary: {round_num} round(s), {len(messages)} message(s)")

except Exception as e:
    print(f"Failed to query the conversation record: {e}")
    import traceback
    traceback.print_exc()

print("\n====== Query complete ======")
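The round-splitting rule used in the script above (each user message opens a new round) can be isolated as a pure function, which makes it easy to test without a Tablestore instance. This is a minimal sketch: group_into_rounds is a hypothetical helper, and plain (message_type, content) tuples stand in for the SDK's Message objects. It assumes the input is already sorted by creation time.

```python
def group_into_rounds(messages):
    """Group (message_type, content) pairs into conversation rounds.

    A "user" message opens a new round. Messages that arrive before any
    user message are placed in an initial round of their own.
    """
    rounds = []
    for message_type, content in messages:
        if message_type == "user" or not rounds:
            rounds.append([])
        rounds[-1].append((message_type, content))
    return rounds


history = [
    ("user", "q1"),
    ("assistant", "a1"),
    ("user", "q2"),
    ("assistant", "a2"),
]
for number, round_messages in enumerate(group_into_rounds(history), 1):
    print(number, round_messages)
```

Returning a list of rounds rather than printing inline keeps the grouping reusable, for example when only the most recent rounds should be sent back to the model as context.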

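Sample code: Knowledge scenario

The Knowledge scenario focuses on building an AI knowledge base and supports vectorized storage and intelligent retrieval of large volumes of documents. The following examples show how to create a knowledge base, import documents, and implement intelligent Q&A through vector search, full-text search, and other methods.

The sample code uses the text-embedding-v2 model from Alibaba Cloud Model Studio (Bailian) for vectorization. Install the dependency first and set your API key in the OPENAI_API_KEY environment variable.

pip install openai

Create a knowledge base and write knowledge

After data is written, the search index takes a few seconds to synchronize. If the later examples return no data, wait for the synchronization to finish.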

import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore
from tablestore_for_agent_memory.base.base_knowledge_store import Document
from openai import OpenAI
import os

# Generic Embedding class (uses the OpenAI protocol)
class OpenAIEmbedding:
    """Call an embedding model over the OpenAI protocol (works with Bailian, OpenAI, and others)."""

    def __init__(self, api_key, base_url=None, model="text-embedding-v2", dimension=1536):
        """
        Initialize the embedding client.

        Args:
            api_key: API key
            base_url: API base URL (Bailian uses https://dashscope.aliyuncs.com/compatible-mode/v1)
            model: model name
            dimension: vector dimension
        """
        self.client = OpenAI(
            api_key=api_key,
            base_url=base_url
        )
        self.model = model
        self.dimension = dimension

    def embedding(self, text):
        """Convert text into a vector."""
        try:
            response = self.client.embeddings.create(
                model=self.model,
                input=text
            )
            return response.data[0].embedding
        except Exception as e:
            print(f"Embedding call failed: {e}")
            return None


# Read configuration from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
openai_api_key = os.getenv('OPENAI_API_KEY')

# Check for required environment variables
required_env_vars = {
    'TABLESTORE_ENDPOINT': endpoint,
    'TABLESTORE_ACCESS_KEY_ID': access_key_id,
    'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
    'TABLESTORE_INSTANCE_NAME': instance_name,
    'OPENAI_API_KEY': openai_api_key
}

missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
    print(f"Error: missing required environment variables: {', '.join(missing_vars)}")
    print("Please set the following environment variables:")
    for var in missing_vars:
        print(f"  export {var}=your_value")
    exit(1)

# Create the Tablestore SDK client
tablestore_client = tablestore.OTSClient(
    endpoint,
    access_key_id,
    access_key_secret,
    instance_name,
    retry_policy=tablestore.WriteRetryPolicy(),
)

# Metadata fields to index
search_index_schema = [
    tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Use the Bailian text-embedding-v2 model (called over the OpenAI protocol, 1536 dimensions)
base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
embedding_model = OpenAIEmbedding(
    api_key=openai_api_key,
    base_url=base_url,
    model="text-embedding-v2",
    dimension=1536
)

# Initialize KnowledgeStore
knowledge_store = KnowledgeStore(
    tablestore_client=tablestore_client,
    vector_dimension=1536,  # text-embedding-v2 produces 1536-dimensional vectors
    enable_multi_tenant=True,  # Enable multi-tenancy
    search_index_schema=search_index_schema,
)

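print("Creating the table and index...")
# Create the table (including the search index); this only needs to run once
try:
    knowledge_store.init_table()
    print("Table and index created")
except Exception as e:
    print(f"Table and index already exist, or creation failed: {e}")

print("\n====== Write Tablestore knowledge base documents ======\n")

# Prepare several knowledge documents about Tablestore (covering varied topics)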
documents_data = [
    {
        "id": "doc_001",
        "text": "表格存储(Tablestore)是阿里云自研的第一代飞天产品,提供海量结构化数据存储以及快速的查询和分析服务。表格存储的分布式存储和强大的索引引擎能够支持单表PB级存储、千万TPS以及毫秒级延迟的服务能力。",
        "category": "产品介绍",
        "meta_long": 1
    },
    {
        "id": "doc_002",
        "text": "表格存储支持宽表模型,单表支持PB级数据存储和千万QPS,适合存储用户画像、订单详情等场景。同时支持时序模型,可以高效存储和查询物联网设备、监控系统产生的时序数据。",
        "category": "数据模型",
        "meta_long": 2
    },
    {
        "id": "doc_003",
        "text": "表格存储提供多种索引类型:主键索引支持快速的点查询和范围查询;全局二级索引可以基于非主键列进行查询;多元索引支持复杂的查询条件组合和全文检索;向量检索支持 AI 场景的相似度搜索。",
        "category": "索引功能",
        "meta_long": 3
    },
    {
        "id": "doc_004",
        "text": "表格存储适用于多种场景:元数据管理可以存储海量文件、视频、图片的元信息;消息数据用于存储 IM 聊天消息、Feed 流消息;轨迹溯源存储车联网轨迹、物流轨迹等时序数据;推荐系统存储用户画像和物品特征。",
        "category": "应用场景",
        "meta_long": 4
    },
    {
        "id": "doc_005",
        "text": "表格存储的多元索引支持丰富的查询能力,包括精确查询、范围查询、前缀查询、通配符查询、全文检索、地理位置查询、嵌套查询等。同时支持排序、聚合、统计分析等高级功能。",
        "category": "查询能力",
        "meta_long": 5
    },
    {
        "id": "doc_006",
        "text": "表格存储提供 Agent Memory 能力,包括 Memory Store 用于存储会话和消息记录,Knowledge Store 用于存储知识库文档并支持向量检索。这些能力可以帮助构建智能问答、对话机器人等 AI 应用。",
        "category": "AI 能力",
        "meta_long": 6
    },
    {
        "id": "doc_007",
        "text": "表格存储的向量检索功能支持海量向量数据的存储和高效检索,可以应用于图像搜索、语义搜索、推荐系统等场景。支持 L2 距离、余弦相似度等多种相似度算法。",
        "category": "向量检索",
        "meta_long": 7
    },
    {
        "id": "doc_008",
        "text": "表格存储提供多种数据保护机制:支持数据备份和恢复;提供数据生命周期管理,可以自动过期和删除旧数据;支持数据加密存储,保障数据安全。",
        "category": "数据保护",
        "meta_long": 8
    }
]

# Write the documents
tenant_id = "user_tablestore_001"
success_count = 0

for doc_data in documents_data:
    try:
        # Create the Document object
        document = Document(document_id=doc_data["id"], tenant_id=tenant_id)
        document.text = doc_data["text"]

        # Generate the embedding
        document.embedding = embedding_model.embedding(document.text)

        if document.embedding is None:
            print(f"✗ Embedding failed, skipping document {doc_data['id']}")
            continue

        # Set metadata
        document.metadata["category"] = doc_data["category"]
        document.metadata["meta_long"] = doc_data["meta_long"]
        document.metadata["meta_boolean"] = True
        document.metadata["user_id"] = tenant_id

        # Write to the database
        knowledge_store.put_document(document)

        success_count += 1
        print(f"✓ Wrote document {doc_data['id']}: {doc_data['category']}")
        print(f"  Content: {doc_data['text'][:60]}...")
        print()

    except Exception as e:
        print(f"✗ Failed to write document {doc_data['id']}: {e}")

print("=" * 80)
print(f"\nWrite complete: {success_count}/{len(documents_data)} documents succeeded")
print(f"Tenant ID: {tenant_id}")
print(f"Document categories: {', '.join(sorted(set(d['category'] for d in documents_data)))}")
print("\nNote: after data is written, the search index may take a few seconds to synchronize")
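Because the search index needs a few seconds to synchronize after a write, a query issued immediately afterwards may come back empty. One way to handle this is to poll until results appear. The sketch below is a generic helper, not an SDK feature: wait_for_hits, its parameters, and the default retry values are all hypothetical, and the search argument is any zero-argument callable you supply.

```python
import time


def wait_for_hits(search, attempts=5, delay_seconds=1.0, sleep=time.sleep):
    """Call search() until it returns a non-empty result or attempts run out.

    search: zero-argument callable returning a list of hits.
    sleep: injectable for testing; defaults to time.sleep.
    Returns the first non-empty result, or an empty list if none appeared.
    """
    for attempt in range(attempts):
        hits = search()
        if hits:
            return hits
        # Do not sleep after the final attempt.
        if attempt < attempts - 1:
            sleep(delay_seconds)
    return []


# Demonstration with a fake search that succeeds on the third call.
calls = {"count": 0}

def fake_search():
    calls["count"] += 1
    return ["doc_001"] if calls["count"] >= 3 else []

print(wait_for_hits(fake_search, delay_seconds=0.0))
```

In practice, the callable would wrap one of the query calls from the later examples and return its response.hits.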

Vector search

import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore
from openai import OpenAI
import os


# Generic Embedding class (uses the OpenAI protocol)
class OpenAIEmbedding:
    """Call an embedding model over the OpenAI protocol (works with Bailian, OpenAI, and others)."""

    def __init__(self, api_key, base_url=None, model="text-embedding-v2", dimension=1536):
        """
        Initialize the embedding client.

        Args:
            api_key: API key
            base_url: API base URL (Bailian uses https://dashscope.aliyuncs.com/compatible-mode/v1)
            model: model name
            dimension: vector dimension
        """
        self.client = OpenAI(
            api_key=api_key,
            base_url=base_url
        )
        self.model = model
        self.dimension = dimension

    def embedding(self, text):
        """Convert text into a vector."""
        try:
            response = self.client.embeddings.create(
                model=self.model,
                input=text
            )
            return response.data[0].embedding
        except Exception as e:
            print(f"Embedding call failed: {e}")
            return None


# Read configuration from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')
openai_api_key = os.getenv('OPENAI_API_KEY')

# Check for required environment variables
required_env_vars = {
    'TABLESTORE_ENDPOINT': endpoint,
    'TABLESTORE_ACCESS_KEY_ID': access_key_id,
    'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
    'TABLESTORE_INSTANCE_NAME': instance_name,
    'OPENAI_API_KEY': openai_api_key
}

missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
    print(f"Error: missing required environment variables: {', '.join(missing_vars)}")
    print("Please set the following environment variables:")
    for var in missing_vars:
        print(f"  export {var}=your_value")
    exit(1)

# Create the Tablestore SDK client
tablestore_client = tablestore.OTSClient(
    endpoint,
    access_key_id,
    access_key_secret,
    instance_name,
    retry_policy=tablestore.WriteRetryPolicy(),
)

# Metadata fields to index
search_index_schema = [
    tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Use the Bailian text-embedding-v2 model (called over the OpenAI protocol, 1536 dimensions)
base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
embedding_model = OpenAIEmbedding(
    api_key=openai_api_key,
    base_url=base_url,
    model="text-embedding-v2",
    dimension=1536
)

# Initialize KnowledgeStore
knowledge_store = KnowledgeStore(
    tablestore_client=tablestore_client,
    vector_dimension=1536,  # text-embedding-v2 produces 1536-dimensional vectors
    enable_multi_tenant=True,  # Enable multi-tenancy
    search_index_schema=search_index_schema,
)

print("====== Vector search test ======\n")

# Query question
query_text = "表格存储支持哪些索引类型?"
tenant_id = "user_tablestore_001"

print(f"Query: {query_text}")
print(f"Tenant ID: {tenant_id}")
print("Number of results: Top 3\n")

try:
    # Convert the query text into a vector
    print("Generating the query vector...")
    query_vector = embedding_model.embedding(query_text)

    if query_vector is None:
        print("Failed to generate the query vector")
    else:
        print(f"Query vector generated, dimension: {len(query_vector)}\n")

        # Run the vector search
        response = knowledge_store.vector_search(
            query_vector=query_vector,
            tenant_id=tenant_id,
            limit=3  # Return only the top 3
        )

        if not response.hits:
            print("No relevant documents found")
        else:
            print("=" * 80)
            print(f"Found {len(response.hits)} relevant document(s):\n")

            for idx, hit in enumerate(response.hits, 1):
                doc = hit.document
                score = hit.score

                print(f"[Result {idx}]")
                print(f"Document ID: {doc.document_id}")
                print(f"Similarity score: {score:.4f}")

                if hasattr(doc, 'metadata') and 'category' in doc.metadata:
                    print(f"Category: {doc.metadata['category']}")

                print(f"Content: {doc.text}")
                print("-" * 80)

            print()

except Exception as e:
    print(f"Vector search failed: {e}")
    import traceback
    traceback.print_exc()

print("\n====== Search complete ======")
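To build intuition for what a top-3 vector search returns, the ranking can be reproduced by brute force on small vectors. The sketch below is purely illustrative and is not how Tablestore implements vector search. It assumes cosine similarity as the metric; this page does not say which metric KnowledgeStore uses by default. The function names and the toy 2-dimensional vectors are hypothetical.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # Convention chosen here for zero vectors
    return dot / (norm_a * norm_b)


def top_k(query, documents, k=3):
    """Return the k (doc_id, score) pairs most similar to the query."""
    scored = [
        (doc_id, cosine_similarity(query, vector))
        for doc_id, vector in documents.items()
    ]
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:k]


# Toy vectors standing in for 1536-dimensional embeddings.
toy_documents = {"doc_a": [1.0, 0.0], "doc_b": [0.0, 1.0], "doc_c": [1.0, 1.0]}
for doc_id, score in top_k([1.0, 0.0], toy_documents, k=2):
    print(doc_id, round(score, 4))
```

The dimension check mirrors a practical constraint from the examples: the query vector and the stored vectors must both match the vector_dimension configured on the store (1536 for text-embedding-v2).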

Full-text search

import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore, Filters
import os


# Read configuration from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

# Check for required environment variables
required_env_vars = {
    'TABLESTORE_ENDPOINT': endpoint,
    'TABLESTORE_ACCESS_KEY_ID': access_key_id,
    'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
    'TABLESTORE_INSTANCE_NAME': instance_name
}

missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
    print(f"Error: missing required environment variables: {', '.join(missing_vars)}")
    print("Please set the following environment variables:")
    for var in missing_vars:
        print(f"  export {var}=your_value")
    exit(1)

# Create the Tablestore SDK client
tablestore_client = tablestore.OTSClient(
    endpoint,
    access_key_id,
    access_key_secret,
    instance_name,
    retry_policy=tablestore.WriteRetryPolicy(),
)

# Metadata fields to index
search_index_schema = [
    tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Initialize KnowledgeStore
knowledge_store = KnowledgeStore(
    tablestore_client=tablestore_client,
    vector_dimension=1536,  # text-embedding-v2 produces 1536-dimensional vectors
    enable_multi_tenant=True,  # Enable multi-tenancy
    search_index_schema=search_index_schema,
)

print("====== 全文检索测试 ======\n")

# 查询关键词
query_keyword = "向量检索"
tenant_id = "user_tablestore_001"

print(f"查询关键词: {query_keyword}")
print(f"租户ID: {tenant_id}")
print(f"返回结果数: Top 3\n")

try:
    # 执行全文检索(使用 text_match 进行文本匹配)
    response = knowledge_store.search_documents(
        tenant_id=tenant_id,
        metadata_filter=Filters.text_match("text", query_keyword),
        limit=3  # 只返回 top 3
    )
    
    if not response.hits:
        print("未找到包含关键词的文档")
    else:
        print("=" * 80)
        print(f"找到 {len(response.hits)} 个包含关键词的文档:\n")
        
        for idx, hit in enumerate(response.hits, 1):
            doc = hit.document
            score = hit.score
            
            print(f"【结果 {idx}】")
            print(f"文档ID: {doc.document_id}")
            print(f"匹配分数: {score if score is not None else 'N/A'}")
            
            if hasattr(doc, 'metadata') and 'category' in doc.metadata:
                print(f"类别: {doc.metadata['category']}")
            
            # 高亮显示匹配的关键词
            content = doc.text
            if query_keyword in content:
                # 简单的高亮显示
                highlighted = content.replace(query_keyword, f"【{query_keyword}】")
                print(f"内容: {highlighted}")
            else:
                print(f"内容: {content}")
            
            print("-" * 80)
        
        print()
        
except Exception as e:
    print(f"Full-text search failed: {e}")
    import traceback
    traceback.print_exc()

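print("\n====== Search complete ======")

print("\nNotes:")
print("- Full-text search looks for the query keyword in the text field of each document")
print("- Advanced syntax such as wildcards and phrase queries is available")
print("- Chinese word segmentation and fuzzy matching are supported")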
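
The example above highlights a result only when the whole query string appears verbatim in the content. If the index tokenizes the query, a hit may contain only some of its terms, and nothing gets highlighted. The sketch below is one possible way to highlight several terms at once. The helper name `highlight_terms` and the term list are illustrative assumptions, not part of the SDK, and the sketch does not reproduce the tokenization that Tablestore applies.

```python
import re


def highlight_terms(text, terms, left="【", right="】"):
    """Wrap every occurrence of any term in text with the given markers.

    Hypothetical helper for display purposes only; it is not an SDK function.
    """
    # Drop empty terms and try longer terms first, so that a long term
    # is preferred over a shorter term it contains.
    ordered = sorted({t for t in terms if t}, key=len, reverse=True)
    if not ordered:
        return text
    # re.escape keeps characters such as "." or "*" literal.
    pattern = re.compile("|".join(re.escape(t) for t in ordered))
    return pattern.sub(lambda m: f"{left}{m.group(0)}{right}", text)


sample = "Tablestore 支持向量检索和全文检索"
print(highlight_terms(sample, ["检索", "向量检索"]))
```

In the search loop, `highlight_terms(doc.text, [query_keyword])` could stand in for the `content.replace(...)` call, with more terms added if the query is split manually.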

General search

import tablestore
from tablestore_for_agent_memory.knowledge.knowledge_store import KnowledgeStore, Filters
import os


# Read configuration from environment variables
endpoint = os.getenv('TABLESTORE_ENDPOINT')
access_key_id = os.getenv('TABLESTORE_ACCESS_KEY_ID')
access_key_secret = os.getenv('TABLESTORE_ACCESS_KEY_SECRET')
instance_name = os.getenv('TABLESTORE_INSTANCE_NAME')

# Check that the required environment variables are set
required_env_vars = {
    'TABLESTORE_ENDPOINT': endpoint,
    'TABLESTORE_ACCESS_KEY_ID': access_key_id,
    'TABLESTORE_ACCESS_KEY_SECRET': access_key_secret,
    'TABLESTORE_INSTANCE_NAME': instance_name
}

missing_vars = [var for var, value in required_env_vars.items() if not value]
if missing_vars:
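    print(f"Error: missing required environment variables: {', '.join(missing_vars)}")
    print("Set the following environment variables:")
    for var in missing_vars:
        print(f"  export {var}=your_value")
    # exit() is only provided by the site module; SystemExit works everywhere
    raise SystemExit(1)

# Create the Tablestore SDK client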
tablestore_client = tablestore.OTSClient(
    endpoint,
    access_key_id,
    access_key_secret,
    instance_name,
    retry_policy=tablestore.WriteRetryPolicy(),
)

# Metadata fields to include in the search index
search_index_schema = [
    tablestore.FieldSchema("user_id", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("category", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_string", tablestore.FieldType.KEYWORD),
    tablestore.FieldSchema("meta_long", tablestore.FieldType.LONG),
    tablestore.FieldSchema("meta_double", tablestore.FieldType.DOUBLE),
    tablestore.FieldSchema("meta_boolean", tablestore.FieldType.BOOLEAN),
]

# Initialize the KnowledgeStore
knowledge_store = KnowledgeStore(
    tablestore_client=tablestore_client,
    vector_dimension=1536,  # text-embedding-v2 produces 1536-dimensional vectors
    enable_multi_tenant=True,  # enable multi-tenancy
    search_index_schema=search_index_schema,
)

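print("====== General search test ======\n")

tenant_id = "user_tablestore_001"

print("General search filters flexibly on metadata and does not rely on vector or full-text search")
print(f"Tenant ID: {tenant_id}")
print("Number of results: Top 3\n")

# Scenario 1: filter by category
print("[Scenario 1] Documents whose category is '应用场景'")
print("-" * 80)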

try:
    response = knowledge_store.search_documents(
        tenant_id=tenant_id,
        limit=3,
        metadata_filter=Filters.eq("category", "应用场景"),
        meta_data_to_get=["text", "category", "meta_long"]  # return only the listed fields
    )
    
    if not response.hits:
        print("No matching documents found\n")
    else:
        for idx, hit in enumerate(response.hits, 1):
            doc = hit.document
            print(f"\nResult {idx}:")
            print(f"  Document ID: {doc.document_id}")
            print(f"  Category: {doc.metadata.get('category', 'N/A')}")
            print(f"  Content: {doc.text[:100]}...")
        print()
        
except Exception as e:
    print(f"Search failed: {e}\n")

# Scenario 2: combined conditions
print("\n[Scenario 2] Documents with meta_long > 3 and meta_boolean = True")
print("-" * 80)

try:
    response = knowledge_store.search_documents(
        tenant_id=tenant_id,
        limit=3,
        metadata_filter=Filters.logical_and([
            Filters.gt("meta_long", 3),
            Filters.eq("meta_boolean", True)
        ]),
        meta_data_to_get=["text", "category", "meta_long"]
    )
    
    if not response.hits:
        print("No matching documents found\n")
    else:
        for idx, hit in enumerate(response.hits, 1):
            doc = hit.document
            print(f"\nResult {idx}:")
            print(f"  Document ID: {doc.document_id}")
            print(f"  Category: {doc.metadata.get('category', 'N/A')}")
            print(f"  meta_long: {doc.metadata.get('meta_long', 'N/A')}")
            print(f"  Content: {doc.text[:80]}...")
        print()
        
except Exception as e:
    print(f"Search failed: {e}\n")

# Scenario 3: range query
print("\n[Scenario 3] Documents with meta_long between 2 and 5 (inclusive)")
print("-" * 80)

try:
    response = knowledge_store.search_documents(
        tenant_id=tenant_id,
        limit=3,
        metadata_filter=Filters.logical_and([
            Filters.gte("meta_long", 2),  # greater than or equal to 2
            Filters.lte("meta_long", 5)   # less than or equal to 5
        ]),
        meta_data_to_get=["text", "category", "meta_long"]
    )
    
    if not response.hits:
        print("No matching documents found\n")
    else:
        for idx, hit in enumerate(response.hits, 1):
            doc = hit.document
            print(f"\nResult {idx}:")
            print(f"  Document ID: {doc.document_id}")
            print(f"  Category: {doc.metadata.get('category', 'N/A')}")
            print(f"  meta_long: {doc.metadata.get('meta_long', 'N/A')}")
            print(f"  Content: {doc.text[:80]}...")
        print()
        
except Exception as e:
    print(f"Search failed: {e}\n")

# Scenario 4: fetch documents without a filter
print("\n[Scenario 4] All documents (no filter)")
print("-" * 80)

try:
    response = knowledge_store.search_documents(
        tenant_id=tenant_id,
        limit=3,
        meta_data_to_get=["text", "category", "meta_long"]
    )
    
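    if not response.hits:
        print("No documents found\n")
    else:
        # limit=3 caps the page size, so this count is for the current page only
        print(f"\nReturned {len(response.hits)} document(s) (limit is 3):")
        for idx, hit in enumerate(response.hits, 1):
            doc = hit.document
            print(f"\nResult {idx}:")
            print(f"  Document ID: {doc.document_id}")
            print(f"  Category: {doc.metadata.get('category', 'N/A')}")
            print(f"  Content: {doc.text[:60]}...")
        
        if response.next_token:
            print("\nMore results are available; use next_token to fetch the next page")
        print()
        
except Exception as e:
    print(f"Search failed: {e}\n")

print("\n" + "=" * 80)
print("\n====== Search complete ======")

print("\nCharacteristics of general search:")
print("- Flexible filtering on metadata fields")
print("- Exact match, range queries, and logical combinations")
print("- No vector or full-text search required, so it suits structured queries")
print("- Returned fields can be restricted to reduce data transfer")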
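
Scenario 4 notes that `response.next_token` signals more results but stops at the first page. The following is a minimal sketch of a paging loop. It assumes that `search_documents` accepts a `next_token` keyword argument to continue from a previous response, which the example above does not show, so check the SDK reference before relying on it. The names `iter_all_hits` and `fake_search_page` are hypothetical, and the fake page function stands in for the real store so the sketch runs without a Tablestore instance.

```python
from types import SimpleNamespace


def iter_all_hits(search_page, max_pages=100):
    """Yield hits page by page until next_token is empty or max_pages is reached.

    search_page is any callable that takes a token (None for the first page)
    and returns an object with .hits and .next_token attributes.
    """
    next_token = None
    for _ in range(max_pages):
        response = search_page(next_token)
        yield from response.hits
        next_token = getattr(response, "next_token", None)
        if not next_token:
            break


# In-memory stand-in for the real store: two pages keyed by token.
_PAGES = {None: (["doc-1", "doc-2"], "t1"), "t1": (["doc-3"], None)}


def fake_search_page(next_token):
    hits, token = _PAGES[next_token]
    return SimpleNamespace(hits=hits, next_token=token)


all_hits = list(iter_all_hits(fake_search_page))
print(all_hits)
```

With the real store, the callable could be something like `lambda token: knowledge_store.search_documents(tenant_id=tenant_id, limit=3, next_token=token)`, under the same assumption about the `next_token` parameter. The `max_pages` cap is a design choice that keeps a large table from being scanned in full by accident.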

Related documentation