Retrieval-augmented generation tool (being unpublished)

更新时间:
复制 MD 格式

The assistant API supports the retrieval-augmented generation (RAG) tool. This tool enables an agent to retrieve external knowledge, such as private product information or customer preferences. This topic uses a simple "phone shopping guide" example to demonstrate how to use the RAG tool.

Important

The assistant API is being unpublished. Migrate to the Responses API. It has multiple built-in tools and supports multi-turn context management as an alternative solution.

Function introduction

In traditional Large Language Model (LLM) applications, the LLM relies only on its pre-trained knowledge. It cannot dynamically retrieve external knowledge. As a result, it cannot answer questions about:

  • Timeliness: Events that happened after the LLM was pre-trained. For example, an LLM pre-trained in 2023 cannot answer, "What honors did the Chinese national sports team win in 2024?"

  • Private knowledge questions: These are questions about information shared within a limited scope. For example, "What are the company's rules and regulations?" and "Who is responsible for the development of AI products?".

  • Long document processing: Retrieving precise information from many documents. For example, "Teach me how to deploy an LLM. Here are a few reference books."

Feature comparison with console RAG applications

Setting

Assistant API RAG application

RAG applications in the Model Studio console

Knowledge base operations

Supported

Supported

Recall count

Supported

Supported

Other advanced settings supported by console RAG applications

Not supported

Supported

Getting started

In this example, you will use a sample knowledge file to create a test knowledge base. You will also build an agent (Assistant) that helps users shop for a new phone.

Note

This example only shows how to create a new knowledge base. For more information about adding files to an existing knowledge base or retrieving, updating, and deleting a knowledge base, see Knowledge Base API Guide.

Before you begin

Before you use the assistant RAG feature, make sure that you have:

  • You have registered an Alibaba Cloud account and activated Model Studio.

  • Installed the required Python libraries.

    pip install alibabacloud_bailian20231229==1.8.2 dashscope
  • Prepared the document that contains phone information, Model Studio Series Phone Product Introduction.docx, to create the knowledge base.

  • In addition, set the following environment variables:

    export ALIBABA_CLOUD_ACCESS_KEY_ID='Your Alibaba Cloud AccessKey ID'
    export ALIBABA_CLOUD_ACCESS_KEY_SECRET='Your Alibaba Cloud AccessKey secret'
    export DASHSCOPE_API_KEY='Your Alibaba Cloud Model Studio API key'
    export WORKSPACE_ID='Your Alibaba Cloud Model Studio workspace ID'
    Important

    DASHSCOPE_API_KEY and ALIBABA_CLOUD_ACCESS_KEY_ID/ALIBABA_CLOUD_ACCESS_KEY_SECRET must belong to the same Alibaba Cloud account. Otherwise, the RAG interaction will fail.

Complete code example

Non-streaming output

import os
import hashlib
import requests
import time
from alibabacloud_bailian20231229.client import Client as bailian20231229Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_bailian20231229 import models as bailian_20231229_models
from alibabacloud_tea_util import models as util_models
from dashscope import Assistants, Messages, Runs, Threads
import sys
def check_environment_variables():
    """Check for and prompt to set necessary environment variables."""
    required_vars = {
        'ALIBABA_CLOUD_ACCESS_KEY_ID': 'Alibaba Cloud AccessKey ID',
        'ALIBABA_CLOUD_ACCESS_KEY_SECRET': 'Alibaba Cloud AccessKey secret',
        'DASHSCOPE_API_KEY': 'Alibaba Cloud Model Studio API key',
        'WORKSPACE_ID': 'Alibaba Cloud Model Studio workspace ID'
    }
    missing_vars = []
    for var, description in required_vars.items():
        if not os.environ.get(var):
            missing_vars.append(var)
            print(f"Error: Please set the {var} environment variable ({description})")
    if missing_vars:
        print("\nYou can use the following commands to set the environment variables:")
        for var in missing_vars:
            print(f"export {var}='Your {required_vars[var]}'")
        return False
    return True
# Part 1: Create a knowledge base using Alibaba Cloud Model Studio
def create_client() -> bailian20231229Client:
    """
    Create and configure the Alibaba Cloud Model Studio client.
    Returns:
        bailian20231229Client: The configured client.
    """
    config = open_api_models.Config(
        access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
    )
    config.endpoint = 'bailian.cn-beijing.aliyuncs.com'
    return bailian20231229Client(config)
def calculate_md5(file_path: str) -> str:
    """
    Calculate the MD5 hash of a file.
    Parameters:
        file_path (str): The file path.
    Returns:
        str: The MD5 hash of the file.
    """
    hash_md5 = hashlib.md5()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()
def get_file_size(file_path: str) -> int:
    """
    Get the file size in bytes.
    Parameters:
        file_path (str): The file path.
    Returns:
        int: The file size in bytes.
    """
    return os.path.getsize(file_path)
def apply_lease(client, category_id, file_name, file_md5, file_size, workspace_id):
    """
    Request a file upload lease from the Alibaba Cloud Model Studio service.
    Parameters:
        client (bailian20231229Client): The Alibaba Cloud Model Studio client.
        category_id (str): The category ID.
        file_name (str): The file name.
        file_md5 (str): The MD5 hash of the file.
        file_size (int): The file size in bytes.
        workspace_id (str): The workspace ID.
    Returns:
        The response from the Alibaba Cloud Model Studio service.
    """
    headers = {}
    request = bailian_20231229_models.ApplyFileUploadLeaseRequest(
        file_name=file_name,
        md_5=file_md5,
        size_in_bytes=file_size,
    )
    runtime = util_models.RuntimeOptions()
    return client.apply_file_upload_lease_with_options(category_id, workspace_id, request, headers, runtime)
