数据集(Dataset)

更新时间:
复制为 MD 格式

Dataset 是 AgentLoop 为 AI 场景设计的新型数据存储,在传统日志存储的基础上提供完整 CRUD、灵活 Schema、向量检索与多维分析能力,让 AI 运行时数据从"只读日志"升级为"可管理资产"。Dataset 适用于 AI 应用的数据全生命周期管理,包括训练数据管理、评测数据集构建、Bad Case 追踪、模型版本回归等场景。

背景介绍

与 SLS Logstore 的关系

维度

Logstore

Dataset

数据模型

Append-Only,写入后不可修改

完整 CRUD,支持增删改查

Schema

灵活自定义索引配置

自定义 Schema,支持类型: text / long / double / json

检索能力

全文检索 + SQL 分析

全文检索 + 语义搜索 + SQL 分析 + 组合查询

向量能力

支持 embedding 向量索引,支持语义相似度检索

内置 embedding 向量索引,支持语义相似度检索

数据修订

不支持

支持按 ID 更新和删除

适用场景

日志采集、监控告警、审计合规

AI 数据管理、评测基准、训练数据、Bad Case 追踪

核心能力

能力

说明

自定义 Schema

自定义字段与类型,支持 textlongdoublejson,JSON 支持嵌套子字段索引

向量检索

对 text 字段配置 embedding 模型即开启向量索引,支持语义相似度检索

完整 CRUD

通过标准 SQL 执行 INSERT / UPDATE / DELETE,数据可修订、可演进

多维检索

全文检索 + 语义搜索 + SQL 分析,四种查询模式自由组合

版本追溯

每条数据自动生成唯一 ID,支持追溯、导出与回归测试

使用步骤

环境准备

安装 SDK

pip install alibabacloud-cms20240330-inner==6.0.8

配置凭证

通过环境变量或 .env 文件设置:

export ALIBABA_CLOUD_ACCESS_KEY_ID="your-access-key-id"
export ALIBABA_CLOUD_ACCESS_KEY_SECRET="your-access-key-secret"
export ALIBABA_CLOUD_ENDPOINT="cms.cn-shanghai.aliyuncs.com"
export ALIBABA_CLOUD_CMS_WORKSPACE="your-workspace"

初始化客户端

from alibabacloud_cms20240330.client import Client
from alibabacloud_tea_openapi.models import Config

config = Config(
    access_key_id="your-access-key-id",
    access_key_secret="your-access-key-secret",
    endpoint="cms.cn-shanghai.aliyuncs.com",
)
client = Client(config)
workspace = "your-workspace"

创建 Dataset

Schema 字段类型

Dataset 支持用户自定义 Schema,可选字段类型:

类型

说明

可选能力

示例字段

text

文本类型,支持全文和语义检索

chn: 开启中文分词;embedding: 开启向量索引

question、output

long

长整型,支持数值范围查询

input_tokens、latency_ms

double

浮点型,支持数值范围查询

score

json

JSON 类型,支持嵌套字段索引

json_keys: 定义子字段索引

metadata

向量索引:对 text 类型字段设置 embedding 即开启向量索引,支持语义检索。

内置 ID 主键:Dataset 自动为每条数据生成唯一的 id 主键。UPDATE 和 DELETE 必须通过该主键进行。

SDK 示例

from alibabacloud_cms20240330.models import (
    CreateDatasetRequest,
    IndexJsonKey,
    IndexKey,
)

schema = {
    "input": IndexKey(
        type="text",
        chn=True,                          # 开启中文分词
        embedding="text-embedding-v4",     # 开启向量索引
    ),
    "output": IndexKey(
        type="text",
        chn=True,
        embedding="text-embedding-v4",
    ),
    "model": IndexKey(type="text"),
    "score": IndexKey(type="double"),
    "metadata": IndexKey(
        type="json",
        json_keys={
            "input_tokens": IndexJsonKey(type="long"),
            "output_tokens": IndexJsonKey(type="long"),
        },
    ),
}

request = CreateDatasetRequest(
    dataset_name="my_dataset",
    description="AI 问答数据集",
    schema=schema,
)
client.create_dataset(workspace, request)

数据写入

Dataset 支持通过 ExecuteQuery 接口执行 INSERT SQL 写入。

插入数据(INSERT)

数据写入通过 ExecuteQuery 接口执行标准 SQL。

单条插入

from alibabacloud_cms20240330.models import ExecuteQueryRequest

