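OpenAPI encapsulates the DDL and DML vector operations of AnalyticDB for PostgreSQL, so you can manage vector data through OpenAPI. This article uses the SDK for Python 3 to show how to import and query vector data through the API.
Prerequisites
An AnalyticDB for PostgreSQL V6.0 instance in elastic storage mode has been created. For details, see Create an instance.
Vector engine optimization is enabled. For details, see Enable or disable vector search engine optimization.
An initial account has been created. For details, see Create a database account.
If you use a RAM user, grant the required permissions to the RAM user. For details, see Use OpenAPI examples.
Procedure
Install the SDK
If you do not specify an SDK version, the latest version is installed. Run the following commands: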
pip install alibabacloud_gpdb20160503
pip install alibabacloud_tea_openapi
To install a specific version of the SDK, run the following commands. This article uses alibabacloud_gpdb20160503 3.5.0 and alibabacloud_tea_openapi 0.3.8 as examples:
pip install alibabacloud_gpdb20160503==3.5.0
pip install alibabacloud_tea_openapi==0.3.8
Initialize the client
Initialize the client that accesses OpenAPI. Sample call:
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_gpdb20160503.client import Client
import os
ALIBABA_CLOUD_ACCESS_KEY_ID = os.environ['ALIBABA_ACCESS_KEY_ID']
ALIBABA_CLOUD_ACCESS_KEY_SECRET = os.environ['ALIBABA_ACCESS_KEY_SECRET']
ADBPG_INSTANCE_ID = os.environ['ADBPG_INSTANCE_ID']
ADBPG_INSTANCE_REGION = os.environ['ADBPG_INSTANCE_REGION']


def get_client():
    config = open_api_models.Config(
        access_key_id=ALIBABA_CLOUD_ACCESS_KEY_ID,
        access_key_secret=ALIBABA_CLOUD_ACCESS_KEY_SECRET
    )
    config.region_id = ADBPG_INSTANCE_REGION
    return Client(config)
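The environment variables are as follows:
ALIBABA_ACCESS_KEY_ID: the AccessKey ID used to access OpenAPI.
ALIBABA_ACCESS_KEY_SECRET: the AccessKey secret used to access OpenAPI.
ADBPG_INSTANCE_ID: the ID of the instance.
ADBPG_INSTANCE_REGION: the region where the instance resides.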
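Because the client code reads these variables with os.environ[...], a missing variable surfaces as a bare KeyError. The following is a minimal sketch, using only the standard library, of checking all four variables up front and reporting every missing one at once. The names REQUIRED_ENV_VARS and load_settings are hypothetical helpers introduced for illustration and are not part of the SDK.

```python
import os

# Names of the environment variables used by the examples in this article.
REQUIRED_ENV_VARS = (
    "ALIBABA_ACCESS_KEY_ID",
    "ALIBABA_ACCESS_KEY_SECRET",
    "ADBPG_INSTANCE_ID",
    "ADBPG_INSTANCE_REGION",
)


def load_settings(environ=None):
    """Return the required settings as a dict, or raise listing every missing variable.

    Hypothetical helper: `environ` defaults to os.environ and can be replaced
    by any mapping, which makes the check easy to test.
    """
    env = os.environ if environ is None else environ
    missing = [name for name in REQUIRED_ENV_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_ENV_VARS}
```

Calling load_settings() before get_client() is one way to fail early with a readable message instead of a KeyError in the middle of a run.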
Initialize the vector database
Before you use vector search, initialize the knowledgebase database and the full-text search features. Sample call:
from alibabacloud_gpdb20160503 import models as gpdb_20160503_models


def init_vector_database(account, account_password):
    request = gpdb_20160503_models.InitVectorDatabaseRequest(
        region_id=ADBPG_INSTANCE_REGION,
        dbinstance_id=ADBPG_INSTANCE_ID,
        manager_account=account,
        manager_account_password=account_password
    )
    response = get_client().init_vector_database(request)
    print(f"init_vector_database response code: {response.status_code}, body:{response.body}")


if __name__ == '__main__':
    init_vector_database("testacc", "Test1234")
    # output: body:
    # {
    #   "Message":"success",
    #   "RequestId":"FC1E0318-E785-1F21-A33C-FE4B0301B608",
    #   "Status":"success"
    # }
For parameter descriptions, see InitVectorDatabase - Initialize a vector database.
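The sample calls in this article only print the response. As a hedged sketch, the check below turns a non-success body into an exception. It assumes the body has already been converted to a plain dict (Tea-generated models typically offer a to_map() method for this) and that the keys Status, Message, and RequestId match the sample output shown above. The name ensure_success is hypothetical.

```python
def ensure_success(body: dict) -> dict:
    """Return `body` unchanged if its Status is "success", otherwise raise.

    Hypothetical helper: the key names are taken from the sample output in
    this article, not from a documented contract.
    """
    status = str(body.get("Status", "")).lower()
    if status != "success":
        raise RuntimeError(
            f"request {body.get('RequestId')} failed: {body.get('Message')}"
        )
    return body
```

The same check could be reused after each of the later calls (create namespace, create collection, upsert, query), since their sample bodies carry the same Status field.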
Create a namespace
A namespace provides schema isolation. Before you use vectors, create at least one namespace or use the public namespace. Sample call:
def create_namespace(account, account_password, namespace, namespace_password):
    request = gpdb_20160503_models.CreateNamespaceRequest(
        region_id=ADBPG_INSTANCE_REGION,
        dbinstance_id=ADBPG_INSTANCE_ID,
        manager_account=account,
        manager_account_password=account_password,
        namespace=namespace,
        namespace_password=namespace_password
    )
    response = get_client().create_namespace(request)
    print(f"create_namespace response code: {response.status_code}, body:{response.body}")


if __name__ == '__main__':
    create_namespace("testacc", "Test1234", "ns1", "Ns1password")
    # output: body:
    # {
    #   "Message":"success",
    #   "RequestId":"78356FC9-1920-1E09-BB7B-CCB6BD267124",
    #   "Status":"success"
    # }
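For parameter descriptions, see CreateNamespace - Create a namespace.
After the namespace is created, you can view the corresponding schema in the knowledgebase database of the instance.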
SELECT schema_name FROM information_schema.schemata;
Create a collection
A collection stores vector data and is isolated by namespace. Sample call:
def create_collection(account,
                      account_password,
                      namespace,
                      collection,
                      metadata: str = None,
                      full_text_retrieval_fields: str = None,
                      parser: str = None,
                      embedding_model: str = None,  # accepted but not forwarded to the request below
                      dimension: int = None,
                      metrics: str = None,
                      hnsw_m: int = None,
                      pq_enable: int = None,
                      external_storage: int = None):
    request = gpdb_20160503_models.CreateCollectionRequest(
        region_id=ADBPG_INSTANCE_REGION,
        dbinstance_id=ADBPG_INSTANCE_ID,
        manager_account=account,
        manager_account_password=account_password,
        namespace=namespace,
        collection=collection,
        metadata=metadata,
        full_text_retrieval_fields=full_text_retrieval_fields,
        parser=parser,
        dimension=dimension,
        metrics=metrics,
        hnsw_m=hnsw_m,
        pq_enable=pq_enable,
        external_storage=external_storage
    )
    response = get_client().create_collection(request)
    print(f"create_collection response code: {response.status_code}, body:{response.body}")


if __name__ == '__main__':
    metadata = '{"title":"text", "content": "text", "page":"int"}'
    full_text_retrieval_fields = "title,content"
    dimension = 8
    create_collection("testacc", "Test1234", "ns1", "dc1",
                      metadata=metadata, full_text_retrieval_fields=full_text_retrieval_fields,
                      dimension=dimension)
    # output: body:
    # {
    #   "Message":"success",
    #   "RequestId":"7BC35B66-5F49-1E79-A153-8D26576C4A3E",
    #   "Status":"success"
    # }
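For parameter descriptions, see CreateCollection - Create a vector collection.
After the collection is created, you can view the corresponding table in the knowledgebase database of the instance. The query below uses ns1, the namespace from the sample call:
SELECT tablename FROM pg_tables WHERE schemaname='ns1';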
Upload vector data
Upload the prepared embedding vectors to the target collection. Sample call:
def upsert_collection_data(namespace,
                           namespace_password,
                           collection,
                           rows):
    request = gpdb_20160503_models.UpsertCollectionDataRequest(
        region_id=ADBPG_INSTANCE_REGION,
        dbinstance_id=ADBPG_INSTANCE_ID,
        namespace=namespace,
        namespace_password=namespace_password,
        collection=collection,
        rows=rows,
    )
    response = get_client().upsert_collection_data(request)
    print(f"upsert_collection_data response code: {response.status_code}, body:{response.body}")


if __name__ == '__main__':
    rows = []
    rows.append(gpdb_20160503_models.UpsertCollectionDataRequestRows(
        id="0CB55798-ECF5-4064-B81E-FE35B19E01A6",
        metadata={
            "page": 1,
            "content": "测试内容",
            "title": "测试文档"
        },
        vector=[0.2894745251078251, 0.5364747050266715, 0.14858841010401188, 0.42140750105351877,
                0.5780346820809248, 0.1145475372279496, 0.04329004329004329, 0.43246796493549741]
    ))
    upsert_collection_data("ns1", "Ns1password", "dc1", rows)
    # output: body:
    # {
    #   "Message":"success",
    #   "RequestId":"8FEE5D1E-ECE8-1F2F-A17F-48039125CDC3",
    #   "Status":"success"
    # }
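For parameter descriptions, see UpsertCollectionData - Upload vector data.
After the upload completes, you can view the data in the knowledgebase database of the instance. The query below uses ns1 and dc1, the namespace and collection from the sample calls:
SELECT * FROM ns1.dc1;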
Recall vector data
Prepare the query vector or the full-text search content to recall, then call the query API. Sample call:
from typing import List


def query_collection_data(namespace,
                          namespace_password,
                          collection,
                          top_k,
                          content: str = None,
                          filter_str: str = None,
                          include_values: bool = None,
                          metrics: str = None,
                          vector: List[float] = None):
    request = gpdb_20160503_models.QueryCollectionDataRequest(
        region_id=ADBPG_INSTANCE_REGION,
        dbinstance_id=ADBPG_INSTANCE_ID,
        namespace=namespace,
        namespace_password=namespace_password,
        collection=collection,
        top_k=top_k,
        content=content,
        filter=filter_str,
        include_values=include_values,
        metrics=metrics,
        vector=vector,
    )
    response = get_client().query_collection_data(request)
    print(f"query_collection_data response code: {response.status_code}, body:{response.body}")


if __name__ == '__main__':
    content = "test query"
    vector = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
    query_collection_data("ns1", "Ns1password", "dc1", 5, content=content, vector=vector)
    # output:
    # query_collection_data response code: 200, body:{'Matches': {'match': [{'Id': '0CB55798-ECF5-4064-B81E-FE35B19E01A6', 'Metadata': {'source': 1, 'page': '1', 'title': '测试文档', 'content': '测试内容'}, 'Score': 0.7208109110736349, 'Values': {'value': [0.28947452, 0.5364747, 0.1485884, 0.4214075, 0.5780347, 0.114547536, 0.043290045, 0.7]}}]}, 'RequestId': '709E2C82-FE25-1722-9DBB-00AD0F85ABBB', 'Status': 'success'}
For parameter descriptions, see QueryCollectionData - Recall vector data.
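The sample output nests the hits under Matches and then match. As a hedged sketch, the function below flattens that structure into (id, score, metadata) tuples. It assumes the body is available as a plain dict with the same key names as the sample output (for example after converting the SDK model with its to_map() method). The name extract_matches is hypothetical.

```python
def extract_matches(body: dict):
    """Return a list of (id, score, metadata) tuples from a query response body.

    Hypothetical helper: the key names follow the sample output in this
    article. A body without matches yields an empty list.
    """
    matches = (body.get("Matches") or {}).get("match") or []
    return [(m["Id"], m["Score"], m.get("Metadata", {})) for m in matches]


# Trimmed copy of the sample output, used only to show the shape.
sample_body = {
    "Matches": {"match": [{
        "Id": "0CB55798-ECF5-4064-B81E-FE35B19E01A6",
        "Metadata": {"page": "1"},
        "Score": 0.7208109110736349,
    }]},
    "Status": "success",
}
hits = extract_matches(sample_body)
```

The order of the hits is left as returned by the service, because how Score should be ranked depends on the metrics setting of the collection.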