def upload_file(lease_id, upload_url, headers, file_path):
    """
    Upload a file to the Alibaba Cloud Model Studio service.
    Parameters:
        lease_id (str): The lease ID.
        upload_url (str): The upload URL.
        headers (dict): The headers for the upload request.
        file_path (str): The file path.
    """
    with open(file_path, 'rb') as f:
        file_content = f.read()
    upload_headers = {
        "X-bailian-extra": headers["X-bailian-extra"],
        "Content-Type": headers["Content-Type"]
    }
    response = requests.put(upload_url, data=file_content, headers=upload_headers)
    response.raise_for_status()
def add_file(client: bailian20231229Client, lease_id: str, parser: str, category_id: str, workspace_id: str):
    """
    Add a file to the Alibaba Cloud Model Studio service.
    Parameters:
        client (bailian20231229Client): The Alibaba Cloud Model Studio client.
        lease_id (str): The lease ID.
        parser (str): The parser for the file.
        category_id (str): The category ID.
        workspace_id (str): The workspace ID.
    Returns:
        The response from the Alibaba Cloud Model Studio service.
    """
    headers = {}
    request = bailian_20231229_models.AddFileRequest(
        lease_id=lease_id,
        parser=parser,
        category_id=category_id,
    )
    runtime = util_models.RuntimeOptions()
    return client.add_file_with_options(workspace_id, request, headers, runtime)
def describe_file(client, workspace_id, file_id):
    """
    Describe a file in the Alibaba Cloud Model Studio service.
    Parameters:
        client (bailian20231229Client): The Alibaba Cloud Model Studio client.
        workspace_id (str): The workspace ID.
        file_id (str): The file ID.
    Returns:
        The response from the Alibaba Cloud Model Studio service.
    """
    headers = {}
    runtime = util_models.RuntimeOptions()
    return client.describe_file_with_options(workspace_id, file_id, headers, runtime)
def create_index(client, workspace_id, file_id, name, structure_type, source_type, sink_type):
    """
    Create a knowledge base index for a file in the Alibaba Cloud Model Studio service.
    Parameters:
        client (bailian20231229Client): The Alibaba Cloud Model Studio client.
        workspace_id (str): The workspace ID.
        file_id (str): The file ID.
        name (str): The name of the knowledge base index.
        structure_type (str): The data type of the knowledge base.
        source_type (str): The data type for data management.
        sink_type (str): The vector storage class of the knowledge base.
    Returns:
        The response from the Alibaba Cloud Model Studio service.
    """
    headers = {}
    request = bailian_20231229_models.CreateIndexRequest(
        structure_type=structure_type,
        name=name,
        source_type=source_type,
        sink_type=sink_type,
        document_ids=[file_id]
    )
    runtime = util_models.RuntimeOptions()
    return client.create_index_with_options(workspace_id, request, headers, runtime)
def submit_index(client, workspace_id, index_id):
    """
    Submit an index task to the Alibaba Cloud Model Studio service.
    Parameters:
        client (bailian20231229Client): The Alibaba Cloud Model Studio client.
        workspace_id (str): The workspace ID.
        index_id (str): The index ID.
    Returns:
        The response from the Alibaba Cloud Model Studio service.
    """
    headers = {}
    submit_index_job_request = bailian_20231229_models.SubmitIndexJobRequest(
        index_id=index_id
    )
    runtime = util_models.RuntimeOptions()
    return client.submit_index_job_with_options(workspace_id, submit_index_job_request, headers, runtime)
def get_index_job_status(client, workspace_id, job_id, index_id):
    """
    Get the status of an index task in the Alibaba Cloud Model Studio service.
    Parameters:
        client (bailian20231229Client): The Alibaba Cloud Model Studio client.
        workspace_id (str): The workspace ID.
        job_id (str): The task ID.
        index_id (str): The index ID.
    Returns:
        The response from the Alibaba Cloud Model Studio service.
    """
    headers = {}
    get_index_job_status_request = bailian_20231229_models.GetIndexJobStatusRequest(
        index_id=index_id,
        job_id=job_id
    )
    runtime = util_models.RuntimeOptions()
    return client.get_index_job_status_with_options(workspace_id, get_index_job_status_request, headers, runtime)
