Learn about the streaming output feature of the Assistant API (being unpublished)

Streaming output lets you retrieve the running status of an assistant in real time, so you can display the content generated by the large language model (LLM) to users piece by piece as it is produced.

Important

The Assistant API is being unpublished. As an alternative, you can migrate to the Responses API. It offers built-in tools and multi-turn context management.

Streaming output has the following features:

  • More natural conversation flow: It simulates human communication patterns for a smoother and more natural interaction.

  • Finer control: You can monitor the assistant's running status in real time to display the generation progress or stop the process if an error occurs.

  • More reliable connection: The response is returned in chunks, so the client is not left waiting on a single long-running request.

Getting started

To enable streaming output, pass stream=True when you create the run, for example Runs.create(thread.id, assistant_id=assistant.id, stream=True).

Make sure you have configured the API key as an environment variable and installed the DashScope SDK.

Non-streaming output

# Non-streaming output 
from dashscope import Assistants, Threads, Runs, Messages

# Create an assistant
assistant = Assistants.create(
        model='qwen-plus',  # For a list of models, see https://www.alibabacloud.com/help/en/model-studio/getting-started/models
        name='Sample Assistant',
        instructions='You are a helpful assistant'
    )

# Create a conversation thread
thread = Threads.create(assistant_id=assistant.id,
                        messages=[{
                            'role': 'user',
                            'content': 'Hello!'
                        }])

# Run the conversation and wait for the response
run = Runs.create(thread.id,
                  assistant_id=assistant.id)
Runs.wait(run.id,
          thread_id=thread.id)

# Get and print the result
message = Messages.list(thread.id).data[0].content[0].text.value
print(message)

Streaming output

# Streaming output
from dashscope import Assistants, Threads, Runs

# Create an assistant
assistant = Assistants.create(
        model='qwen-plus',  # For a list of models, see https://www.alibabacloud.com/help/en/model-studio/getting-started/models
        name='Sample Assistant',
        instructions='You are a helpful assistant'
    )

# Create a conversation thread
thread = Threads.create(assistant_id=assistant.id,
                        messages=[{
                            'role': 'user',
                            'content': 'Hello!'
                        }])

# Run the streaming conversation
run = Runs.create(thread.id,
                  assistant_id=assistant.id,
                  stream=True)  # The key configuration to enable streaming output

# Output the result in real time
for event, data in run:
    if event == 'thread.message.delta':
        print(data.delta.content.text.value, end='', flush=True)
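In practice you often want the complete reply as well as the live output, for example to store it after the stream ends. The following is a minimal sketch of that pattern. It does not call the API: fake_stream is a hypothetical stand-in that yields (event, data) pairs shaped like the 'thread.message.delta' events shown above, and collect_text is an illustrative helper name, not part of the DashScope SDK.

```python
from types import SimpleNamespace


def fake_stream(chunks):
    """Hypothetical stand-in for the iterator returned by Runs.create(..., stream=True)."""
    for chunk in chunks:
        text = SimpleNamespace(value=chunk)
        delta = SimpleNamespace(content=SimpleNamespace(text=text))
        yield 'thread.message.delta', SimpleNamespace(delta=delta)


def collect_text(stream):
    """Print each text chunk as it arrives and return the full reply."""
    parts = []
    for event, data in stream:
        if event == 'thread.message.delta':
            chunk = data.delta.content.text.value
            print(chunk, end='', flush=True)  # Live output
            parts.append(chunk)               # Keep the chunk for later use
    print()  # Finish the line once the stream ends
    return ''.join(parts)


full_reply = collect_text(fake_stream(['Hello', ', ', 'world']))
```

To try this against a real run, you would pass the object returned by Runs.create(..., stream=True) to collect_text instead of fake_stream(...).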

Configuration method

In the Assistant API, rendering streaming output involves two steps:

  1. Run the assistant in stream mode.

  2. Receive the stream data.

Think of streaming output as a "stream of water". Running the assistant is like "turning on the tap", and receiving data is like "catching the water".

Run the assistant in stream mode

To run the assistant in stream mode, set the stream parameter to True when you create a run.

run = Runs.create(thread_id=thread.id,
                  assistant_id=assistant.id,
                  stream=True)  # Simply set stream=True to run the assistant in stream mode

Receive the stream data

In a stream run, the assistant produces two main types of data streams:

  • Status information stream: Tracks the run status and progress.

  • Conversation message stream: Contains the actual output content.

To capture this continuous stream of data, you can build a loop to process it and set rules to distinguish between different data types. As a developer, you typically only need to follow the conversation message stream unless you need to implement more complex state management. For a detailed list of events and event data, see the Assistant API streaming output development reference.

# run_iterator is the object returned by Runs.create(..., stream=True)
for event, data in run_iterator:
    if event == 'thread.run.created':  # [Status information stream] If a run is created, display its current status
        print(data.status)
    elif event == 'thread.message.delta':  # [Conversation message stream] If a message chunk is generated, display it immediately
        print(data.delta.content.text.value, end='', flush=True)
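If you do need both streams, one option is to separate them while iterating. The sketch below runs offline on hand-built events and makes several assumptions: the 'thread.run.completed' event name and the 'queued' and 'completed' status values are illustrative and should be checked against the streaming output development reference, and message_event and split_streams are hypothetical helper names.

```python
from types import SimpleNamespace


def message_event(chunk):
    """Build a fake 'thread.message.delta' event (hypothetical test helper)."""
    text = SimpleNamespace(value=chunk)
    delta = SimpleNamespace(content=SimpleNamespace(text=text))
    return 'thread.message.delta', SimpleNamespace(delta=delta)


def split_streams(stream):
    """Separate status events from text chunks.

    Returns a list of (event, status) pairs and the concatenated text.
    """
    statuses = []
    chunks = []
    for event, data in stream:
        if event == 'thread.message.delta':
            # Conversation message stream: collect the text chunk
            chunks.append(data.delta.content.text.value)
        elif event.startswith('thread.run.') and hasattr(data, 'status'):
            # Status information stream: record the status carried by the event
            statuses.append((event, data.status))
    return statuses, ''.join(chunks)


# Assumed event names and status values, for illustration only
events = [
    ('thread.run.created', SimpleNamespace(status='queued')),
    message_event('Hi'),
    message_event(' there'),
    ('thread.run.completed', SimpleNamespace(status='completed')),
]
statuses, text = split_streams(iter(events))
```

Keeping the two streams in separate containers makes it easy to drive a progress indicator from the statuses while rendering the text elsewhere.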

Conversation message stream

In the Assistant API, the assistant produces two types of conversation message streams:

  • Text message stream: The stream of text messages generated by the LLM in the assistant.

    for event, data in run_iterator:
        if event == 'thread.message.delta':  # [Conversation message stream] A message delta object, which indicates a newly generated text message chunk.
            print(data.delta.content.text.value, end='', flush=True)  # Output this chunk.
  • Tool message stream: The stream of tool messages returned from tool calls in the assistant. The following example uses the code interpreter:

    for event, data in run_iterator:
        if event == 'thread.run.step.delta':  # [Conversation message stream] A run step delta object, which indicates a new tool call.
            tool_call = data.delta.step_details.tool_calls[0]
            if getattr(tool_call, 'type', '') == 'code_interpreter':  # Take the code interpreter as an example.
                print(getattr(tool_call.code_interpreter, 'arguments', ''), end='', flush=True)  # Output the code to be executed.
                print(getattr(tool_call.code_interpreter, 'output', ''), end='', flush=True)  # Output the code execution result.

    Code interpreter, Quark search, text-to-image, and calculator support streaming output. Other Assistant API tools do not support streaming output. For more information, see Run step delta object.
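A run step delta may carry only the code, only the output, or neither, so it can help to normalize the fields before printing them. The following sketch assumes the delta shape used in the tool message example above (data.delta.step_details.tool_calls[0].code_interpreter with arguments and output). code_interpreter_chunk and fake_step_delta are hypothetical helpers, and the fake objects only imitate the SDK's response objects.

```python
from types import SimpleNamespace


def code_interpreter_chunk(data):
    """Return (code, output) from a run step delta, using '' for anything missing."""
    tool_calls = getattr(data.delta.step_details, 'tool_calls', None) or []
    if not tool_calls:
        return '', ''
    tool_call = tool_calls[0]
    if getattr(tool_call, 'type', '') != 'code_interpreter':
        return '', ''  # Some other tool: nothing to show here
    interpreter = getattr(tool_call, 'code_interpreter', None)
    # "or ''" also covers fields that exist but are set to None
    code = getattr(interpreter, 'arguments', '') or ''
    output = getattr(interpreter, 'output', '') or ''
    return code, output


def fake_step_delta(tool_type='code_interpreter', **fields):
    """Build a fake run step delta (hypothetical test helper)."""
    tool_call = SimpleNamespace(type=tool_type,
                                code_interpreter=SimpleNamespace(**fields))
    step_details = SimpleNamespace(tool_calls=[tool_call])
    return SimpleNamespace(delta=SimpleNamespace(step_details=step_details))


code_only = code_interpreter_chunk(fake_step_delta(arguments='print(1)'))
output_only = code_interpreter_chunk(fake_step_delta(arguments=None, output='1\n'))
other_tool = code_interpreter_chunk(fake_step_delta(tool_type='calculator', arguments='1+1'))
```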

You have now learned the basic method for enabling streaming output. The following example demonstrates how to build streaming output that includes tool calls.

Example: Code tutoring assistant (streaming output)

In this simple example, the assistant is configured with the code interpreter tool. This enables streaming output for text messages, code generation, and code execution. For more information about how to configure the code interpreter, see Code interpreter - Features.

Make sure you have configured the API key as an environment variable and installed the DashScope SDK.

from dashscope import Assistants, Threads, Runs
import time

class CodeTutorStream:
    """A demo class for the code tutoring assistant with streaming output."""
    
    def __init__(self):
        self.assistant = None
        self.thread = None
        
    def create_assistant(self):
        """Create a tutoring assistant configured with the code interpreter."""
        self.assistant = Assistants.create(
            model='qwen-plus',
            name="Python Tutoring Assistant",
            instructions="You are a patient code tutoring assistant. Please use streaming output to explain the code step by step.",
            tools=[{'type': 'code_interpreter'}]
        )
        print("Tutoring assistant initialized (code interpreter enabled)\n")

    def start_lesson(self, question):
        """Create a tutoring thread and start the streaming conversation."""
        self.thread = Threads.create(messages=[{
            'role': 'user',
            'content': question
        }])
        
        print("Student's question:", question)
        print("\nAssistant is thinking...\n")
        
        # Delay for 1 second to simulate processing time
        time.sleep(1)
        
        # Start the stream run
        run_stream = Runs.create(
            self.thread.id,
            assistant_id=self.assistant.id,
            stream=True
        )
        return run_stream

    def process_stream(self, stream):
        """Process the streaming output and simulate a tutoring scenario."""
        message_flag = False
        tool_call_flag = False
        try:
            for event, data in stream:
                # Process the text explanation
                if event == 'thread.message.delta':
                    if not message_flag:
                        print("\nStep-by-step explanation:")
                        message_flag = True
                    text_chunk = data.delta.content.text.value
                    self.simulate_typing(text_chunk)
                
                # Process the code demonstration
                if event == 'thread.run.step.delta':
                    if not tool_call_flag:
                        print("\nCode demonstration:")
                        tool_call_flag = True
                    tool_call = data.delta.step_details.tool_calls[0]
                    if getattr(tool_call, 'type', '') == 'code_interpreter':
                        # Fall back to '' so a missing or None field is never iterated
                        code = getattr(tool_call.code_interpreter, 'arguments', '') or ''
                        output = getattr(tool_call.code_interpreter, 'output', '') or ''

                        self.simulate_typing(code, speed=0.03)
                        self.simulate_typing(output)
                        
        except KeyboardInterrupt:
            stream.close()
            print("\nTutoring interrupted")

    @staticmethod
    def simulate_typing(text, speed=0.03):
        """Simulate a typewriter effect for the output."""
        for char in text:
            print(char, end='', flush=True)
            time.sleep(speed)

if __name__ == "__main__":
    tutor = CodeTutorStream()
    
    # Create the tutoring assistant
    tutor.create_assistant()
    
    # Set the tutoring question
    question = """Please explain in detail how to plot a sine function graph in Python:
1. Explain the mathematical principles step by step.
2. Demonstrate the usage of numpy and matplotlib.
3. Show the final visualization."""
    
    # Start the streaming tutorial
    stream = tutor.start_lesson(question)
    
    # Process the real-time output
    tutor.process_stream(stream)

FAQ

If you encounter code execution errors while configuring streaming output, see Error messages to troubleshoot the error.
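When a stream fails partway through, it is often useful to keep the text received so far and report the error alongside it. The sketch below shows one way to do that, under stated assumptions: it catches Exception broadly because no specific SDK exception type is assumed here, failing_stream simulates a dropped connection with a built-in ConnectionError, and consume_safely is a hypothetical helper name.

```python
from types import SimpleNamespace


def text_event(chunk):
    """Build a fake 'thread.message.delta' event (hypothetical test helper)."""
    text = SimpleNamespace(value=chunk)
    delta = SimpleNamespace(content=SimpleNamespace(text=text))
    return 'thread.message.delta', SimpleNamespace(delta=delta)


def failing_stream():
    """Yield one chunk, then fail, to simulate an interrupted stream."""
    yield text_event('partial ')
    raise ConnectionError('simulated network drop')


def consume_safely(stream):
    """Return (text_received, error); error is None if the stream ended normally."""
    parts = []
    try:
        for event, data in stream:
            if event == 'thread.message.delta':
                parts.append(data.delta.content.text.value)
    except Exception as exc:  # Broad on purpose: the SDK's exception types are not assumed
        return ''.join(parts), exc
    return ''.join(parts), None


text, error = consume_safely(failing_stream())
ok_text, ok_error = consume_safely(iter([text_event('done')]))
```

The caller can then decide whether to show the partial text, retry the run, or look up the error in Error messages.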