Assistant API 支持知识检索增强(RAG)工具,让智能体能够根据您的需求获取外部知识,例如私有产品知识或客户的偏好信息。本文介绍了一个简单的“手机导购”示例,帮助您快速上手RAG工具的基本使用方法。
快速开始
在这个示例中,您将使用示例知识文件创建一个测试知识库,并构建一个能够帮助用户选购新款手机的智能体(Assistant)。
在您开始前
在开始使用 Assistant RAG 之前,请确保您已经:
注册了阿里云账号并开通了百炼服务。
安装了必要的 Python 库。
pip install alibabacloud_bailian20231229==1.8.2 dashscope
准备好了包含手机信息的文档百炼系列手机产品介绍.docx,用于创建知识库。
此外,您需要设置以下环境变量:
export ALIBABA_CLOUD_ACCESS_KEY_ID='您的阿里云访问密钥ID'
export ALIBABA_CLOUD_ACCESS_KEY_SECRET='您的阿里云访问密钥密码'
export DASHSCOPE_API_KEY='您的百炼API密钥'
export WORKSPACE_ID='您的百炼工作空间ID'
完整代码示例
import os
import hashlib
import requests
import time
from alibabacloud_bailian20231229.client import Client as bailian20231229Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_bailian20231229 import models as bailian_20231229_models
from alibabacloud_tea_util import models as util_models
from dashscope import Assistants, Messages, Runs, Threads
import sys
def check_environment_variables():
"""检查并提示设置必要的环境变量"""
required_vars = {
'ALIBABA_CLOUD_ACCESS_KEY_ID': '阿里云访问密钥ID',
'ALIBABA_CLOUD_ACCESS_KEY_SECRET': '阿里云访问密钥密码',
'DASHSCOPE_API_KEY': '百炼API密钥',
'WORKSPACE_ID': '百炼工作空间ID'
}
missing_vars = []
for var, description in required_vars.items():
if not os.environ.get(var):
missing_vars.append(var)
print(f"错误:请设置 {var} 环境变量 ({description})")
if missing_vars:
print("\n您可以使用以下命令设置环境变量:")
for var in missing_vars:
print(f"export {var}='您的{required_vars[var]}'")
return False
return True
# 第一部分:使用阿里云百炼创建知识库
def create_client() -> bailian20231229Client:
"""
创建并配置阿里云百炼客户端。
返回:
bailian20231229Client: 配置好的客户端。
"""
config = open_api_models.Config(
access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
)
config.endpoint = 'bailian.cn-beijing.aliyuncs.com'
return bailian20231229Client(config)
def calculate_md5(file_path: str) -> str:
"""
计算文件的 MD5 哈希值。
参数:
file_path (str): 文件路径。
返回:
str: 文件的 MD5 哈希值。
"""
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def get_file_size(file_path: str) -> int:
"""
获取文件大小(以字节为单位)。
参数:
file_path (str): 文件路径。
返回:
int: 文件大小(以字节为单位)。
"""
return os.path.getsize(file_path)
def apply_lease(client, category_id, file_name, file_md5, file_size, workspace_id):
"""
从阿里云百炼服务申请文件上传租约。
参数:
client (bailian20231229Client): 阿里云百炼客户端。
category_id (str): 类别 ID。
file_name (str): 文件名称。
file_md5 (str): 文件的 MD5 哈希值。
file_size (int): 文件大小(以字节为单位)。
workspace_id (str): 业务空间 ID。
返回:
阿里云百炼服务的响应。
"""
headers = {}
request = bailian_20231229_models.ApplyFileUploadLeaseRequest(
file_name=file_name,
md_5=file_md5,
size_in_bytes=file_size,
)
runtime = util_models.RuntimeOptions()
return client.apply_file_upload_lease_with_options(category_id, workspace_id, request, headers, runtime)
def upload_file(lease_id, upload_url, headers, file_path):
"""
将文件上传到阿里云百炼服务。
参数:
lease_id (str): 租约 ID。
upload_url (str): 上传 URL。
headers (dict): 上传请求的头部。
file_path (str): 文件路径。
"""
with open(file_path, 'rb') as f:
file_content = f.read()
upload_headers = {
"X-bailian-extra": headers["X-bailian-extra"],
"Content-Type": headers["Content-Type"]
}
response = requests.put(upload_url, data=file_content, headers=upload_headers)
response.raise_for_status()
def add_file(client: bailian20231229Client, lease_id: str, parser: str, category_id: str, workspace_id: str):
"""
将文件添加到阿里云百炼服务。
参数:
client (bailian20231229Client): 阿里云百炼客户端。
lease_id (str): 租约 ID。
parser (str): 用于文件的解析器。
category_id (str): 类别 ID。
workspace_id (str): 业务空间 ID。
返回:
阿里云百炼服务的响应。
"""
headers = {}
request = bailian_20231229_models.AddFileRequest(
lease_id=lease_id,
parser=parser,
category_id=category_id,
)
runtime = util_models.RuntimeOptions()
return client.add_file_with_options(workspace_id, request, headers, runtime)
def describe_file(client, workspace_id, file_id):
"""
在阿里云百炼服务中描述文件。
参数:
client (bailian20231229Client): 阿里云百炼客户端。
workspace_id (str): 业务空间 ID。
file_id (str): 文件 ID。
返回:
阿里云百炼服务的响应。
"""
headers = {}
runtime = util_models.RuntimeOptions()
return client.describe_file_with_options(workspace_id, file_id, headers, runtime)
def create_index(client, workspace_id, file_id, name, structure_type, source_type, sink_type):
"""
为文件在阿里云百炼服务中创建知识库索引。
参数:
client (bailian20231229Client): 阿里云百炼客户端。
workspace_id (str): 业务空间 ID。
file_id (str): 文件 ID。
name (str): 知识库索引名称。
structure_type (str): 知识库的数据类型。
source_type (str): 数据管理的数据类型。
sink_type (str): 知识库的向量存储类型。
返回:
阿里云百炼服务的响应。
"""
headers = {}
request = bailian_20231229_models.CreateIndexRequest(
structure_type=structure_type,
name=name,
source_type=source_type,
sink_type=sink_type,
document_ids=[file_id]
)
runtime = util_models.RuntimeOptions()
return client.create_index_with_options(workspace_id, request, headers, runtime)
def submit_index(client, workspace_id, index_id):
"""
向阿里云百炼服务提交索引任务。
参数:
client (bailian20231229Client): 阿里云百炼客户端。
workspace_id (str): 业务空间 ID。
index_id (str): 索引 ID。
返回:
阿里云百炼服务的响应。
"""
headers = {}
submit_index_job_request = bailian_20231229_models.SubmitIndexJobRequest(
index_id=index_id
)
runtime = util_models.RuntimeOptions()
return client.submit_index_job_with_options(workspace_id, submit_index_job_request, headers, runtime)
def get_index_job_status(client, workspace_id, job_id, index_id):
"""
获取阿里云百炼服务中索引任务的状态。
参数:
client (bailian20231229Client): 阿里云百炼客户端。
workspace_id (str): 业务空间 ID。
job_id (str): 任务 ID。
index_id (str): 索引 ID。
返回:
阿里云百炼服务的响应。
"""
headers = {}
get_index_job_status_request = bailian_20231229_models.GetIndexJobStatusRequest(
index_id=index_id,
job_id=job_id
)
runtime = util_models.RuntimeOptions()
return client.get_index_job_status_with_options(workspace_id, get_index_job_status_request, headers, runtime)
def create_knowledge_base(
file_path: str,
workspace_id: str,
name: str
):
"""
使用阿里云百炼服务创建知识库。
参数:
file_path (str): 文件路径。
workspace_id (str): 业务空间 ID。
name (str): 知识库名称。
返回:
str or None: 如果成功,返回索引 ID;否则返回 None。
"""
# 设置默认值
category_id = 'default'
parser = 'DASHSCOPE_DOCMIND'
source_type = 'DATA_CENTER_FILE'
structure_type = 'unstructured'
sink_type = 'DEFAULT'
try:
# 步骤1:创建客户端
print("步骤1:创建阿里云百炼客户端")
client = create_client()
# 步骤2:准备文件信息
print("步骤2:准备文件信息")
file_name = os.path.basename(file_path)
file_md5 = calculate_md5(file_path)
file_size = get_file_size(file_path)
# 步骤3:申请上传租约
print("步骤3:向阿里云百炼申请上传租约")
lease_response = apply_lease(client, category_id, file_name, file_md5, file_size, workspace_id)
lease_id = lease_response.body.data.file_upload_lease_id
upload_url = lease_response.body.data.param.url
upload_headers = lease_response.body.data.param.headers
# 步骤4:上传文件
print("步骤4:上传文件到阿里云百炼")
upload_file(lease_id, upload_url, upload_headers, file_path)
# 步骤5:将文件添加到服务器
print("步骤5:将文件添加到阿里云百炼服务器")
add_response = add_file(client, lease_id, parser, category_id, workspace_id)
file_id = add_response.body.data.file_id
# 步骤6:检查文件状态
print("步骤6:检查阿里云百炼中的文件状态")
while True:
describe_response = describe_file(client, workspace_id, file_id)
status = describe_response.body.data.status
print(f"当前文件状态:{status}")
if status == 'INIT':
print("文件待解析,请稍候...")
elif status == 'PARSING':
print("文件解析中,请稍候...")
elif status == 'PARSE_SUCCESS':
print("文件解析完成!")
break
else:
print(f"未知的文件状态:{status},请联系技术支持。")
return None
time.sleep(5)
# 步骤7:创建知识文件索引
print("步骤7:在阿里云百炼中创建知识文件索引")
index_response = create_index(client, workspace_id, file_id, name, structure_type, source_type, sink_type)
index_id = index_response.body.data.id
# 步骤8:提交索引任务
print("步骤8:向阿里云百炼提交索引任务")
submit_response = submit_index(client, workspace_id, index_id)
job_id = submit_response.body.data.id
# 步骤9:获取索引任务状态
print("步骤9:获取阿里云百炼索引任务状态")
while True:
get_index_job_status_response = get_index_job_status(client, workspace_id, job_id, index_id)
status = get_index_job_status_response.body.data.status
print(f"当前索引任务状态:{status}")
if status == 'COMPLETED':
break
time.sleep(5)
print("阿里云百炼知识库创建成功!")
return index_id
except Exception as e:
print(f"发生错误:{e}")
return None
# 第二部分:在 Assistant API 中使用知识库
def create_assistant(index_id):
"""创建一个使用指定知识库的 Assistant。"""
assistant = Assistants.create(
model='qwen-plus',
name='智能手机选购助手',
description='一个帮助用户选择手机的智能助手。',
instructions='你是一个手机选购向导,你的任务是帮助用户选择满意的手机。使用提供的知识库来回答用户的问题。以下信息可能对你有帮助:${document1}。',
tools=[
{
"type": "rag", # 指定使用RAG(检索增强生成)模式
"prompt_ra": {
"pipeline_id": [index_id], # 指定使用的知识库索引ID
"multiknowledge_rerank_top_n": 10, # 多知识源重排序时返回的top N结果数
"rerank_top_n": 5, # 最终重排序后返回的top N结果数
"parameters": {
"type": "object",
"properties": {
"query_word": {
"type": "str",
"value": "${document1}" # 使用动态占位符,将被实际查询内容替换
}
}
}
}
},
]
)
return assistant.id
class SuppressPrint:
def __enter__(self):
self._original_stdout = sys.stdout
sys.stdout = open(os.devnull, 'w')
def __exit__(self, exc_type, exc_val, exc_tb):
sys.stdout.close()
sys.stdout = self._original_stdout
def send_message(thread, assistant, message):
"""向 Assistant 发送消息并获取回复。"""
print(f"用户: {message}")
message = Messages.create(thread_id=thread.id, content=message)
run = Runs.create(thread_id=thread.id, assistant_id=assistant.id)
with SuppressPrint():
run_status = Runs.wait(run.id, thread_id=thread.id)
msgs = Messages.list(thread.id)
assistant_reply = msgs['data'][0]['content'][0]['text']['value']
print(f"助手: {assistant_reply}")
def interact_with_assistant(assistant):
print("\n步骤3:与 Assistant 交互")
thread = Threads.create() # 创建一个新的对话线程
while True:
user_input = input("\n请输入您的问题(输入 'quit' 退出):")
if user_input.lower() == 'quit':
break
max_retries = 3
for attempt in range(max_retries):
try:
send_message(thread, assistant, user_input)
break
except Exception as e:
if attempt < max_retries - 1:
print(f"发送消息失败,正在重试... (尝试 {attempt + 2}/{max_retries})")
else:
print(f"发送消息失败:{str(e)}。请稍后再试。")
print("感谢使用阿里云百炼 RAG 教程!")
def main():
print("欢迎使用阿里云百炼 RAG 教程!")
if not check_environment_variables():
return
start_option = input("请选择开始步骤:\n1. 从头开始(创建知识库和Assistant)\n2. 直接开始与Assistant交互\n请输入选项(1或2):")
if start_option == '1':
# 步骤1:创建知识库
print("\n步骤1:创建知识库")
file_path = input("请输入文件路径(包含手机信息的文本文件):")
kb_name = input("请输入知识库名称:")
workspace_id = os.environ.get('WORKSPACE_ID')
index_id = create_knowledge_base(file_path, workspace_id, kb_name)
if not index_id:
print("知识库创建失败,程序退出。")
return
print(f"知识库创建成功,索引ID为:{index_id}")
# 步骤2:创建 Assistant
print("\n步骤2:创建 Assistant")
assistant_id = create_assistant(index_id)
print(f"Assistant 创建成功!Assistant ID: {assistant_id}")
assistant = Assistants.get(assistant_id)
if not assistant:
print("Assistant 创建失败,程序退出。")
return
print("Assistant 创建成功!")
elif start_option == '2':
# 直接使用已有的 Assistant
print("\n使用已有的 Assistant")
assistant_id = input("请输入已有的 Assistant ID:")
assistant = Assistants.get(assistant_id)
if not assistant:
print("无法获取指定的 Assistant,程序退出。")
return
print("成功获取 Assistant!")
else:
print("无效的选项,程序退出。")
return
# 步骤3:与 Assistant 交互
interact_with_assistant(assistant)
if __name__ == '__main__':
main()
创建知识库
为了访问知识文档,RAG工具会检索百炼知识库的知识索引。上传您的知识文件到百炼知识库,并创建知识索引。以下是两种配置方法,您可以选择任意一种:通过API上传文件并创建索引,或通过百炼控制台上传文件并创建索引。
这里提供了一个代码示例,帮助您一键完成知识库搭建。您需要准备:
待上传的文档路径(例如:~/test.pdf)。
当前 API 仅支持 PDF、DOCX 等非结构化数据上传,结构化数据请使用控制台上传。
待创建的知识库名称。(例如:测试知识库)
第一步:创建阿里云百炼客户端
在开始上传文件和创建知识库之前,您需要初始化阿里云百炼的客户端,以完成身份验证和端点配置。您可以按照以下示例配置您的客户端:
def create_client() -> bailian20231229Client:
"""
创建并配置阿里云百炼客户端。
返回:
bailian20231229Client: 配置好的客户端。
"""
config = open_api_models.Config(
access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
)
config.endpoint = 'bailian.cn-beijing.aliyuncs.com'
return bailian20231229Client(config)
创建完成后,您将得到一个Client对象,用于后续的 API 调用。
第二步:申请文件上传租约
在上传文件前,您需要申请一个文件上传租约。租约是一个临时的授权,允许您在限定时间内上传文件到阿里云百炼服务。以下是申请文件上传租约的示例代码:
def apply_lease(client, category_id, file_name, file_md5, file_size, workspace_id):
"""
从阿里云百炼服务申请文件上传租约。
参数:
client (bailian20231229Client): 阿里云百炼客户端。
category_id (str): 类别 ID。
file_name (str): 文件名称。
file_md5 (str): 文件的 MD5 哈希值。
file_size (int): 文件大小(以字节为单位)。
workspace_id (str): 业务空间 ID。
返回:
阿里云百炼服务的响应。
"""
headers = {}
request = bailian_20231229_models.ApplyFileUploadLeaseRequest(
file_name=file_name,
md_5=file_md5,
size_in_bytes=file_size,
)
runtime = util_models.RuntimeOptions()
return client.apply_file_upload_lease_with_options(category_id, workspace_id, request, headers, runtime)
您可能需要提供文件的 MD5 哈希值和文件大小,本文提供了简单的计算方法以供参考:
import hashlib
def calculate_md5(file_path: str) -> str:
"""
计算文件的 MD5 哈希值。
参数:
file_path (str): 文件路径。
返回:
str: 文件的 MD5 哈希值。
"""
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def get_file_size(file_path: str) -> int:
"""
获取文件大小(以字节为单位)。
参数:
file_path (str): 文件路径。
返回:
int: 文件大小(以字节为单位)。
"""
return os.path.getsize(file_path)
申请租约成功后,您将得到用于上传文件的临时请求头和临时URL,下一步骤中将使用这两项来上传文件。
第三步:上传文件到阿里云百炼
成功获取上传租约后,使用租约中的临时请求头和临时URL将文件上传至阿里云百炼服务器。
def upload_file(lease_id, upload_url, headers, file_path):
"""
将文件上传到阿里云百炼服务。
参数:
lease_id (str): 租约 ID。
upload_url (str): 上传 URL。
headers (dict): 上传请求的头部。
file_path (str): 文件路径。
"""
with open(file_path, 'rb') as f:
file_content = f.read()
upload_headers = {
"X-bailian-extra": headers["X-bailian-extra"],
"Content-Type": headers["Content-Type"]
}
response = requests.put(upload_url, data=file_content, headers=upload_headers)
response.raise_for_status()
第四步:添加文件到默认类目
百炼使用类目管理您上传的文件。因此,您需要将已上传的文件添加到类目中。百炼会自动创建一个默认类目(category_id='default'
),以下是将文件添加至默认类目的方法。
def add_file(client: bailian20231229Client, lease_id: str, parser: str, category_id: str, workspace_id: str):
"""
将文件添加到阿里云百炼服务。
参数:
client (bailian20231229Client): 阿里云百炼客户端。
lease_id (str): 租约 ID。
parser (str): 用于文件的解析器。
category_id (str): 类别 ID。
workspace_id (str): 业务空间 ID。
返回:
阿里云百炼服务的响应。
"""
headers = {}
request = bailian_20231229_models.AddFileRequest(
lease_id=lease_id,
parser=parser,
category_id=category_id,
)
runtime = util_models.RuntimeOptions()
return client.add_file_with_options(workspace_id, request, headers, runtime)
完成添加后,百炼会自动开始解析您的文档。
第五步:查询文件的解析状态
您可以查询文件的解析状态,当解析完成后,您就可以创建知识库了。以下是查询文件的解析状态的示例代码:
def describe_file(client, workspace_id, file_id):
"""
在阿里云百炼服务中描述文件。
参数:
client (bailian20231229Client): 阿里云百炼客户端。
workspace_id (str): 业务空间 ID。
file_id (str): 文件 ID。
返回:
阿里云百炼服务的响应。
"""
headers = {}
runtime = util_models.RuntimeOptions()
return client.describe_file_with_options(workspace_id, file_id, headers, runtime)
第六步:创建知识库索引
您可以为文件创建一个知识库索引,以便稍后在 Assistant API 中使用。
def create_index(client, workspace_id, file_id, name, structure_type, source_type, sink_type):
"""
为文件在阿里云百炼服务中创建知识库索引。
参数:
client (bailian20231229Client): 阿里云百炼客户端。
workspace_id (str): 业务空间 ID。
file_id (str): 文件 ID。
name (str): 知识库索引名称。
structure_type (str): 知识库的数据类型。
source_type (str): 数据管理的数据类型。
sink_type (str): 知识库的向量存储类型。
返回:
阿里云百炼服务的响应。
"""
headers = {}
request = bailian_20231229_models.CreateIndexRequest(
structure_type=structure_type,
name=name,
source_type=source_type,
sink_type=sink_type,
document_ids=[file_id]
)
runtime = util_models.RuntimeOptions()
return client.create_index_with_options(workspace_id, request, headers, runtime)
第七步:提交索引任务
创建索引后,您需要提交索引任务,让阿里云百炼开始构建知识库索引。
def submit_index(client, workspace_id, index_id):
"""
向阿里云百炼服务提交索引任务。
参数:
client (bailian20231229Client): 阿里云百炼客户端。
workspace_id (str): 业务空间 ID。
index_id (str): 索引 ID。
返回:
阿里云百炼服务的响应。
"""
headers = {}
submit_index_job_request = bailian_20231229_models.SubmitIndexJobRequest(
index_id=index_id
)
runtime = util_models.RuntimeOptions()
return client.submit_index_job_with_options(workspace_id, submit_index_job_request, headers, runtime))
创建 Assistant
第八步:创建 Assistant
知识库创建完成后,您可以使用该知识库与Assistant API结合,创建一个智能助手。以下是创建Assistant的示例代码:
from dashscope import Assistants
def create_assistant(index_id):
assistant = Assistants.create(
model='qwen-plus',
name='智能手机选购助手',
instructions='你的任务是帮助用户选择手机。以下信息可能对你有帮助:${document1}。',
tools=[{
"type": "rag",
"prompt_ra": {
"pipeline_id": [index_id],
"multiknowledge_rerank_top_n": 10
}
}]
)
return assistant
创建对话管理
第九步:提交消息到 Assistant
在创建完 Assistant 之后,您可以通过提交用户消息并获取 Assistant 的回复。以下是向 Assistant 发送消息的代码示例:
from dashscope import Messages, Runs
def send_message(thread, assistant, message):
"""向 Assistant 发送消息并获取回复。"""
print(f"用户: {message}")
# 创建用户消息
message = Messages.create(thread_id=thread.id, content=message)
# 创建一个运行任务
run = Runs.create(thread_id=thread.id, assistant_id=assistant.id)
# 等待运行任务完成
Runs.wait(run.id, thread_id=thread.id)
# 获取消息列表
msgs = Messages.list(thread.id)
# 输出 Assistant 的回复
assistant_reply = msgs['data'][0]['content'][0]['text']['value']
print(f"助手: {assistant_reply}")
第十步:创建对话线程并进行交互
Assistant 通过对话线程与用户进行交互。您可以为每个对话创建一个新的线程,发送用户输入并接收回复。以下是管理对话线程并与 Assistant 交互的代码:
from dashscope import Threads
def interact_with_assistant(assistant):
"""与 Assistant 进行对话互动。"""
print("\n步骤3:与 Assistant 交互")
# 创建一个新的对话线程
thread = Threads.create()
while True:
user_input = input("\n请输入您的问题(输入 'quit' 退出):")
if user_input.lower() == 'quit':
break
try:
# 发送消息并获取回复
send_message(thread, assistant, user_input)
except Exception as e:
print(f"发送消息失败:{str(e)}。请稍后再试。")