def create_knowledge_base(
    file_path: str,
    workspace_id: str,
    name: str
):
    """
    Create a knowledge base using the Alibaba Cloud Model Studio service.
    Parameters:
        file_path (str): The file path.
        workspace_id (str): The workspace ID.
        name (str): The knowledge base name.
    Returns:
        str or None: The index ID if successful, otherwise None.
    """
    # Set default values
    category_id = 'default'
    parser = 'DASHSCOPE_DOCMIND'
    source_type = 'DATA_CENTER_FILE'
    structure_type = 'unstructured'
    sink_type = 'DEFAULT'
    try:
        # Step 1: Create the client
        print("Step 1: Create the Alibaba Cloud Model Studio client")
        client = create_client()
        # Step 2: Prepare file information
        print("Step 2: Prepare file information")
        file_name = os.path.basename(file_path)
        file_md5 = calculate_md5(file_path)
        file_size = get_file_size(file_path)
        # Step 3: Request an upload lease
        print("Step 3: Request an upload lease from Alibaba Cloud Model Studio")
        lease_response = apply_lease(client, category_id, file_name, file_md5, file_size, workspace_id)
        lease_id = lease_response.body.data.file_upload_lease_id
        upload_url = lease_response.body.data.param.url
        upload_headers = lease_response.body.data.param.headers
        # Step 4: Upload the file
        print("Step 4: Upload the file to Alibaba Cloud Model Studio")
        upload_file(lease_id, upload_url, upload_headers, file_path)
        # Step 5: Add the file to the server
        print("Step 5: Add the file to the Alibaba Cloud Model Studio server")
        add_response = add_file(client, lease_id, parser, category_id, workspace_id)
        file_id = add_response.body.data.file_id
        # Step 6: Check the file status
        print("Step 6: Check the file status in Alibaba Cloud Model Studio")
        while True:
            describe_response = describe_file(client, workspace_id, file_id)
            status = describe_response.body.data.status
            print(f"Current file status: {status}")
            if status == 'INIT':
                print("File is waiting to be parsed. Please wait...")
            elif status == 'PARSING':
                print("File is being parsed. Please wait...")
            elif status == 'PARSE_SUCCESS':
                print("File parsing completed!")
                break
            else:
                print(f"Unknown file status: {status}. Please contact technical support.")
                return None
            time.sleep(5)
        # Step 7: Create a knowledge file index
        print("Step 7: Create a knowledge file index in Alibaba Cloud Model Studio")
        index_response = create_index(client, workspace_id, file_id, name, structure_type, source_type, sink_type)
        index_id = index_response.body.data.id
        # Step 8: Submit the index task
        print("Step 8: Submit the index task to Alibaba Cloud Model Studio")
        submit_response = submit_index(client, workspace_id, index_id)
        job_id = submit_response.body.data.id
        # Step 9: Get the index task status
        print("Step 9: Get the Alibaba Cloud Model Studio index task status")
        while True:
            get_index_job_status_response = get_index_job_status(client, workspace_id, job_id, index_id)
            status = get_index_job_status_response.body.data.status
            print(f"Current index task status: {status}")
            if status == 'COMPLETED':
                break
            time.sleep(5)
        print("Alibaba Cloud Model Studio knowledge base created successfully!")
        return index_id
    except Exception as e:
        print(f"An error occurred: {e}")
        return None
# Part 2: Use the knowledge base in the assistant API
def create_assistant(index_id):
    """Create an assistant that uses the specified knowledge base."""
    assistant = Assistants.create(
        model='qwen-plus',  # Model list: https://www.alibabacloud.com/help/en/model-studio/getting-started/models
        name='Smartphone Shopping Assistant',
        description='An intelligent assistant to help users choose a phone.',
        instructions='You are a phone shopping guide. Your task is to help users choose a satisfactory phone. Use the provided knowledge base to answer user questions. The following information may be helpful: ${documents}.',
        tools=[
            {
                "type": "rag",  # Specify the use of Retrieval-Augmented Generation (RAG) mode
                "prompt_ra": {
                    "pipeline_id": [index_id],  # Specify the ID of the knowledge base index to use
                    "multiknowledge_rerank_top_n": 10,  # The number of top N results to return during multi-source reranking
                    "rerank_top_n": 5,  # The number of top N results to return after final reranking
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "query_word": {
                                "type": "str",
                                "value": "${documents}"  # Use a dynamic placeholder that will be replaced by the actual query content
                            }
                        }
                    }
                }
            },
        ]
    )
    return assistant.id
class SuppressPrint:
    def __enter__(self):
        self._original_stdout = sys.stdout
        sys.stdout = open(os.devnull, 'w')
    def __exit__(self, exc_type, exc_val, exc_tb):
        sys.stdout.close()
        sys.stdout = self._original_stdout
def send_message(thread, assistant, message):
    """Send a message to the assistant and get a reply."""
    print(f"User: {message}")
    message = Messages.create(thread_id=thread.id, content=message)
    run = Runs.create(thread_id=thread.id, assistant_id=assistant.id)
    
    with SuppressPrint():
        run_status = Runs.wait(run.id, thread_id=thread.id)
    
    msgs = Messages.list(thread.id)
    assistant_reply = msgs['data'][0]['content'][0]['text']['value']
    print(f"Assistant: {assistant_reply}")
def interact_with_assistant(assistant):
    print("\nStep 3: Interact with the assistant")
    thread = Threads.create()  # Create a new conversation thread
    while True:
        user_input = input("\nEnter your question (type 'quit' to exit): ")
        if user_input.lower() == 'quit':
            break
        
        max_retries = 3
        for attempt in range(max_retries):
            try:
                send_message(thread, assistant, user_input)
                break
            except Exception as e:
                if attempt < max_retries - 1:
                    print(f"Failed to send message, retrying... (Attempt {attempt + 2}/{max_retries})")
                else:
                    print(f"Failed to send message: {str(e)}. Please try again later.")
    print("Thank you for using the Alibaba Cloud Model Studio RAG tutorial!")
