使用云服务将 Tablestore 数据转成向量

重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

本文以文本数据为例介绍如何将存储在 Tablestore 中的数据通过阿里云大模型服务平台百炼生成向量并写入到表格存储的数据表中。

方案概览

阿里云的大模型服务平台百炼(以下简称“百炼”)是一站式的大模型开发及应用构建平台。百炼提供了多种向量模型,可以将文本、图像、语音等转换成向量。更多信息,请参见什么是百炼

本文使用的是百炼中的文本向量模型 text-embedding-v2,通过模型将存储在表格存储中的文本数据生成向量并写入到表格存储中只需要 3 步:

  1. 开通模型调用服务并获取 API-KEY :在百炼控制台开通模型调用服务并获取 API-KEY。

  2. 生成向量并写入到表格存储:使用 Java SDK 生成向量数据并写入到表格存储的数据表中。

  3. 结果验证:使用表格存储的数据读取接口或者多元索引向量检索功能查询数据。

image

使用说明

  • 开发语言:Java

  • Java 版本:Java 8 及以上版本。

注意事项

Tablestore 多元索引中向量检索字段的维度、类型和距离算法必须与百炼中的文本向量模型保持一致。例如本文使用百炼的文本向量模型 text-embedding-v2 生成向量,在 Tablestore 中创建多元索引时必须设置向量为 1536 维、FLOAT_32 类型和 DOT_PRODUCT 点积距离算法。

前提条件

  • 使用阿里云账号或者具有表格存储和百炼操作权限的 RAM 用户进行操作。

    如果要使用 RAM 用户进行操作,您需要使用阿里云账号创建 RAM 用户并授予 RAM 用户访问表格存储(AliyunOTSFullAccess)以及管理和使用阿里云百炼的权限。具体操作,请参见为 RAM 用户授予表格存储操作权限为 RAM 用户授予百炼操作权限

  • 已为阿里云账号或者 RAM 用户创建 AccessKey。具体操作,请参见创建 AccessKey

    警告

    阿里云账号 AccessKey 泄露会威胁您所有资源的安全。建议您使用 RAM 用户 AccessKey 进行操作,可以有效降低 AccessKey 泄露的风险。

1. 开通模型调用服务并获取 API-KEY

在百炼控制台开通模型调用服务并获取 API-KEY。

重要

建议您把 API-KEY 配置到环境变量,从而避免在代码里显式地配置 API-KEY ,降低泄漏风险。

1.1 开通模型调用服务

前往百炼控制台,如果页面顶部显示以下消息,您需要开通百炼的模型调用服务,以获得免费额度。如果未显示该消息,则表示您已经开通服务。

image

1.2 获取调用API所需凭证

在百炼控制台的右上角选择 API-KEY,然后在我的API-KEY 页面创建 API-KEY。

image

2. 生成向量并写入到表格存储

通过 Java SDK 使用百炼模型生成向量,并将数据写入到表格存储的数据表中。具体步骤如下:

说明