sql = """
INSERT INTO my_dataset (input, output, model, score)
VALUES (
    '如何查看最近一小时的错误日志?',
    '使用查询语句: level:ERROR',
    'qwen-plus',
    0.95
)
"""
request = ExecuteQueryRequest(query=sql, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)
print(f"affected_rows: {response.body.meta.affected_rows}")

批量插入

sql = """
INSERT INTO my_dataset (input, output, model, score)
VALUES
    ('统计今天各接口调用次数', 'SELECT api, count(*) ...', 'gpt-4o', 0.88),
    ('查找响应超时的请求', 'latency > 5000 | SELECT ...', 'claude-3.5-sonnet', 0.92)
"""
request = ExecuteQueryRequest(query=sql, type="SQL")
client.execute_query(workspace, "my_dataset", request)

更新数据(UPDATE)

更新操作必须通过 id 主键进行,暂不支持批量更新:

sql = """
UPDATE my_dataset
SET score = 0.98, output = '优化后的回答...'
WHERE id = 'your-doc-id'
"""
request = ExecuteQueryRequest(query=sql, type="SQL")
client.execute_query(workspace, "my_dataset", request)

删除数据(DELETE)

删除操作同样必须通过 id 主键进行:

sql = "DELETE FROM my_dataset WHERE id = 'your-doc-id'"
request = ExecuteQueryRequest(query=sql, type="SQL")
client.execute_query(workspace, "my_dataset", request)
提示:可先通过查询获取目标数据的 id,再执行更新或删除。

数据查询

数据查询通过 ExecuteQuery 接口(type="SQL"),支持四种查询模式,可自由组合:

查询模式

语法

说明

全文检索

field:keyword

关键词匹配,支持 AND / OR / 数值比较

语义搜索

similarity() / semantic_distance()

向量语义检索(两种形式)

SQL 分析

SELECT ... FROM dataset_name ...

标准 SQL 查询与统计

组合查询

<检索条件> | <SQL 语句>

自由组合

全文检索

全文检索使用关键词匹配语法,与 SLS 查询语法一致:

语法

说明

示例

field:keyword

单字段匹配

input:错误

field1:v1 AND field2:v2

组合条件

model:qwen-plus AND input:日志

field:v1 OR field:v2

或条件

model:qwen-plus OR model:gpt-4o

field > value

数值比较

score > 0.9

field:v1 AND field > n

混合条件

model:qwen-plus AND score > 0.8

使用关键词匹配语法,可单独使用或通过管道 | 与 SQL 组合:

from alibabacloud_cms20240330.models import ExecuteQueryRequest

# 全文检索
request = ExecuteQueryRequest(query="input:错误", type="SQL")
response = client.execute_query(workspace, "my_dataset", request)

# 全文检索 + SQL
query = "input:错误 | SELECT input, score FROM my_dataset ORDER BY score DESC LIMIT 5"
request = ExecuteQueryRequest(query=query, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)

语义搜索

语义搜索基于向量相似度进行检索,前提是对应字段已在 Schema 中配置 embedding 向量索引。

提供两种使用形式,满足不同的查询和分析场景:

形式一:检索语法 similarity()

用于管道 | 左侧的检索条件中,可与全文检索语法通过 AND / OR混合使用:

# 语义搜索
request = ExecuteQueryRequest(query="similarity(input, '日志分析') < 0.3", type="SQL")

# 语义搜索 + 全文检索
request = ExecuteQueryRequest(
    query="similarity(input, '日志分析') < 0.3 AND model:qwen-plus",
    type="SQL",
)

# 语义搜索 + SQL
request = ExecuteQueryRequest(
    query="similarity(input, '日志分析') < 0.3 | SELECT input, score FROM my_dataset ORDER BY score DESC",
    type="SQL",
)

形式二:SQL 函数 semantic_distance()

用于管道 | 右侧的 SQL 语句中,可在 SELECTWHEREORDER BY 中灵活使用:

sql = """
SELECT input, semantic_distance(input, '日志查询统计') AS similarity
FROM my_dataset
WHERE semantic_distance(input, '日志查询统计') < 0.3
ORDER BY semantic_distance(input, '日志查询统计') ASC
"""
request = ExecuteQueryRequest(query=sql, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)

两种形式对比

维度

similarity()

semantic_distance()

位置

| 左侧(检索条件)

| 右侧(SQL 语句)

用途