def main():
    print("Welcome to the Alibaba Cloud Model Studio RAG tutorial!")
    
    if not check_environment_variables():
        return
    start_option = input("Select a starting step:\n1. Start from scratch (create knowledge base and assistant)\n2. Start interacting with the assistant directly\nEnter option (1 or 2): ")
    if start_option == '1':
        # Step 1: Create the knowledge base
        print("\nStep 1: Create the knowledge base")
        file_path = input("Enter the file path (text file containing phone information): ")
        kb_name = input("Enter the knowledge base name: ")
        workspace_id = os.environ.get('WORKSPACE_ID')
        index_id = create_knowledge_base(file_path, workspace_id, kb_name)
        if not index_id:
            print("Failed to create the knowledge base. Exiting the program.")
            return
        print(f"Knowledge base created successfully. Index ID: {index_id}")
        # Step 2: Create the assistant
        print("\nStep 2: Create the assistant")
        assistant_id = create_assistant(index_id)
        print(f"Assistant created successfully! Assistant ID: {assistant_id}")
        assistant = Assistants.get(assistant_id)
        if not assistant:
            print("Failed to create the assistant. Exiting the program.")
            return
        print("Assistant created successfully!")
    elif start_option == '2':
        # Use an existing assistant
        print("\nUse an existing assistant")
        assistant_id = input("Enter the existing assistant ID: ")
        assistant = Assistants.get(assistant_id)
        if not assistant:
            print("Could not retrieve the specified assistant. Exiting the program.")
            return
        print("Successfully retrieved the assistant!")
    else:
        print("Invalid option. Exiting the program.")
        return
    # Step 3: Interact with the assistant
    interact_with_assistant(assistant)
if __name__ == '__main__':
    main()

Streaming output

import os
import hashlib
import requests
import time
from alibabacloud_bailian20231229.client import Client as bailian20231229Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_bailian20231229 import models as bailian_20231229_models
from alibabacloud_tea_util import models as util_models
from dashscope import Assistants, Messages, Runs, Threads
import sys

def check_environment_variables():
    """Check for and prompt to set necessary environment variables."""
    required_vars = {
        'ALIBABA_CLOUD_ACCESS_KEY_ID': 'Alibaba Cloud AccessKey ID',
        'ALIBABA_CLOUD_ACCESS_KEY_SECRET': 'Alibaba Cloud AccessKey secret',
        'DASHSCOPE_API_KEY': 'Alibaba Cloud Model Studio API key',
        'WORKSPACE_ID': 'Alibaba Cloud Model Studio workspace ID'
    }

    missing_vars = []
    for var, description in required_vars.items():
        if not os.environ.get(var):
            missing_vars.append(var)
            print(f"Error: Please set the {var} environment variable ({description})")

    if missing_vars:
        print("\nYou can use the following commands to set the environment variables:")
        for var in missing_vars:
            print(f"export {var}='Your {required_vars[var]}'")
        return False
    return True

# Part 1: Create a knowledge base using Alibaba Cloud Model Studio

def create_client() -> bailian20231229Client:
    """
    Create and configure the Alibaba Cloud Model Studio client.

    Returns:
        bailian20231229Client: The configured client.
    """
    config = open_api_models.Config(
        access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
    )
    config.endpoint = 'bailian.cn-beijing.aliyuncs.com'
    return bailian20231229Client(config)

def calculate_md5(file_path: str) -> str:
    """
    Calculate the MD5 hash of a file.

    Parameters:
        file_path (str): The file path.

    Returns:
        str: The MD5 hash of the file.
    """
    hash_md5 = hashlib.md5()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()

def get_file_size(file_path: str) -> int:
    """
    Get the file size in bytes.

    Parameters:
        file_path (str): The file path.

    Returns:
        int: The file size in bytes.
    """
    return os.path.getsize(file_path)

def apply_lease(client, category_id, file_name, file_md5, file_size, workspace_id):
    """
    Request a file upload lease from the Alibaba Cloud Model Studio service.

    Parameters:
        client (bailian20231229Client): The Alibaba Cloud Model Studio client.
        category_id (str): The category ID.
        file_name (str): The file name.
        file_md5 (str): The MD5 hash of the file.
        file_size (int): The file size in bytes.
        workspace_id (str): The workspace ID.

    Returns:
        The response from the Alibaba Cloud Model Studio service.
    """
    headers = {}
    request = bailian_20231229_models.ApplyFileUploadLeaseRequest(
        file_name=file_name,
        md_5=file_md5,
        size_in_bytes=file_size,
    )
    runtime = util_models.RuntimeOptions()
    return client.apply_file_upload_lease_with_options(category_id, workspace_id, request, headers, runtime)

def upload_file(lease_id, upload_url, headers, file_path):
    """
    Upload a file to the Alibaba Cloud Model Studio service.

    Parameters:
        lease_id (str): The lease ID.
        upload_url (str): The upload URL.
        headers (dict): The headers for the upload request.
        file_path (str): The file path.
    """
    with open(file_path, 'rb') as f:
        file_content = f.read()
    upload_headers = {
        "X-bailian-extra": headers["X-bailian-extra"],
        "Content-Type": headers["Content-Type"]
    }
    response = requests.put(upload_url, data=file_content, headers=upload_headers)
    response.raise_for_status()

def add_file(client: bailian20231229Client, lease_id: str, parser: str, category_id: str, workspace_id: str):
    """
    Add a file to the Alibaba Cloud Model Studio service.

    Parameters:
        client (bailian20231229Client): The Alibaba Cloud Model Studio client.
        lease_id (str): The lease ID.
        parser (str): The parser for the file.
        category_id (str): The category ID.
        workspace_id (str): The workspace ID.

    Returns:
        The response from the Alibaba Cloud Model Studio service.
    """
    headers = {}
    request = bailian_20231229_models.AddFileRequest(
        lease_id=lease_id,
        parser=parser,
        category_id=category_id,
    )
    runtime = util_models.RuntimeOptions()
    return client.add_file_with_options(workspace_id, request, headers, runtime)

