多路搜索允许在单次请求中并行发起最多N路独立的查询,并对各路召回的结果进行统一的排序和合并。可为每一路查询自定义召回数量、优先级,并对最终结果进行来源追溯,适用于需要融合文本、向量、不同索引或不同查询策略的复杂搜索场景。
导入并配置相关依赖
OpenSearch 使用自有网关进行签名验签,依赖阿里云TeaDSL SDK进行签名构造,使用时需进行依赖引入:
pip install alibabacloud_tea_util
pip install alibabacloud_opensearch_util
pip install alibabacloud_credentials
通用请求方法如下:(需要您创建文件名为BaseRequest.py的文件)
# -*- coding: utf-8 -*-
import time
from typing import Dict, Any
from Tea.core import TeaCore
from Tea.exceptions import TeaException, UnretryableException
from Tea.model import TeaModel
from Tea.request import TeaRequest
from alibabacloud_credentials import models as credential_models
from alibabacloud_credentials.client import Client as CredentialClient
from alibabacloud_opensearch_util.opensearch_util import OpensearchUtil
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Config(TeaModel):
    """
    Environment settings for the OpenSearch client.

    Each constructor argument is stored unchanged on the instance under the
    same name; nothing is validated here.
    """

    def __init__(
        self,
        endpoint: str = None,
        protocol: str = None,
        type: str = None,
        security_token: str = None,
        access_key_id: str = None,
        access_key_secret: str = None,
        user_agent: str = "",
    ):
        # Where to send requests and over which protocol.
        self.endpoint, self.protocol = endpoint, protocol

        # Credential kind and the optional security token that goes with it.
        self.type, self.security_token = type, security_token

        # AccessKey pair.
        self.access_key_id, self.access_key_secret = access_key_id, access_key_secret

        # Value used when building the user-agent header.
        self.user_agent = user_agent