快速语义过滤

精确分析与排序

可否获取距离值

是(可作为 SELECT 列)

可否与全文检索组合

是(AND / OR)

是(在 SQL WHERE 中)

典型场景

快速召回语义相关数据

精排、相似度排序、距离分析

阈值建议:阈值范围 0-1,值越小表示语义越相似。

阈值范围

匹配程度

适用场景

0.1 - 0.2

严格匹配

精确去重、近似查找

0.2 - 0.3

常规匹配

相关内容检索

0.3 - 0.5

宽松匹配

探索性搜索、主题聚类

SQL 分析

Dataset 支持标准 SQL 查询与统计分析,包括 GROUP BY / HAVING / ORDER BY / LIMIT / CTE / 子查询 / 窗口函数等。SQL 引擎基于 PrestoSQL 语法,在服务端转换为 PostgreSQL 执行。

sql = """
SELECT model, count(*) AS total, avg(score) AS avg_score
FROM my_dataset
GROUP BY model
ORDER BY total DESC
"""
request = ExecuteQueryRequest(query=sql, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)

分页使用 LIMIT offset, count 语法:

page_size = 10
page = 2
skip = (page - 1) * page_size
# 第 2 页(跳过 10 条,取 10 条)
sql = f"SELECT * FROM my_dataset ORDER BY score DESC LIMIT {skip}, {page_size}"
request = ExecuteQueryRequest(query=sql, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)

支持的函数

支持的 SQL 子句

子句

支持

说明

SELECT

列选择、表达式、别名、*、子查询

FROM

单表查询、子查询(带别名)

WHERE

比较运算、IN / NOT INBETWEENLIKEIS NULLEXISTSIS DISTINCT FROM

GROUP BY

基础分组、GROUPING SETSROLLUPCUBE

HAVING

聚合条件过滤

ORDER BY

ASC / DESCNULLS FIRST / NULLS LAST

LIMIT

LIMIT countLIMIT offset, count

WITH (CTE)

公共表表达式

JOIN

不支持任何类型的 JOIN

UNION / INTERSECT / EXCEPT

不支持集合操作

聚合函数

函数

说明

示例

count(*)

计数

SELECT count(*) FROM ds

count(column)

非空计数

SELECT count(score) FROM ds

count(DISTINCT column)

去重计数

SELECT count(DISTINCT model) FROM ds

sum(column)

求和

SELECT sum(score) FROM ds

avg(column)

平均值

SELECT avg(score) FROM ds

min(column) / max(column)

最小 / 最大值

SELECT min(score), max(score) FROM ds

count_if(condition)

条件计数

SELECT count_if(score > 0.9) FROM ds

approx_percentile(column, p)

近似百分位数

SELECT approx_percentile(score, 0.5) FROM ds

JSON 函数

函数

说明

示例

json_extract_scalar(col, '$.path')

提取 JSON 标量值

json_extract_scalar(metadata, '$.input_tokens')

json_size(col, '$.path')

获取 JSON 数组/对象大小

json_size(metadata, '$.tags')

注意json_extract_scalar 仅支持标量值提取。json_size 不支持通配符路径(如 $.a[*])。不支持 json_extract 函数。

日期时间函数

函数

说明

示例

now()

当前时间

WHERE __time__ > to_unixtime(now() - interval '1' day)

from_unixtime(epoch)

Unix 时间戳转时间

SELECT from_unixtime(__time__)

to_unixtime(timestamp)

时间转 Unix 时间戳

WHERE __time__ >= to_unixtime(...)

date(timestamp)

提取日期

SELECT date(from_unixtime(__time__))

date_add(unit, n, timestamp)

日期加减

date_add('day', 7, ts)

date_diff(unit, ts1, ts2)

日期差值

date_diff('day', ts1, ts2)

INTERVAL

时间间隔

now() - interval '7' day

类型转换与表达式

函数 / 表达式

说明

示例

CAST(expr AS type)

类型转换

CAST(score AS varchar)

try_cast(expr AS type)

安全类型转换(失败返回 NULL)

try_cast(val AS bigint)

typeof(expr)

获取类型名

typeof(score)

CASE WHEN ... THEN ... ELSE ... END

条件表达式

BETWEEN ... AND ...

范围判断

score BETWEEN 0.8 AND 1.0

ARRAY[...]

数组构造

ARRAY[1, 2, 3]

`

`

窗口函数

函数

说明

示例

row_number() OVER (...)