def describe_file(client, workspace_id, file_id):
    """
    Describe a file in the Alibaba Cloud Model Studio service.

    Parameters:
        client (bailian20231229Client): The Alibaba Cloud Model Studio client.
        workspace_id (str): The workspace ID.
        file_id (str): The file ID.

    Returns:
        The response from the Alibaba Cloud Model Studio service.
    """
    headers = {}
    runtime = util_models.RuntimeOptions()
    return client.describe_file_with_options(workspace_id, file_id, headers, runtime)

def create_index(client, workspace_id, file_id, name, structure_type, source_type, sink_type):
    """
    Create a knowledge base index for a file in the Alibaba Cloud Model Studio service.

    Parameters:
        client (bailian20231229Client): The Alibaba Cloud Model Studio client.
        workspace_id (str): The workspace ID.
        file_id (str): The file ID.
        name (str): The name of the knowledge base index.
        structure_type (str): The data type of the knowledge base.
        source_type (str): The data type for data management.
        sink_type (str): The vector storage class of the knowledge base.

    Returns:
        The response from the Alibaba Cloud Model Studio service.
    """
    headers = {}
    request = bailian_20231229_models.CreateIndexRequest(
        structure_type=structure_type,
        name=name,
        source_type=source_type,
        sink_type=sink_type,
        document_ids=[file_id]
    )
    runtime = util_models.RuntimeOptions()
    return client.create_index_with_options(workspace_id, request, headers, runtime)

def submit_index(client, workspace_id, index_id):
    """
    Submit an index task to the Alibaba Cloud Model Studio service.

    Parameters:
        client (bailian20231229Client): The Alibaba Cloud Model Studio client.
        workspace_id (str): The workspace ID.
        index_id (str): The index ID.

    Returns:
        The response from the Alibaba Cloud Model Studio service.
    """
    headers = {}
    submit_index_job_request = bailian_20231229_models.SubmitIndexJobRequest(
        index_id=index_id
    )
    runtime = util_models.RuntimeOptions()
    return client.submit_index_job_with_options(workspace_id, submit_index_job_request, headers, runtime)

def get_index_job_status(client, workspace_id, job_id, index_id):
    """
    Get the status of an index task in the Alibaba Cloud Model Studio service.

    Parameters:
        client (bailian20231229Client): The Alibaba Cloud Model Studio client.
        workspace_id (str): The workspace ID.
        job_id (str): The task ID.
        index_id (str): The index ID.

    Returns:
        The response from the Alibaba Cloud Model Studio service.
    """
    headers = {}
    get_index_job_status_request = bailian_20231229_models.GetIndexJobStatusRequest(
        index_id=index_id,
        job_id=job_id
    )
    runtime = util_models.RuntimeOptions()
    return client.get_index_job_status_with_options(workspace_id, get_index_job_status_request, headers, runtime)

def create_knowledge_base(
    file_path: str,
    workspace_id: str,
    name: str
):
    """
    Create a knowledge base using the Alibaba Cloud Model Studio service.

    Parameters:
        file_path (str): The file path.
        workspace_id (str): The workspace ID.
        name (str): The knowledge base name.

    Returns:
        str or None: The index ID if successful, otherwise None.
    """
    # Set default values
    category_id = 'default'
    parser = 'DASHSCOPE_DOCMIND'
    source_type = 'DATA_CENTER_FILE'
    structure_type = 'unstructured'
    sink_type = 'DEFAULT'

    try:
        # Step 1: Create the client
        print("Step 1: Create the Alibaba Cloud Model Studio client")
        client = create_client()

        # Step 2: Prepare file information
        print("Step 2: Prepare file information")
        file_name = os.path.basename(file_path)
        file_md5 = calculate_md5(file_path)
        file_size = get_file_size(file_path)

        # Step 3: Request an upload lease
        print("Step 3: Request an upload lease from Alibaba Cloud Model Studio")
        lease_response = apply_lease(client, category_id, file_name, file_md5, file_size, workspace_id)
        lease_id = lease_response.body.data.file_upload_lease_id
        upload_url = lease_response.body.data.param.url
        upload_headers = lease_response.body.data.param.headers

        # Step 4: Upload the file
        print("Step 4: Upload the file to Alibaba Cloud Model Studio")
        upload_file(lease_id, upload_url, upload_headers, file_path)

        # Step 5: Add the file to the server
        print("Step 5: Add the file to the Alibaba Cloud Model Studio server")
        add_response = add_file(client, lease_id, parser, category_id, workspace_id)
        file_id = add_response.body.data.file_id

        # Step 6: Check the file status
        print("Step 6: Check the file status in Alibaba Cloud Model Studio")
        while True:
            describe_response = describe_file(client, workspace_id, file_id)
            status = describe_response.body.data.status
            print(f"Current file status: {status}")
            if status == 'INIT':
                print("File is waiting to be parsed. Please wait...")
            elif status == 'PARSING':
                print("File is being parsed. Please wait...")
            elif status == 'PARSE_SUCCESS':
                print("File parsing completed!")
                break
            else:
                print(f"Unknown file status: {status}. Please contact technical support.")
                return None
            time.sleep(5)

        # Step 7: Create a knowledge file index
        print("Step 7: Create a knowledge file index in Alibaba Cloud Model Studio")
        index_response = create_index(client, workspace_id, file_id, name, structure_type, source_type, sink_type)
        index_id = index_response.body.data.id

        # Step 8: Submit the index task
        print("Step 8: Submit the index task to Alibaba Cloud Model Studio")
        submit_response = submit_index(client, workspace_id, index_id)
        job_id = submit_response.body.data.id

        # Step 9: Get the index task status
        print("Step 9: Get the Alibaba Cloud Model Studio index task status")
        while True:
            get_index_job_status_response = get_index_job_status(client, workspace_id, job_id, index_id)
            status = get_index_job_status_response.body.data.status
            print(f"Current index task status: {status}")
            if status == 'COMPLETED':
                break
            time.sleep(5)

        print("Alibaba Cloud Model Studio knowledge base created successfully!")
        return index_id

    except Exception as e:
        print(f"An error occurred: {e}")
        return None

