下拉提示Demo

配置环境变量

配置环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET

重要
  • 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维,具体操作,请参见创建RAM用户

  • 创建AccessKey ID和AccessKey Secret,请参考创建AccessKey

  • 如果您使用的是RAM用户的AccessKey,请确保主账号已授权AliyunServiceRoleForOpenSearch服务关联角色,请参考OpenSearch-行业算法版服务关联角色,相关文档参考访问鉴权规则

  • 请不要将AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。

  • LinuxmacOS系统配置方法:

    执行以下命令,其中, <access_key_id>需替换为您RAM用户的AccessKey ID,<access_key_secret>替换为您RAM用户的AccessKey Secret。

    export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id> 
    export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>
  • Windows系统配置方法

    1. 新建环境变量文件,添加环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET,并写入已准备好的AccessKey ID和AccessKey Secret。

    2. 重启Windows系统生效。

引入依赖

pip install alibabacloud_tea_util 
pip install alibabacloud_opensearch_util
pip install alibabacloud_credentials

下拉提示示例

# -*- coding: utf-8 -*-

import time, os
from typing import Dict, Any

from Tea.exceptions import TeaException
from Tea.request import TeaRequest
from alibabacloud_tea_util import models as util_models

from BaseRequest import Config, Client


class opensearch:
    def __init__(self, config: Config):
        self.Clients = Client(config=config)
        self.runtime = util_models.RuntimeOptions(
            connect_timeout=10000,
            read_timeout=10000,
            autoretry=False,
            ignore_ssl=False,
            max_idle_conns=50,
            max_attempts=3
        )
        self.header = {}

    def appAlgoSearch(self, app_name: str, algo_name: str, algo_type: str, query_params: dict) -> Dict[str, Any]:

        algo_type = algo_type if algo_type else "suggest"
        path_name = f'/v3/openapi/apps/{app_name}/suggest/{algo_name}/search'

        if algo_type != "suggest":
            if algo_type not in ["hint", "hot"]:
                raise Exception("algo_type should be hint or hot")
            path_name = f'/v3/openapi/apps/{app_name}/actions/{algo_type}'

        try:
            response = self.Clients._request(method="GET", pathname=path_name, query=query_params,headers = self.header,body=None, runtime=self.runtime)
            return response
        except TeaException as e:
            print(e)


if __name__ == "__main__":

    # 配置统一的请求入口 注意:host需要去掉http://
    endpoint = "<endpoint>"

    # 支持 protocol 配置 HTTPS/HTTP
    endpoint_protocol = "HTTP"

 		# 用户识别信息
    # 从环境变量读取配置的AccessKey ID和AccessKey Secret,
    # 运行代码示例前必须先配置环境变量,参考文档上面“配置环境变量”步骤
    access_key_id = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID")
    access_key_secret = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET")

    # 支持 type 配置 sts/access_key 鉴权. 其中 type 默认为 access_key 鉴权. 使用 sts 可配置 RAM-STS 鉴权.
    # 备选参数为:  sts 或者 access_key
    auth_type = "sts"

    # 如果使用 RAM-STS 鉴权, 请配置 security_token, 可使用 阿里云 AssumeRole 获取 相关 STS 鉴权结构.
    security_token = "<security_token>"

 # 配置请求使用的通用信息.
 # 注意:security_token和type参数,如果不是子账号需要省略
    Configs = Config(endpoint=endpoint, access_key_id=access_key_id, access_key_secret=access_key_secret,
                     security_token=security_token, type=auth_type, protocol=endpoint_protocol)

    # 创建 opensearch 实例
    ops = opensearch(Configs)
    app_name = "app_name"
    model_name = "model_name"

   # --------------- 下拉提示搜索 ---------------
    
    suggest_algoQuery = {
        "query": "开放搜索",
        "hit": 10,
        "user_id": "a7a0d37c824b659f36a5b9e3b819fcdd"
    }
    res2 = ops.appAlgoSearch(app_name=app_name, algo_name=model_name, algo_type="suggest",
                             query_params=suggest_algoQuery)
    print(res2)
说明

参考:下拉提示搜索