class Client:
    """
    OpenSearch request client.

    Assembles a signed ``TeaRequest`` from the configured endpoint, protocol
    and credentials, sends it through ``TeaCore`` and returns the decoded
    response.
    """

    _endpoint: str = None
    _protocol: str = None
    _user_agent: str = None
    _credential: CredentialClient = None

    def __init__(
        self,
        config: Config,
    ):
        """
        Build a client from ``config``.

        :param config: endpoint, protocol, user agent and credential settings.
        :raises TeaException: if ``config`` is unset.
        """
        if UtilClient.is_unset(config):
            raise TeaException({
                'name': 'ParameterMissing',
                'message': "'config' can not be unset"
            })
        # Default to AccessKey authentication when no credential type is given.
        # The default is resolved into a local so the caller's Config object
        # is not modified as a side effect of constructing a client.
        credential_type = config.type
        if UtilClient.empty(credential_type):
            credential_type = 'access_key'
        credential_config = credential_models.Config(
            access_key_id=config.access_key_id,
            type=credential_type,
            access_key_secret=config.access_key_secret,
            security_token=config.security_token
        )
        self._credential = CredentialClient(credential_config)
        self._endpoint = config.endpoint
        self._protocol = config.protocol
        self._user_agent = config.user_agent

    def _request(
        self,
        method: str,
        pathname: str,
        query: Dict[str, Any],
        headers: Dict[str, str],
        body: Any,
        runtime: util_models.RuntimeOptions,
    ) -> Dict[str, Any]:
        """
        Sign and send one request, retrying as far as ``runtime`` allows.

        :param method: HTTP method, e.g. ``"POST"``.
        :param pathname: request path.
        :param query: query parameters; left off the request when unset.
        :param headers: extra headers merged with the defaults built here.
        :param body: payload serialized to a JSON string; left off when unset.
        :param runtime: timeout, proxy, retry and backoff options.
        :return: ``{'body': <parsed JSON map>, 'headers': <response headers>}``
        :raises TeaException: for any error that ``TeaCore.is_retryable``
            rejects. A 4xx/5xx response is reported as a ``TeaException``
            carrying the status message, status code and raw response body.
        :raises UnretryableException: when the retry loop ends without a
            successful response.
        """
        runtime.validate()
        _runtime = {
            'timeouted': 'retry',
            'readTimeout': runtime.read_timeout,
            'connectTimeout': runtime.connect_timeout,
            'httpProxy': runtime.http_proxy,
            'httpsProxy': runtime.https_proxy,
            'noProxy': runtime.no_proxy,
            'maxIdleConns': runtime.max_idle_conns,
            'retry': {
                'retryable': runtime.autoretry,
                'maxAttempts': UtilClient.default_number(runtime.max_attempts, 3)
            },
            'backoff': {
                'policy': UtilClient.default_string(runtime.backoff_policy, 'no'),
                'period': UtilClient.default_number(runtime.backoff_period, 1)
            },
            'ignoreSSL': runtime.ignore_ssl
        }
        _last_request = None
        _last_exception = None
        _now = time.time()
        _retry_times = 0
        while TeaCore.allow_retry(_runtime.get('retry'), _retry_times, _now):
            # Back off before every attempt except the first one.
            if _retry_times > 0:
                _backoff_time = TeaCore.get_backoff_time(_runtime.get('backoff'), _retry_times)
                if _backoff_time > 0:
                    TeaCore.sleep(_backoff_time)
            _retry_times = _retry_times + 1
            try:
                _request = TeaRequest()
                accesskey_id = self._credential.get_access_key_id()
                access_key_secret = self._credential.get_access_key_secret()
                security_token = self._credential.get_security_token()
                _request.protocol = UtilClient.default_string(self._protocol, 'HTTP')
                _request.method = method
                _request.pathname = pathname
                # The fallback host below must be replaced with your own
                # instance domain (normally supplied through Config.endpoint).
                _request.headers = TeaCore.merge({
                    'user-agent': UtilClient.get_user_agent(self._user_agent),
                    'Content-Type': 'application/json',
                    'Date': OpensearchUtil.get_date(),
                    'host': UtilClient.default_string(self._endpoint, 'opensearch-cn-hangzhou.aliyuncs.com'),
                    'X-Opensearch-Nonce': UtilClient.get_nonce()
                }, headers)
                if not UtilClient.is_unset(query):
                    _request.query = UtilClient.stringify_map_value(query)
                if not UtilClient.is_unset(body):
                    req_body = UtilClient.to_jsonstring(body)
                    _request.headers['Content-MD5'] = OpensearchUtil.get_content_md5(req_body)
                    _request.body = req_body
                if not UtilClient.is_unset(security_token):
                    _request.headers["X-Opensearch-Security-Token"] = security_token
                # Sign last: the signature is computed from the request after
                # all other headers, the query and the body have been set.
                _request.headers['Authorization'] = OpensearchUtil.get_signature(
                    _request, accesskey_id, access_key_secret)
                _last_request = _request
                _response = TeaCore.do_action(_request, _runtime)
                obj_str = UtilClient.read_as_string(_response.body)
                if UtilClient.is_4xx(_response.status_code) or UtilClient.is_5xx(_response.status_code):
                    raise TeaException({
                        'message': _response.status_message,
                        'data': obj_str,
                        'code': _response.status_code
                    })
                obj = UtilClient.parse_json(obj_str)
                res = UtilClient.assert_as_map(obj)
                return {
                    'body': res,
                    'headers': _response.headers
                }
            except TeaException as e:
                if TeaCore.is_retryable(e):
                    # Remember the failure and let the loop decide whether
                    # another attempt is allowed.
                    _last_exception = e
                    continue
                # Bare raise re-raises the active exception unchanged.
                raise
        raise UnretryableException(_last_request, _last_exception)