# Part 2: Use the knowledge base in the assistant API

def create_assistant(index_id):
    """Create an assistant that uses the specified knowledge base."""
    assistant = Assistants.create(
        model='qwen-plus',  # Model list: https://www.alibabacloud.com/help/en/model-studio/getting-started/models
        name='Smartphone Shopping Assistant',
        description='An intelligent assistant to help users choose a phone.',
        instructions='You are a phone shopping guide. Your task is to help users choose a satisfactory phone. Use the provided knowledge base to answer user questions. The following information may be helpful: ${document1}.',
        tools=[
            {
                "type": "rag",  # Specify the use of Retrieval-Augmented Generation (RAG) mode
                "prompt_ra": {
                    "pipeline_id": [index_id],  # Specify the ID of the knowledge base index to use
                    "multiknowledge_rerank_top_n": 10,  # The number of top N results to return during multi-source reranking
                    "rerank_top_n": 5,  # The number of top N results to return after final reranking
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "query_word": {
                                "type": "str",
                                "value": "${document1}"  # Use a dynamic placeholder that will be replaced by the actual query content
                            }
                        }
                    }
                }
            },
        ]
    )
    return assistant.id

def send_message(thread, assistant, message):
    """Send a message to the assistant and get a reply."""
    print(f"User: {message}")

    message = Messages.create(thread_id=thread.id, content=message)
    
    # Start a streaming conversation, set stream=True to get real-time responses
    run_iterator = Runs.create(
        thread_id=thread.id,
        assistant_id=assistant.id,
        stream=True
    )
    
    print("Assistant: ", end='', flush=True)  # Print the assistant identifier without a new line
    
    try:
        # Process different types of streaming response events
        for event, data in run_iterator:
            # Print a new line when a step is completed
            if event == 'thread.run.step.completed':
                print("\n", end='')
            # Process the assistant's text message
            if event == 'thread.message.delta':
                print(data.delta.content.text.value, end='', flush=True)
            
    
    # Exception handling: Supports user interruption and error handling            
    except KeyboardInterrupt:
        run_iterator.close()
        print("\nOutput interrupted by user")
    except Exception as e:
        print(f"\nAn error was encountered: {str(e)}")
        run_iterator.close()
    finally:
        print()  # Ensure there is a new line at the end

def interact_with_assistant(assistant):
    print("\nStep 3: Interact with the assistant")
    thread = Threads.create()  # Create a new conversation thread
    while True:
        user_input = input("\nEnter your question (type 'quit' to exit): ")
        if user_input.lower() == 'quit':
            break
        
        max_retries = 3
        for attempt in range(max_retries):
            try:
                send_message(thread, assistant, user_input)
                break
            except Exception as e:
                if attempt < max_retries - 1:
                    print(f"\nFailed to send message, retrying... (Attempt {attempt + 2}/{max_retries})")
                    time.sleep(1)  # Add a delay to avoid immediate retries
                else:
                    print(f"\nFailed to send message: {str(e)}. Please try again later.")

    print("Thank you for using the Alibaba Cloud Model Studio RAG tutorial!")

def main():
    print("Welcome to the Alibaba Cloud Model Studio RAG tutorial!")
    
    if not check_environment_variables():
        return

    start_option = input("Select a starting step:\n1. Start from scratch (create knowledge base and assistant)\n2. Start interacting with the assistant directly\nEnter option (1 or 2): ")

    if start_option == '1':
        # Step 1: Create the knowledge base
        print("\nStep 1: Create the knowledge base")
        file_path = input("Enter the file path (text file containing phone information): ")
        kb_name = input("Enter the knowledge base name: ")
        workspace_id = os.environ.get('WORKSPACE_ID')

        index_id = create_knowledge_base(file_path, workspace_id, kb_name)
        if not index_id:
            print("Failed to create the knowledge base. Exiting the program.")
            return

        print(f"Knowledge base created successfully. Index ID: {index_id}")

        # Step 2: Create the assistant
        print("\nStep 2: Create the assistant")
        assistant_id = create_assistant(index_id)
        print(f"Assistant created successfully! Assistant ID: {assistant_id}")
        assistant = Assistants.get(assistant_id)
        if not assistant:
            print("Failed to create the assistant. Exiting the program.")
            return

        print("Assistant created successfully!")

    elif start_option == '2':
        # Use an existing assistant
        print("\nUse an existing assistant")
        assistant_id = input("Enter the existing assistant ID: ")
        assistant = Assistants.get(assistant_id)
        if not assistant:
            print("Could not retrieve the specified assistant. Exiting the program.")
            return
        print("Successfully retrieved the assistant!")

    else:
        print("Invalid option. Exiting the program.")
        return

    # Step 3: Interact with the assistant
    interact_with_assistant(assistant)

if __name__ == '__main__':
    main()

Create a knowledge base

To access knowledge documents, the RAG tool retrieves a knowledge index from an Alibaba Cloud Model Studio knowledge base. Upload your knowledge files to a Model Studio knowledge base and create a knowledge index. You can use one of the following two methods: Upload files and create an index using an API, or Upload files and create an index using the Model Studio console.