此处以百炼中的文本向量模型 text-embedding-v2 为例介绍向量的生成,其它模型请参见模型列表

  1. 安装 DashScope Java SDK 和表格存储 Java SDK。

    要在 Maven 工程中使用 DashScope SDK 和表格存储 SDK 时,您只需在 pom.xml 文件中加入如下依赖:

    <!--安装 DashScope Java SDK -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>dashscope-sdk-java</artifactId>
        <!-- 请根据实际替换为查询到的最新版本号:https://mvnrepository.com/artifact/com.alibaba/dashscope-sdk-java -->
        <version>最新版本</version>
    </dependency>
    <!--安装表格存储 Java SDK -->
    <dependency>
        <groupId>com.aliyun.openservices</groupId>
        <artifactId>tablestore</artifactId>
        <!-- 请根据实际替换为查询到的最新版本号 -->
        <version>最新版本</version>
    </dependency>   
  2. 生成向量并将向量数据写入表格存储中。

    说明

    使用表格存储功能前,需要初始化 OTSClient。具体操作,请参见初始化 OTSClient

    建议将 AccessKey(包括 AccessKey ID 和 AccessKey Secret)、实例名称和实例的服务地址配置到环境变量中。

    以下示例介绍了直接将文本模型生成的向量数据写入表格存储,以及将表格存储中存量的数据生成向量后再将向量数据写入到表格存储中两种方式。

    import com.alibaba.dashscope.embeddings.TextEmbedding;
    import com.alibaba.dashscope.embeddings.TextEmbeddingParam;
    import com.alibaba.dashscope.embeddings.TextEmbeddingResult;
    import com.alicloud.openservices.tablestore.SyncClient;
    import com.alicloud.openservices.tablestore.model.BatchWriteRowRequest;
    import com.alicloud.openservices.tablestore.model.BatchWriteRowResponse;
    import com.alicloud.openservices.tablestore.model.ColumnValue;
    import com.alicloud.openservices.tablestore.model.CreateTableRequest;
    import com.alicloud.openservices.tablestore.model.Direction;
    import com.alicloud.openservices.tablestore.model.GetRangeRequest;
    import com.alicloud.openservices.tablestore.model.GetRangeResponse;
    import com.alicloud.openservices.tablestore.model.PrimaryKey;
    import com.alicloud.openservices.tablestore.model.PrimaryKeyBuilder;
    import com.alicloud.openservices.tablestore.model.PrimaryKeySchema;
    import com.alicloud.openservices.tablestore.model.PrimaryKeyType;
    import com.alicloud.openservices.tablestore.model.PrimaryKeyValue;
    import com.alicloud.openservices.tablestore.model.RangeRowQueryCriteria;
    import com.alicloud.openservices.tablestore.model.Row;
    import com.alicloud.openservices.tablestore.model.RowPutChange;
    import com.alicloud.openservices.tablestore.model.RowUpdateChange;
    import com.alicloud.openservices.tablestore.model.TableMeta;
    import com.alicloud.openservices.tablestore.model.TableOptions;
    import com.alicloud.openservices.tablestore.model.UpdateRowRequest;
    import com.alicloud.openservices.tablestore.model.search.CreateSearchIndexRequest;
    import com.alicloud.openservices.tablestore.model.search.FieldSchema;
    import com.alicloud.openservices.tablestore.model.search.FieldType;
    import com.alicloud.openservices.tablestore.model.search.IndexSchema;
    import com.alicloud.openservices.tablestore.model.search.vector.VectorDataType;
    import com.alicloud.openservices.tablestore.model.search.vector.VectorMetricType;
    import com.alicloud.openservices.tablestore.model.search.vector.VectorOptions;
    
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.UUID;
    import java.util.stream.Collectors;
    
    public class DashScopeToTableStoreTests {
    
        private static final String DASH_SCOPE_API_KEY = "您的API-KEY";
    
        public static void main(String[] args) throws Exception {
            // 初始化tableStoreClient。
            SyncClient tableStoreClient = new SyncClient(System.getenv("endPoint"), System.getenv("accessId"), System.getenv("accessKey"), System.getenv("instanceName"));
            // 创建数据表和多元索引。
            {
                createTable(tableStoreClient);
                createSearchIndex(tableStoreClient);
            }
    
            // 方式 1:直接写入向量数据到 TableStore 中。
            {
                batchWriteRow(tableStoreClient);
            }
            // 方式 2:将 TableStore 中存量的数据生成向量再写入到 TableStore 中。
            {
                getRangeAndUpdateVector(tableStoreClient);
            }
        }
        
        // 文本转向量。
        private static String textToVector(String text) throws Exception {
            TextEmbeddingParam param = TextEmbeddingParam
                    .builder()
                    .model(TextEmbedding.Models.TEXT_EMBEDDING_V2) // V2 模型的向量进行了归一化,推荐使用点积(dot_product)进行向量检索。
                    .apiKey(DASH_SCOPE_API_KEY)
                    .texts(Collections.singleton(text)) // 支持同时生成多行,此处使用一行作为示例。实际使用时可考虑单次请求设置多行来减少调用。
                    .build();
            TextEmbedding textEmbedding = new TextEmbedding();
            TextEmbeddingResult result = textEmbedding.call(param);
            // 返回结果转成 Tablestore 支持的格式,即 float32 数组字符串。例如 [1, 5.1, 4.7, 0.08 ]。
            return result.getOutput().getEmbeddings().get(0).getEmbedding().stream().map(Double::floatValue).collect(Collectors.toList()).toString();
        }
    
        // 创建数据表。
        private static void createTable(SyncClient tableStoreClient) {
            TableMeta tableMeta = new TableMeta("TABLE_NAME");
            tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("PK_1", PrimaryKeyType.STRING));
            int timeToLive = -1; // 数据的过期时间, 单位秒, -1 代表永不过期. 假如设置过期时间为一年, 即为 365 * 24 * 3600。
            int maxVersions = 1;
            TableOptions tableOptions = new TableOptions(timeToLive, maxVersions);
            CreateTableRequest request = new CreateTableRequest(tableMeta, tableOptions);
            tableStoreClient.createTable(request);
        }
    
        // 创建多元索引,并指定向量索引字段信息。
        private static void createSearchIndex(SyncClient tableStoreClient) {
            CreateSearchIndexRequest request = new CreateSearchIndexRequest();
            request.setTableName("TABLE_NAME");
            request.setIndexName("INDEX_NAME");
            IndexSchema indexSchema = new IndexSchema();
            indexSchema.setFieldSchemas(Arrays.asList(
                    // 字符串字段,支持文本匹配查询。
                    new FieldSchema("field_string", FieldType.KEYWORD).setIndex(true),
                    // 整型字段,支持数字范围查询。
                    new FieldSchema("field_long", FieldType.LONG).setIndex(true),
                    // 全文检索字段。
                    new FieldSchema("field_text", FieldType.TEXT).setIndex(true).setAnalyzer(FieldSchema.Analyzer.MaxWord),
                    // 向量检索字段,使用点积作为距离度量算法,向量维度为 1536。
                    new FieldSchema("field_vector", FieldType.VECTOR).setIndex(true).setVectorOptions(new VectorOptions(VectorDataType.FLOAT_32, 1536, VectorMetricType.DOT_PRODUCT))
            ));
            request.setIndexSchema(indexSchema);
            tableStoreClient.createSearchIndex(request);
        }
    
        // 批量写入数据。
        private static void batchWriteRow(SyncClient tableStoreClient) throws Exception {
            // 写入 1 千行数据,每 100 行一个批次。
            for (int i = 0; i < 10; i++) {
                BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
                for (int j = 0; j < 100; j++) {
                    // 用户的业务数据。
                    String text = "一段字符串,可用户全文检索。同时该字段生成 Embedding 向量,写入到下方 field_vector 字段中进行向量语义相似性查询";
                    // 文本转为向量。
                    String vector = textToVector(text);
                    RowPutChange rowPutChange = new RowPutChange("TABLE_NAME");
                    // 设置主键。
                    rowPutChange.setPrimaryKey(PrimaryKeyBuilder.createPrimaryKeyBuilder().addPrimaryKeyColumn("PK_1", PrimaryKeyValue.fromString(UUID.randomUUID().toString())).build());
                    // 设置属性列。
                    rowPutChange.addColumn("field_string", ColumnValue.fromLong(i));
                    rowPutChange.addColumn("field_long", ColumnValue.fromLong(i * 100 + j));
                    rowPutChange.addColumn("field_text", ColumnValue.fromString(text));
                    // 向量格式为 float32 数组字符串,例如 [1, 5.1, 4.7, 0.08 ]。此处直接使用 DashScope 生成向量。
                    rowPutChange.addColumn("field_vector", ColumnValue.fromString(vector));
    
                    batchWriteRowRequest.addRowChange(rowPutChange);
                }
                BatchWriteRowResponse batchWriteRowResponse = tableStoreClient.batchWriteRow(batchWriteRowRequest);
                System.out.println("批量写入是否全部成功:" + batchWriteRowResponse.isAllSucceed());
                if (!batchWriteRowResponse.isAllSucceed()) {
                    for (BatchWriteRowResponse.RowResult rowResult : batchWriteRowResponse.getFailedRows()) {
                        System.out.println("失败的行:" + batchWriteRowRequest.getRowChange(rowResult.getTableName(), rowResult.getIndex()).getPrimaryKey());
                        System.out.println("失败原因:" + rowResult.getError());
                    }
                }
            }
        }
    
        // 遍历所有数据, 并更新 field_vector 向量字段。
        private static void getRangeAndUpdateVector(SyncClient tableStoreClient) throws Exception {
            int total = 0;
            RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria("TABLE_NAME");
    
            PrimaryKeyBuilder start = PrimaryKeyBuilder.createPrimaryKeyBuilder();
            start.addPrimaryKeyColumn("PK_1", PrimaryKeyValue.INF_MIN);
            rangeRowQueryCriteria.setInclusiveStartPrimaryKey(start.build());
    
            PrimaryKeyBuilder end = PrimaryKeyBuilder.createPrimaryKeyBuilder();
            end.addPrimaryKeyColumn("PK_1", PrimaryKeyValue.INF_MAX);
            rangeRowQueryCriteria.setExclusiveEndPrimaryKey(end.build());
    
            rangeRowQueryCriteria.setMaxVersions(1);
            rangeRowQueryCriteria.setLimit(5000);
            rangeRowQueryCriteria.addColumnsToGet(Arrays.asList("field_text", "想要返回的其它字段"));
            rangeRowQueryCriteria.setDirection(Direction.FORWARD);
            GetRangeRequest getRangeRequest = new GetRangeRequest(rangeRowQueryCriteria);
            GetRangeResponse getRangeResponse;
            while (true) {
                getRangeResponse = tableStoreClient.getRange(getRangeRequest);
                for (Row row : getRangeResponse.getRows()) {
                    total++;
                    PrimaryKey primaryKey = row.getPrimaryKey();
                    // 获取用户的文本内容。
                    String text = row.getLatestColumn("field_text").getValue().asString();
                    // 文本转为向量。
                    String vector = textToVector(text);
                    // 将向量更新到数据表中。此处用单行更新作为示例,您可以使用 batchWriteRow 批量执行。
                    {
                        RowUpdateChange rowUpdateChange = new RowUpdateChange("TABLE_NAME");
                        // 设置主键。
                        rowUpdateChange.setPrimaryKey(primaryKey);
                        // 设置需要更新的属性列。
                        rowUpdateChange.put("field_vector", ColumnValue.fromString(vector));
                        // 执行单行更新。
                        tableStoreClient.updateRow(new UpdateRowRequest(rowUpdateChange));
                    }
                }
                if (getRangeResponse.getNextStartPrimaryKey() != null) {
                    rangeRowQueryCriteria.setInclusiveStartPrimaryKey(getRangeResponse.getNextStartPrimaryKey());
                } else {
                    break;
                }
            }
            System.out.println("一共处理数据: " + total);
        }
    }

3.结果验证

表格存储控制台查看写入表格存储中的向量数据。您可以使用数据读取接口(GetRow 、 BatchGetRow 和 GetRange)或者使用多元索引的向量检索查询向量数据。

  • 使用数据读取接口查询向量数据

    向量数据写入到表格存储后,您可以直接读取表中的向量数据。具体操作,请参见读取数据

  • 使用向量检索功能查询向量数据

    创建多元索引时配置向量字段后,您可以使用向量检索功能查询向量数据。具体操作,请参见向量检索介绍与使用

计费说明

相关文档

您也可以通过免费开源模型将表格存储中数据转成向量。更多信息,请参见使用开源模型将 Tablestore 数据转成向量