配置环境依赖
配置环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。
阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维,具体操作,请参见创建RAM用户。
创建AccessKey ID和AccessKey Secret,请参考创建AccessKey。
如果您使用的是RAM用户的AccessKey,请确保主账号已授权AliyunServiceRoleForOpenSearch服务关联角色,请参考OpenSearch-行业算法版服务关联角色,相关文档参考访问鉴权规则。
请不要将AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
Linux和macOS系统配置方法:
执行以下命令,其中,<access_key_id>需替换为您RAM用户的AccessKey ID,<access_key_secret>需替换为您RAM用户的AccessKey Secret。
export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>
Windows系统配置方法:
新建环境变量文件,添加环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET,并写入已准备好的AccessKey ID和AccessKey Secret。
重启Windows系统生效。
调用示例
# -*- coding: utf-8 -*-
import time, os
from typing import Dict, Any
from Tea.exceptions import TeaException
from Tea.request import TeaRequest
from alibabacloud_tea_util import models as util_models
from BaseRequest import Config, Client
class opensearch:
    """
    Small wrapper around ``Client`` that issues multi-path search requests.
    """

    def __init__(self, config: "Config"):
        """
        Create the underlying client and the runtime options used per request.

        :param config: settings handed to ``Client`` unchanged.
        """
        self.Clients = Client(config=config)
        # NOTE(review): the timeout values are presumably milliseconds;
        # confirm against the RuntimeOptions documentation.
        self.runtime = util_models.RuntimeOptions(
            connect_timeout=10000,
            read_timeout=10000,
            autoretry=False,
            ignore_ssl=False,
            max_idle_conns=50,
            max_attempts=3
        )
        # Extra headers sent with every request; none by default.
        self.header = {}

    def multi_path_search(self, app_name: str, body: dict) -> Dict[str, Any]:
        """
        POST ``body`` to the multi-path search endpoint of ``app_name``.

        :param app_name: application name inserted into the request path.
        :param body: request payload, passed to the client as-is.
        :return: the client's response dict on success. When the client
            raises ``TeaException`` the error is printed and ``None`` is
            returned instead.
        """
        try:
            return self.Clients._request(
                method="POST",
                pathname=f'/v3/openapi/apps/{app_name}/multi-path-search',
                query=None,
                headers=self.header,
                body=body,
                runtime=self.runtime,
            )
        except TeaException as e:
            # Best-effort sample behaviour: report the failure instead of
            # propagating it, and signal it to the caller with None.
            print(e)
            return None

    # Original, misspelled method name; kept so existing callers keep working.
    muliPathSearch = multi_path_search
if __name__ == "__main__":
    # Unified request endpoint; write it without the leading "http://".
    endpoint = "<endpoint>"
    # Transport protocol: HTTPS or HTTP.
    endpoint_protocol = "HTTP"

    # Caller identity.
    # The AccessKey ID and AccessKey Secret are read from environment
    # variables, which must be configured before running this sample (see the
    # environment-variable setup step earlier in this document).
    ak_id = os.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    ak_secret = os.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")

    # Authentication type: "sts" or "access_key". access_key is the default;
    # choose sts to authenticate with RAM-STS.
    auth_type = "access_key"
    # For RAM-STS authentication also configure security_token; the STS
    # credentials can be obtained through Alibaba Cloud AssumeRole.
    # security_token = "<security_token>"

    # Settings shared by every request.
    # Omit type and security_token unless a sub-account is used.
    sample_config = Config(
        endpoint=endpoint,
        protocol=endpoint_protocol,
        type=auth_type,
        access_key_id=ak_id,
        access_key_secret=ak_secret,
    )

    # Create the opensearch wrapper.
    searcher = opensearch(sample_config)
    target_app = "<appname>"

    # --------------- multi-path search ---------------
    # Write the request body yourself; for how to build the query parameters see
    # https://help.aliyun.com/zh/open-search/industry-algorithm-edition/multiple-search
    sub_path_query = {
        "query": "title:'AI'",
        "total_rank_size": 10000,
        "total_rerank_size": 1000,
        "path": "sub",
        "priority": 1,
        "quota": 100,
        "qp": "sys_title",
    }
    main_path_query = {
        "query": "title:'OpenSearch'",
        "total_rank_size": 10000,
        "total_rerank_size": 1000,
        "path": "main",
        "priority": 1,
        "quota": 100,
        "sort": "+pk",
        "qp": "sys_title",
    }
    search_body = {
        "queries": [sub_path_query, main_path_query],
        "raw_query": "OpenSearch",
        "start": 0,
        "hit": 10,
        "format": "fulljson",
        "rank_trace": "info",
        "unified_rank_type": "rrf",
        "unified_rank_size": 1000,
        "fetch_fields": "title",
        "user_id": "123",
        "vector_search": {},
    }

    result = searcher.muliPathSearch(app_name=target_app, body=search_body)
    print(result)