A code example is provided to help you quickly set up a knowledge base. You need to prepare:

  • The path of the document to upload (for example, ~/test.pdf).

    The current API only supports uploading unstructured data, such as PDF and DOCX files. To upload structured data, use the console.
  • The name of the knowledge base to create (for example, Test Knowledge Base).

Step 1: Create an Alibaba Cloud Model Studio client

Before you upload files and create a knowledge base, you must initialize the Alibaba Cloud Model Studio client to complete identity verification and endpoint configuration. You can configure your client as shown in the following example:

def create_client() -> bailian20231229Client:
    """
    Create and configure the Alibaba Cloud Model Studio client.

    Returns:
        bailian20231229Client: The configured client.
    """
    config = open_api_models.Config(
        access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
    )
    config.endpoint = 'bailian.cn-beijing.aliyuncs.com'
    return bailian20231229Client(config)

After the client is created, a Client object is returned for subsequent API calls.

Step 2: Request a file upload lease

Before you upload a file, you must request a file upload lease. A lease is a temporary authorization that lets you upload a file to the Alibaba Cloud Model Studio service within a specific time frame. The following code shows an example of how to request a file upload lease:

def apply_lease(client, category_id, file_name, file_md5, file_size, workspace_id):
    """
    Request a file upload lease from the Alibaba Cloud Model Studio service.

    Parameters:
        client (bailian20231229Client): The Alibaba Cloud Model Studio client.
        category_id (str): The category ID.
        file_name (str): The file name.
        file_md5 (str): The MD5 hash of the file.
        file_size (int): The file size in bytes.
        workspace_id (str): The workspace ID.

    Returns:
        The response from the Alibaba Cloud Model Studio service.
    """
    headers = {}
    request = bailian_20231229_models.ApplyFileUploadLeaseRequest(
        file_name=file_name,
        md_5=file_md5,
        size_in_bytes=file_size,
    )
    runtime = util_models.RuntimeOptions()
    return client.apply_file_upload_lease_with_options(category_id, workspace_id, request, headers, runtime)

You may need to provide the MD5 hash and size of the file. This topic provides simple calculation methods:

import hashlib

def calculate_md5(file_path: str) -> str:
    """
    Calculate the MD5 hash of a file.

    Parameters:
        file_path (str): The file path.

    Returns:
        str: The MD5 hash of the file.
    """
    hash_md5 = hashlib.md5()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()

def get_file_size(file_path: str) -> int:
    """
    Get the file size in bytes.

    Parameters:
        file_path (str): The file path.

    Returns:
        int: The file size in bytes.
    """
    return os.path.getsize(file_path)

After you successfully request a lease, you will receive a temporary request header and a temporary URL for uploading the file. You will use them in the next step to upload the file.

Step 3: Upload the file to Alibaba Cloud Model Studio

After you obtain the upload lease, use the temporary request header and URL from the lease to upload the file to the Alibaba Cloud Model Studio server.

def upload_file(lease_id, upload_url, headers, file_path):
    """
    Upload a file to the Alibaba Cloud Model Studio service.

    Parameters:
        lease_id (str): The lease ID.
        upload_url (str): The upload URL.
        headers (dict): The headers for the upload request.
        file_path (str): The file path.
    """
    with open(file_path, 'rb') as f:
        file_content = f.read()
    upload_headers = {
        "X-bailian-extra": headers["X-bailian-extra"],
        "Content-Type": headers["Content-Type"]
    }
    response = requests.put(upload_url, data=file_content, headers=upload_headers)
    response.raise_for_status()   

Step 4: Add the file to the default category

Model Studio uses categories to manage your uploaded files. Therefore, you must add the uploaded file to a category. Model Studio automatically creates a default category (category_id='default'). Use the following method to add the file to the default category.

def add_file(client: bailian20231229Client, lease_id: str, parser: str, category_id: str, workspace_id: str):
    """
    Add a file to the Alibaba Cloud Model Studio service.

    Parameters:
        client (bailian20231229Client): The Alibaba Cloud Model Studio client.
        lease_id (str): The lease ID.
        parser (str): The parser for the file.
        category_id (str): The category ID.
        workspace_id (str): The workspace ID.

    Returns:
        The response from the Alibaba Cloud Model Studio service.
    """
    headers = {}
    request = bailian_20231229_models.AddFileRequest(
        lease_id=lease_id,
        parser=parser,
        category_id=category_id,
    )
    runtime = util_models.RuntimeOptions()
    return client.add_file_with_options(workspace_id, request, headers, runtime)

After the file is added, Alibaba Cloud Model Studio automatically starts parsing the document.

Step 5: Query the parsing status of the file

You can query the parsing status of the file. After the file is parsed, you can create the knowledge base. The following code shows an example of how to query the parsing status of the file:

def describe_file(client, workspace_id, file_id):
    """
    Describe a file in the Alibaba Cloud Model Studio service.

    Parameters:
        client (bailian20231229Client): The Alibaba Cloud Model Studio client.
        workspace_id (str): The workspace ID.
        file_id (str): The file ID.

    Returns:
        The response from the Alibaba Cloud Model Studio service.
    """
    headers = {}
    runtime = util_models.RuntimeOptions()
    return client.describe_file_with_options(workspace_id, file_id, headers, runtime)

Step 6: Create a knowledge base index

You can create a knowledge base index for the file for use with the assistant API.