行号

row_number() OVER (PARTITION BY model ORDER BY score DESC)

rank() OVER (...)

排名

rank() OVER (PARTITION BY model ORDER BY score DESC)

其他常用函数

函数

说明

round(value, n)

四舍五入

floor(value)

向下取整

length(str)

字符串长度

lower(str)

转小写

coalesce(a, b, ...)

返回第一个非空值

组合查询

四种查询模式可通过管道 | 自由组合,检索条件放 | 左侧,SQL 放右侧:

# 全文 + SQL + semantic_distance
query = """
model:qwen-plus
| SELECT input, output, score
  FROM my_dataset
  WHERE semantic_distance(input, '数据分析') < 0.4
  ORDER BY score DESC
  LIMIT 10
"""
request = ExecuteQueryRequest(query=query, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)
关于 SQL 支持的完整能力(函数、子句、限制等),请参阅 Dataset 产品功能文档

Dataset 管理

操作

API

说明

创建

client.create_dataset(workspace, CreateDatasetRequest)

创建 Dataset 并定义 Schema

查看

client.get_dataset(workspace, dataset_name)

获取 Dataset 详情(含 Schema)

列举

client.list_datasets(workspace, ListDatasetsRequest)

列举所有 Dataset,支持分页和名称过滤

更新

client.update_dataset(workspace, dataset_name, UpdateDatasetRequest)

更新 Dataset 描述

删除

client.delete_dataset(workspace, dataset_name)

删除 Dataset(不可恢复)

列举 Dataset

from alibabacloud_cms20240330.models import ListDatasetsRequest

request = ListDatasetsRequest(max_results=100)
response = client.list_datasets(workspace, request)
for ds in response.body.datasets:
    print(f"{ds.dataset_name}: {ds.description}")

# 按名称过滤
request = ListDatasetsRequest(max_results=100, dataset_name="my_dataset")
response = client.list_datasets(workspace, request)

获取 Dataset 详情

response = client.get_dataset(workspace, "my_dataset")
print(response.body.to_map())  # 包含 Schema、创建时间等

更新 Dataset

from alibabacloud_cms20240330.models import UpdateDatasetRequest

request = UpdateDatasetRequest(description="更新后的描述")
client.update_dataset(workspace, "my_dataset", request)

删除 Dataset

# 谨慎操作,数据不可恢复
client.delete_dataset(workspace, "my_dataset")

典型场景

场景一:Bad Case 管理

从线上数据中筛选低分样本,人工标注后更新:

# 1. 筛选低分样本
sql = "SELECT id, input, output, score FROM my_dataset WHERE score < 0.3 ORDER BY score ASC LIMIT 50"
request = ExecuteQueryRequest(query=sql, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)

# 2. 人工审核后更新标注
sql = """
UPDATE my_dataset
SET human_label = 'hallucination', fix_suggestion = '需要增加事实核查'
WHERE id = 'bad-case-id'
"""
client.execute_query(workspace, "my_dataset", ExecuteQueryRequest(query=sql, type="SQL"))

场景二:版本回归测试

基于 Dataset 构建评测基准集,对比不同模型版本效果:

# 获取基准测试集
sql = "SELECT id, input, expected_output FROM my_dataset WHERE is_baseline = 'true'"
request = ExecuteQueryRequest(query=sql, type="SQL")
baseline = client.execute_query(workspace, "my_dataset", request)

# 运行新版本模型,对比结果
for sample in baseline.body.data:
    new_output = run_new_model(sample["input"])
    score = evaluate(new_output, sample["expected_output"])
    # 记录评测结果...

场景三:训练数据导出

筛选高质量样本导出用于模型 SFT 微调或 RL 后训练:

import json

# 查询高质量样本
sql = """
SELECT input, output FROM my_dataset
WHERE human_label = 'correct' AND score >= 0.9
LIMIT 10000
"""
request = ExecuteQueryRequest(query=sql, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)

# 导出为 JSONL 格式
with open("sft_data.jsonl", "w") as f:
    for item in response.body.data:
        f.write(json.dumps({
            "input": item["input"],
            "output": item["output"],
        }, ensure_ascii=False) + "\n")

限制与约束

约束项

限制

说明

默认返回行数

1,000

未指定 LIMIT 时,默认返回 1000 条

单次查询最大返回行数

100,000

LIMIT 不可超过此值

单次查询最大返回数据量

100MB

