多路搜索

多路搜索允许在单次请求中并行发起最多N路独立的查询,并对各路召回的结果进行统一的排序和合并。可为每一路查询自定义召回数量、优先级,并对最终结果进行来源追溯,适用于需要融合文本、向量、不同索引或不同查询策略的复杂搜索场景。

导入并配置相关依赖

OpenSearch 使用自有网关进行签名验签,依赖阿里云TeaDSL SDK进行签名构造,使用时需进行依赖引入:

pip install alibabacloud_tea_util  
pip install alibabacloud_opensearch_util
pip install alibabacloud_credentials 

通用请求方法如下:(需要您创建文件名为BaseRequest.py的文件

# -*- coding: utf-8 -*-

import time
from typing import Dict, Any

from Tea.core import TeaCore
from Tea.exceptions import TeaException, UnretryableException
from Tea.model import TeaModel
from Tea.request import TeaRequest
from alibabacloud_credentials import models as credential_models
from alibabacloud_credentials.client import Client as CredentialClient
from alibabacloud_opensearch_util.opensearch_util import OpensearchUtil
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Config(TeaModel):
    """
 Config
 用于配置环境相关参数信息.
 """
    def __init__(
            self,
            endpoint: str = None,
            protocol: str = None,
            type: str = None,
            security_token: str = None,
            access_key_id: str = None,
            access_key_secret: str = None,
            user_agent: str = "",
    ):
        self.endpoint = endpoint
        self.protocol = protocol
        self.type = type
        self.security_token = security_token
        self.access_key_id = access_key_id
        self.access_key_secret = access_key_secret
        self.user_agent = user_agent


class Client:
    """
    OpensearchClient
    用于 opensearch Client 请求 参数组装及发送请求.
    """
    _endpoint: str = None
    _protocol: str = None
    _user_agent: str = None
    _credential: CredentialClient = None

    def __init__(
            self,
            config: Config,
    ):
        if UtilClient.is_unset(config):
            raise TeaException({
                'name': 'ParameterMissing',
                'message': "'config' can not be unset"
            })
        if UtilClient.empty(config.type):
            config.type = 'access_key'
        credential_config = credential_models.Config(
            access_key_id=config.access_key_id,
            type=config.type,
            access_key_secret=config.access_key_secret,
            security_token=config.security_token
        )
        self._credential = CredentialClient(credential_config)
        self._endpoint = config.endpoint
        self._protocol = config.protocol
        self._user_agent = config.user_agent

    def _request(
            self,
            method: str,
            pathname: str,
            query: Dict[str, Any],
            headers: Dict[str, str],
            body: Any,
            runtime: util_models.RuntimeOptions,
    ) -> Dict[str, Any]:
        """
        执行 TeaRequest .
        :param request: TeaRequest
        :param runtime: util_models.RuntimeOptions
        :return: Dict[str, Any]
        """
        runtime.validate()
        _runtime = {
            'timeouted': 'retry',
            'readTimeout': runtime.read_timeout,
            'connectTimeout': runtime.connect_timeout,
            'httpProxy': runtime.http_proxy,
            'httpsProxy': runtime.https_proxy,
            'noProxy': runtime.no_proxy,
            'maxIdleConns': runtime.max_idle_conns,
            'retry': {
                'retryable': runtime.autoretry,
                'maxAttempts': UtilClient.default_number(runtime.max_attempts, 3)
            },
            'backoff': {
                'policy': UtilClient.default_string(runtime.backoff_policy, 'no'),
                'period': UtilClient.default_number(runtime.backoff_period, 1)
            },
            'ignoreSSL': runtime.ignore_ssl
        }
        _last_request = None
        _last_exception = None
        _now = time.time()
        _retry_times = 0
        while TeaCore.allow_retry(_runtime.get('retry'), _retry_times, _now):
            if _retry_times > 0:
                _backoff_time = TeaCore.get_backoff_time(_runtime.get('backoff'), _retry_times)
                if _backoff_time > 0:
                    TeaCore.sleep(_backoff_time)
            _retry_times = _retry_times + 1
            try:
                _request = TeaRequest()
                accesskey_id = self._credential.get_access_key_id()
                access_key_secret = self._credential.get_access_key_secret()
                security_token = self._credential.get_security_token()
                _request.protocol = UtilClient.default_string(self._protocol, 'HTTP')
                _request.method = method
                _request.pathname = pathname
                 # host 需要替换为您自己的实例域名
                _request.headers = TeaCore.merge({
                    'user-agent': UtilClient.get_user_agent(self._user_agent),
                    'Content-Type': 'application/json',
                    'Date': OpensearchUtil.get_date(),
                    'host': UtilClient.default_string(self._endpoint, f'opensearch-cn-hangzhou.aliyuncs.com'),
                    'X-Opensearch-Nonce': UtilClient.get_nonce()
                }, headers)
                if not UtilClient.is_unset(query):
                    _request.query = UtilClient.stringify_map_value(query)
                if not UtilClient.is_unset(body):
                    req_body = UtilClient.to_jsonstring(body)
                    _request.headers['Content-MD5'] = OpensearchUtil.get_content_md5(req_body)
                    _request.body = req_body
                if not UtilClient.is_unset(security_token):
                    _request.headers["X-Opensearch-Security-Token"] = security_token
                _request.headers['Authorization'] = OpensearchUtil.get_signature(_request, accesskey_id,
                                                                                 access_key_secret)
                _last_request = _request
                _response = TeaCore.do_action(_request, _runtime)
                obj_str = UtilClient.read_as_string(_response.body)
                if UtilClient.is_4xx(_response.status_code) or UtilClient.is_5xx(_response.status_code):
                    raise TeaException({
                        'message': _response.status_message,
                        'data': obj_str,
                        'code': _response.status_code
                    })
                obj = UtilClient.parse_json(obj_str)
                res = UtilClient.assert_as_map(obj)
                return {
                    'body': res,
                    'headers': _response.headers
                }
            except TeaException as e:
                if TeaCore.is_retryable(e):
                    _last_exception = e
                    continue
                raise e
        raise UnretryableException(_last_request, _last_exception)

配置环境依赖

配置环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET

重要
  • 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维,具体操作,请参见创建RAM用户

  • 创建AccessKey IDAccessKey Secret,请参考创建AccessKey

  • 如果您使用的是RAM用户的AccessKey,请确保主账号已授权AliyunServiceRoleForOpenSearch服务关联角色,请参考OpenSearch-行业算法版服务关联角色,相关文档参考访问鉴权规则

  • 请不要将AccessKey IDAccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。

  • LinuxmacOS系统配置方法:

    执行以下命令,其中, <access_key_id>需替换为您RAM用户的AccessKey ID,<access_key_secret>替换为您RAM用户的AccessKey Secret。

    export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id> 
    export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>
  • Windows系统配置方法

    1. 新建环境变量文件,添加环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET,并写入已准备好的AccessKey IDAccessKey Secret。

    2. 重启Windows系统生效。

调用示例

# -*- coding: utf-8 -*-

import time, os
from typing import Dict, Any
from Tea.exceptions import TeaException
from Tea.request import TeaRequest
from alibabacloud_tea_util import models as util_models
from BaseRequest import Config, Client
class opensearch:
    def __init__(self, config: Config):
        self.Clients = Client(config=config)
        self.runtime = util_models.RuntimeOptions(
            connect_timeout=10000,
            read_timeout=10000,
            autoretry=False,
            ignore_ssl=False,
            max_idle_conns=50,
            max_attempts=3
        )
        self.header={}

    def muliPathSearch(self, app_name: str, body: dict) -> Dict[str, Any]:
        try:
            response = self.Clients._request(method="POST", pathname=f'/v3/openapi/apps/{app_name}/multi-path-search',
                                             query=None, headers = self.header, body=body, runtime=self.runtime)
            return response
        except TeaException as e:
            print(e)


if __name__ == "__main__":
    # 配置统一的请求入口和  需要去掉http://
    endpoint = "<endpoint>"

    # 支持 protocol 配置 HTTPS/HTTP
    endpoint_protocol = "HTTP"

    # 用户识别信息
    # 从环境变量读取配置的AccessKey ID和AccessKey Secret,
    # 运行代码示例前必须先配置环境变量,参考文档上面“配置环境变量”步骤
    access_key_id = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID")
    access_key_secret = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET")

    # 支持 type 配置 sts/access_key 鉴权. 其中 type 默认为 access_key 鉴权. 使用 sts 可配置 RAM-STS 鉴权.
    # 备选参数为:  sts 或者 access_key
    auth_type = "access_key"

    # 如果使用 RAM-STS 鉴权, 请配置 security_token, 可使用 阿里云 AssumeRole 获取 相关 STS 鉴权结构.
    # security_token = "<security_token>"


    # 配置请求使用的通用信息.
    # type和security_token 参数如果不是子账号,需要省略
    Configs = Config(endpoint=endpoint, access_key_id=access_key_id, access_key_secret=access_key_secret,
                     type=auth_type, protocol=endpoint_protocol)



    # 创建 opensearch 实例
    ops = opensearch(Configs)
    app_name = "<appname>"

    # --------------- 多路搜索 ---------------

      # 自行书写 body 信息 , 查询相关请求构造可参数可参考: https://help.aliyun.com/zh/open-search/industry-algorithm-edition/multiple-search
   body = {
    "queries": [
        {
            "query": "title:'AI'",
            "total_rank_size": 10000,
            "total_rerank_size": 1000,
            "path": "sub",
            "priority": 1,
            "quota": 100,
            "qp": "sys_title"
        },
        {
            "query": "title:'OpenSearch'",
            "total_rank_size": 10000,
            "total_rerank_size": 1000,
            "path": "main",
            "priority": 1,
            "quota": 100,
            "sort": "+pk",
            "qp": "sys_title"
        }
    ],
    "raw_query": "OpenSearch",
    "start": 0,
    "hit": 10,
    "format": "fulljson",
    "rank_trace": "info",
    "unified_rank_type": "rrf",
    "unified_rank_size": 1000,
    "fetch_fields": "title",
    "user_id": "123",
    "vector_search": {
    }
}

    response = ops.muliPathSearch(app_name=app_name, body=body)
    print(response)

成功调用后将返回以下结果,单击查看完整信息:

{
  "body": {
    "status": "OK",
    "request_id": "c0492f1f8b0647bc031b0e50c38f5166",
    "result": {
      "searchtime": 0.061772,
      "total": 3,
      "num": 3,
      "viewtotal": 3,
      "compute_cost": [
        {
          "index_name": "caisheng_test2",
          "value": 13.73
        }
      ],
      "items": [
        {
          "fields": {
            "title": "OpenSearch-行业算法版"
          },
          "property": {},
          "attribute": {
            "pk": [
              "120142134_1"
            ]
          },
          "variableValue": {},
          "sortExprValues": [
            "0.0327869"
          ],
          "tracerInfo": "begin search path[main] rank trace:\nend search path[main] rank trace.\n\nbegin search path[main/vector] rank trace:\nend search path[main/vector] rank trace.\n"
        },
        {
          "fields": {
            "title": "OpenSearch-召回引擎版"
          },
          "property": {},
          "attribute": {
            "pk": [
              "120142134_2"
            ]
          },
          "variableValue": {},
          "sortExprValues": [
            "0.0322581"
          ],
          "tracerInfo": "begin search path[main] rank trace:\nend search path[main] rank trace.\n\nbegin search path[main/vector] rank trace:\nend search path[main/vector] rank trace.\n"
        },
        {
          "fields": {
            "title": "AI搜索开放平台"
          },
          "property": {},
          "attribute": {
            "pk": [
              "120142134_3"
            ]
          },
          "variableValue": {},
          "sortExprValues": [
            "0.0163934"
          ],
          "tracerInfo": "begin search path[sub] rank trace:\nend search path[sub] rank trace.\n"
        }
      ],
      "facet": []
    },
    "errors": [
      {
        "code": 2112,
        "message": "Specified index not in query:text_relevance(title,default,true)"
      }
    ],
    "tracer": "",
    "ops_request_misc": "%7B%22request%5Fid%22%3A%22c0492f1f8b0647bc031b0e50c38f5166%22%2C%22scm%22%3A%2220140713.120141928..%22%7D"
  },
  "headers": {
    "server": "Tengine",
    "date": "Thu, 06 Nov 2025 06:29:46 GMT",
    "content-type": "application/json; charset=utf-8",
    "content-length": "1332",
    "connection": "keep-alive",
    "keep-alive": "timeout=32",
    "x-acs-parent-uid": "26842",
    "x-acs-api-name": "opensearch:SearchApp",
    "x-app-name": "caisheng_test2",
    "x-acs-caller-type": "customer",
    "x-instance-type": "ha3",
    "application-execute-error": "2112",
    "x-acs-caller-uid": "1062017779051424",
    "x-app-id": "120142134",
    "x-appgroup-id": "120141928",
    "x-aliyun-user-id": "1062017779051424",
    "request-id": "c0492f1f8b0647bc031b0e50c38f5166"
  }
}