def create_index(client, workspace_id, file_id, name, structure_type, source_type, sink_type):
    """
    Create a knowledge base index for a file in the Alibaba Cloud Model Studio service.

    Parameters:
        client (bailian20231229Client): The Alibaba Cloud Model Studio client.
        workspace_id (str): The workspace ID.
        file_id (str): The file ID.
        name (str): The name of the knowledge base index.
        structure_type (str): The data type of the knowledge base.
        source_type (str): The data type for data management.
        sink_type (str): The vector storage class of the knowledge base.

    Returns:
        The response from the Alibaba Cloud Model Studio service.
    """
    headers = {}
    request = bailian_20231229_models.CreateIndexRequest(
        structure_type=structure_type,
        name=name,
        source_type=source_type,
        sink_type=sink_type,
        document_ids=[file_id]
    )
    runtime = util_models.RuntimeOptions()
    return client.create_index_with_options(workspace_id, request, headers, runtime)

Step 7: Submit the index task

After you create the index, you must submit an index task so that Alibaba Cloud Model Studio can start building the knowledge base index.

def submit_index(client, workspace_id, index_id):
    """
    Submit an index task to the Alibaba Cloud Model Studio service.

    Parameters:
        client (bailian20231229Client): The Alibaba Cloud Model Studio client.
        workspace_id (str): The workspace ID.
        index_id (str): The index ID.

    Returns:
        The response from the Alibaba Cloud Model Studio service.
    """
    headers = {}
    submit_index_job_request = bailian_20231229_models.SubmitIndexJobRequest(
        index_id=index_id
    )
    runtime = util_models.RuntimeOptions()
    return client.submit_index_job_with_options(workspace_id, submit_index_job_request, headers, runtime)

Create an assistant

Step 8: Create an assistant

After the knowledge base is created, you can use it with the assistant API to create an intelligent assistant. The following code shows an example of how to create an assistant:

def create_assistant(index_id):
    """Create an assistant that uses the specified knowledge base."""
    assistant = Assistants.create(
        model='qwen-plus',  # Model list: https://www.alibabacloud.com/help/en/model-studio/getting-started/models
        name='Smartphone Shopping Assistant',
        description='An intelligent assistant to help users choose a phone.',
        instructions='You are a phone shopping guide. Your task is to help users choose a satisfactory phone. Use the provided knowledge base to answer user questions. The following information may be helpful: ${documents}.',
        tools=[
            {
                "type": "rag",  # Specify the use of Retrieval-Augmented Generation (RAG) mode
                "prompt_ra": {
                    "pipeline_id": [index_id],  # Specify the ID of the knowledge base to use. You can get it from the return value in Step 6: Create a knowledge base index.
                                                # You can also get it from the console: https://bailian.console.alibabacloud.com/#/knowledge-base
                    "multiknowledge_rerank_top_n": 10,  # The number of top N results to return during multi-source reranking
                    "rerank_top_n": 5,  # The number of top N results to return after final reranking
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "query_word": {
                                "type": "str",
                                "value": "${documents}"  # Use a dynamic placeholder that will be replaced by the actual query content
                            }
                        }
                    }
                }
            },
        ]
    )
    return assistant.id

Manage conversations

Step 9: Submit a message to the assistant

After you create the assistant, you can submit a user message and receive the assistant's reply. The following code shows an example of how to send a message to the assistant:

from dashscope import Messages, Runs

def send_message(thread, assistant, message):
    """Send a message to the assistant and get a reply."""
    print(f"User: {message}")

    # Create a user message
    message = Messages.create(thread_id=thread.id, content=message)
    
    # Create a run task
    run = Runs.create(thread_id=thread.id, assistant_id=assistant.id)

    # Wait for the run task to complete
    Runs.wait(run.id, thread_id=thread.id)
    
    # Get the message list
    msgs = Messages.list(thread.id)
    
    # Output the assistant's reply
    assistant_reply = msgs['data'][0]['content'][0]['text']['value']
    print(f"Assistant: {assistant_reply}")

Step 10: Create a conversation thread and interact

The assistant interacts with users through conversation threads. You can create a new thread for each conversation, send user input, and receive replies. The following code shows how to manage conversation threads and interact with the assistant:

from dashscope import Threads

def interact_with_assistant(assistant):
    """Interact with the assistant in a conversation."""
    print("\nStep 3: Interact with the assistant")
    
    # Create a new conversation thread
    thread = Threads.create()
    
    while True:
        user_input = input("\nEnter your question (type 'quit' to exit): ")
        if user_input.lower() == 'quit':
            break
        
        try:
            # Send the message and get the reply
            send_message(thread, assistant, user_input)
        except Exception as e:
            print(f"Failed to send message: {str(e)}. Please try again later.")

FAQ

  1. Is there a visual way to create a knowledge base? Can I attach a knowledge base created in the Model Studio console?

    You can use the Model Studio console to create and use knowledge bases. Knowledge bases created in the console or using an API can be used to build the retrieval-augmented generation (RAG) feature of the assistant API.

  2. Does the assistant API support calling multiple tools? If an assistant is configured with both RAG and function call tools, will they conflict with each other?

    Yes, the assistant API supports calling multiple tools, and they will not conflict. The assistant decides whether to call a specific tool. If you configure multiple tools for an assistant, the assistant decides whether to call one or more tools based on the user's input.

  3. Does the RAG feature of the assistant API support setting assembly policies?

    Currently, the Assistant API for Retrieval-Augmented Generation (RAG) only supports specifying the number of segments to retrieve from a single knowledge base or the total number of segments to retrieve from multiple knowledge bases. Other advanced features are not yet supported. To use advanced RAG features, such as the "Assembly Strategy", you can use the Model Studio console to create an Alibaba Cloud Model Studio agent application.