单次查询返回的数据量不超过此值

JOIN

不支持

暂不支持任何类型的 JOIN 操作

集合操作

不支持

暂不支持 UNION / INTERSECT / EXCEPT

跨 Dataset 查询

不支持

暂不支持与其他 Dataset 的联合查询

UPDATE / DELETE 方式

仅支持按 id 主键

暂不支持按条件批量更新 / 删除

部分函数

不支持

approx_distinctmax_bynormalizetransformsequence、URL 函数等

参数占位符

不支持

不支持 ? 参数化查询

最佳实践

Schema 设计

  • 有高频语义搜索需求的字段,建议配置 embedding 开启向量索引。

  • 使用 json 类型支持动态扩展的数据列,如元数据、自定义多维打标列等。

数据管理

  • 通过查询获取 id 后再执行更新 / 删除。

  • 定期清理低质量数据,保持信噪比。

  • 结合 Pipeline 实现自动化数据清洗入库。

查询优化

  • 语义搜索配合条件过滤缩小范围。

  • 大数据量场景建议使用分页方式分批获取数据。

  • 复杂统计优先使用 SQL 分析。

常见问题

Dataset 与 LogStore 有什么区别?

LogStore 是 Append-Only 的日志存储,适合写入后不再修改的场景;Dataset 支持完整的 CRUD 操作,适合需要修订、打标、版本管理的 AI 数据场景。

如何获取数据的 ID?

插入数据时系统自动生成 id。可通过查询获取:

sql = "SELECT id, input FROM my_dataset LIMIT 10"
request = ExecuteQueryRequest(query=sql, type="SQL")
response = client.execute_query(workspace, "my_dataset", request)

更新和删除为什么必须用 ID?

为保证数据一致性和操作安全,UPDATE 和 DELETE 目前仅支持按主键 id 操作,不支持批量条件更新 / 删除。

语义搜索的阈值如何选择?

similarity() / semantic_distance() 返回向量距离,范围 0-1,越小越相似。建议:

  • 0.1 - 0.2:严格匹配。

  • 0.2 - 0.3:常规匹配。

  • 0.3 - 0.5:宽松匹配。

数据如何自动入库?

AgentLoop 提供 Pipeline 数据处理能力,支持从 LogStore 自动清洗、去重、评估后写入 Dataset。详见 Pipeline 使用指南。

API 参考

Dataset 管理

操作

方法

参数

创建

client.create_dataset(workspace, request)

CreateDatasetRequest(dataset_name, description, schema)

查看

client.get_dataset(workspace, dataset_name)

列举

client.list_datasets(workspace, request)

ListDatasetsRequest(max_results, next_token, dataset_name)

更新

client.update_dataset(workspace, dataset_name, request)

UpdateDatasetRequest(description)

删除

client.delete_dataset(workspace, dataset_name)

数据操作

所有数据操作统一通过 ExecuteQuery 接口:

from alibabacloud_cms20240330.models import ExecuteQueryRequest

request = ExecuteQueryRequest(query="...", type="SQL")
response = client.execute_query(workspace, dataset_name, request)

参数

类型

说明

query

string

支持四种查询模式:全文检索、语义搜索、SQL 分析、组合查询。语义搜索提供 similarity()(检索语法)和 semantic_distance()(SQL 函数)两种形式

type

string

目前固定值 "SQL"

响应结构

{
  "data": [{"id": "...", "input": "...", "score": 0.95, ...}],
  "meta": {
    "count": 10,
    "affectedRows": 0,
    "elapsedMillisecond": 42,
    "progress": "Complete"
  },
  "requestId": "..."
}

字段

说明

data

查询结果数组(SELECT 操作)

meta.count

返回的数据条数

meta.affectedRows

影响的行数(INSERT / UPDATE / DELETE 操作)

meta.elapsedMillisecond

查询耗时(毫秒)

SDK 示例代码

完整可运行的示例代码见 dataset/samples/v2/ 目录:

文件

说明

代码附件

quickstart.py

端到端自闭环演示(创建→写入→查询→更新→删除)

quickstart.py

manage_dataset.py

Dataset 资源管理(创建 / 查看 / 列举 / 更新 / 删除)

manage_dataset.py

write_data.py

数据写入(单条插入 / 批量插入 / 更新 / 删除)

write_data.py

query_data.py

数据查询(全文检索 / 语义搜索 / SQL 分析 / 组合查询 / 分页)

query_data.py