借助 Assistant API 构建具备自动规划能力的 Multi Agent 系统

本文将通过构建一个查询阿里云资源信息的Multi Agent系统,帮助您了解如何通过百炼平台的Assistant API构建一个无需提前定义、可自动规划编排任务流程的Multi Agent系统。

多智能体系统(Multi Agent System)是多个Agent协作完成任务的方式,在多数场景下,比单Agent完成任务准确率更高。通过给每个Agent制定明确且专业的角色名称和职责描述,不仅可以提升它们的专业性,还能帮助它们更好地理解和配合各自的工作。Multi Agent System的设计十分灵活,您可以参考本案例中的设计和示例代码,将其应用到您的业务中。

效果展示

通过本文提供的教程,您可以实现一个能进行阿里云资源信息查询的 Multi Agent 系统。该系统在收到用户问题后,会自动规划多个 Agent 之间的协作流程,按照规划执行任务后给出最终答案。

image

output

详细工作流程:

  1. 用户进行提问;

  2. PlannerAssistant接收到用户的提问,并根据用户问题对其它Agent进行选择并编排其运行顺序;

  3. ChatAssistant、AliyunInfoAssistantInstanceTypeDetailAssistant分别有对应的职责与能力,它们是本教程Multi Agent系统中被PlannerAssistant编排的Agent。编排完成后的Agent组合会接收用户的提问,并按照程序中设计的Multi Agent交互逻辑进行分工合作;

  4. SummaryAssistant将各Agent的输出信息汇总,并结合用户问题对信息进行总结,其输出会作为Multi Agent系统的最终输出。

前提条件

  1. 请参考创建AccessKey,获取用户的ACCESS_KEY_IDACCESS_KEY_SECRET

  2. 请开通百炼服务并获取API Key

  3. 我们推荐您将ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRETDASHSCOPE_API_KEY配置在环境变量中,以降低泄露风险。环境变量配置方法可参考:配置API Key到环境变量。在本教程中,您可以参考以下命令,根据您的操作系统选择配置环境变量的方法:

    # 用您的API-KEY与阿里云AK信息进行替换
    export DASHSCOPE_API_KEY="YOUR_DASHSCOPE_API_KEY"
    export ALIBABA_CLOUD_ACCESS_KEY_ID="YOUR_ALIBABA_CLOUD_ACCESS_KEY_ID"
    export ALIBABA_CLOUD_ACCESS_KEY_SECRET="YOUR_ALIBABA_CLOUD_ACCESS_KEY_SECRET"
    # 用您的API-KEY与阿里云AK信息进行替换
    $env:DASHSCOPE_API_KEY = "YOUR_DASHSCOPE_API_KEY"
    $env:ALIBABA_CLOUD_ACCESS_KEY_ID="YOUR_ALIBABA_CLOUD_ACCESS_KEY_ID"
    $env:ALIBABA_CLOUD_ACCESS_KEY_SECRET="YOUR_ALIBABA_CLOUD_ACCESS_KEY_SECRET"
  4. 如果您没有创建ECS实例,请前往ECS控制台创建一个按量付费的实例。

    说明

    如果您在完成本实践教程后没有继续使用ECS实例的需求,请及时释放您的ECS资源,避免产生不必要的消费。

  5. 请确认您的计算环境中已安装Python。您可以新建一个requirements.txt文件,将以下内容复制到txt文件中。

    alibabacloud_tea_openapi
    alibabacloud_tea_util
    alibabacloud_openapi_util
    alibabacloud_ecs20140526
    alibabacloud_bssopenapi20171214
    dashscope
    gradio

    在您保存requirements.txt文件后,请您在requirements.txt所在目录中运行以下命令安装依赖:

    pip install -r requirements.txt
  6. 请您参考最佳实践-基于RAG的官方文档助手文档,在百炼平台创建RAG应用,知识库文件选择PDF格式的阿里云ECS官方文档实例规格族.pdf,并获取其应用ID。

    image

代码实现

您需要准备两个py文件,分别为tools.pymain.py,这两个文件需要放置在同一目录中。

tools.py(用于定义工具函数)

tools.py主要定义了Agent会使用到的工具函数。tools.py整体代码可见tools.py整体代码

引入依赖

from alibabacloud_tea_util import models as util_models
from alibabacloud_ecs20140526.client import Client as Ecs20140526Client
from alibabacloud_ecs20140526 import models as ecs_20140526_models
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_bssopenapi20171214.client import Client as BssOpenApi20171214Client
from dashscope import Application
import os

定义工具

tools.py中主要使用到了两个工具类,分别为ECSBilling。ECS的类方法包含有query_sourcecall_agent_app,Billing的类方法包含get_balance。您可以参考以下代码,添加您需要的工具类与工具函数以适配您的业务。

ECS

ECS有两个类函数:query_sourcecall_agent_app

query_source通过阿里云的OpenAPI服务查询指定区域的ECS实例信息。函数输入为RegionID,如cn-hangzhoucn-beijingcn-shanghai等;输出包含实例ID、实例规格与价格信息。

call_agent_app函数通过集成阿里云ECS官方文档实例规格族.pdf知识库的百炼RAG应用查询实例规格的详细信息,包括vCPU个数、内存大小等指标数据。您需要获取您在百炼创建应用的app_id并在代码中的对应位置进行替换。函数输入参数为ECS实例规格列表,如['ecs.e-c1m1.large', 'ecs.u1-c1m4.xlarge'],输出为RAG应用的回复。需要注意的是,与Agent相似,RAG应用的背后也是基于大模型驱动,因此call_agent_app的运行时间可能较长。

ECS的定义代码如下:

说明

请用您在百炼平台创建的RAG应用的app_id替代代码中的app_id

class ECS:
    @classmethod
    # 输入:地域ID,如cn-hangzhou,cn-beijing等
    # 输出:查询ecs实例规格信息,包括实例ID,实例规格,每小时收费。(系统盘默认按照cloud_essd_entry,40GiB)
    def query_source(cls,RegionID):
        config = open_api_models.Config(
                # 从环境变量中获取阿里云的AK信息
                access_key_id=os.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
                access_key_secret=os.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
            )
        # 杭州与北京上海等地区的endpoint不同,此处进行判断
        if RegionID != 'cn-hangzhou':
            config.endpoint = f'ecs.{RegionID}.aliyuncs.com'
        else:
            config.endpoint = f'ecs-{RegionID}.aliyuncs.com'
        client = Ecs20140526Client(config)
        describe_instances_request = ecs_20140526_models.DescribeInstancesRequest(region_id=RegionID)
        runtime = util_models.RuntimeOptions()
        # 获得ECS实例信息
        response_source = client.describe_instances_with_options(describe_instances_request, runtime).body
        if len(response_source.instances.instance) == 0:
            return "您在当前区域无ecs实例"
        # 初始化要返回的结果
        result = ""
        # 可能有多个实例,因此用for循环遍历所有实例
        for i in range(len(response_source.instances.instance)):
            # 系统盘类型与存储空间,此处默认设为cloud_essd_entry,40GiB
            system_disk = ecs_20140526_models.DescribePriceRequestSystemDisk(
                category='cloud_essd_entry',
                size=40
            )
            describe_price_request = ecs_20140526_models.DescribePriceRequest(
                region_id=RegionID,
                resource_type='instance',
                instance_type=response_source.instances.instance[i].instance_type,
                system_disk=system_disk
            )
            response = client.describe_price_with_options(describe_price_request, runtime).body
            cur_result = f"""实例:{response_source.instances.instance[i].instance_id} 的规格为:{response_source.instances.instance[i].instance_type},
    每个小时的收费为{response.price_info.price.trade_price}元\n"""
            # 将当前实例的信息添加到返回结果中
            result += cur_result
        return result
    @classmethod
    # RAG应用调用
    def call_agent_app(cls,InstanceType):
        if len(InstanceType) == 0:
            return "您在当前区域无ecs实例"
        result = ""
        for i in range(len(InstanceType)):
            response = Application.call(
                # 此处填写RAG应用的app_id
                app_id='xxx',
                prompt=f'介绍一下{InstanceType[i]}',
                # 从环境变量中获取Dashscope的API Key
                api_key=os.getenv("DASHSCOPE_API_KEY"))
            result += response.output.text
        return result
Billing

Billing类中有一个函数get_balance

get_balance通过阿里云的OpenAPI服务查询阿里云的余额信息。

class Billing:
    # 无需输入,返回为阿里云账户余额信息
    @classmethod
    def get_balance(cls):
        # 创建客户端
        config = open_api_models.Config(
            # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。,
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。,
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Endpoint 请参考 https://api.aliyun.com/product/BssOpenApi
        config.endpoint = f'business.aliyuncs.com'
        runtime = util_models.RuntimeOptions()
        client = BssOpenApi20171214Client(config)
        balance_info = client.query_account_balance_with_options(runtime).body.data
        return f"""币种为:{balance_info.currency},可用额度为{balance_info.available_amount},信控余额为{balance_info.credit_amount},
        网商余额为{balance_info.mybank_credit_amount},现金余额为{balance_info.available_cash_amount},生态客户Quota限额为{balance_info.quota_limit}。"""

tools.py整体代码

tools.py整体代码如下:

from alibabacloud_tea_util import models as util_models
from alibabacloud_ecs20140526.client import Client as Ecs20140526Client
from alibabacloud_ecs20140526 import models as ecs_20140526_models
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_bssopenapi20171214.client import Client as BssOpenApi20171214Client
from dashscope import Application
import os

class ECS:
    @classmethod
    # 输入:地域ID,如cn-hangzhou,cn-beijing等
    # 输出:查询ecs实例规格信息,包括实例ID,实例规格,每小时收费。(系统盘默认按照cloud_essd_entry,40GiB)
    def query_source(cls,RegionID):
        config = open_api_models.Config(
                # 从环境变量中获取阿里云的AK信息
                access_key_id=os.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
                access_key_secret=os.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
            )
        # 杭州与北京上海等地区的endpoint不同,此处进行判断
        if RegionID != 'cn-hangzhou':
            config.endpoint = f'ecs.{RegionID}.aliyuncs.com'
        else:
            config.endpoint = f'ecs-{RegionID}.aliyuncs.com'
        client = Ecs20140526Client(config)
        describe_instances_request = ecs_20140526_models.DescribeInstancesRequest(region_id=RegionID)
        runtime = util_models.RuntimeOptions()
        # 获得ECS实例信息
        response_source = client.describe_instances_with_options(describe_instances_request, runtime).body
        if len(response_source.instances.instance) == 0:
            return "您在当前区域无ecs实例"
        # 初始化要返回的结果
        result = ""
        # 可能有多个实例,因此用for循环遍历所有实例
        for i in range(len(response_source.instances.instance)):
            # 系统盘类型与存储空间,此处默认设为cloud_essd_entry,40GiB
            system_disk = ecs_20140526_models.DescribePriceRequestSystemDisk(
                category='cloud_essd_entry',
                size=40
            )
            describe_price_request = ecs_20140526_models.DescribePriceRequest(
                region_id=RegionID,
                resource_type='instance',
                instance_type=response_source.instances.instance[i].instance_type,
                system_disk=system_disk
            )
            response = client.describe_price_with_options(describe_price_request, runtime).body
            cur_result = f"""实例:{response_source.instances.instance[i].instance_id} 的规格为:{response_source.instances.instance[i].instance_type},
    每个小时的收费为{response.price_info.price.trade_price}元\n"""
            # 将当前实例的信息添加到返回结果中
            result += cur_result
        return result
    @classmethod
    # RAG应用调用
    def call_agent_app(cls,InstanceType):
        if len(InstanceType) == 0:
            return "您在当前区域无ecs实例"
        result = ""
        for i in range(len(InstanceType)):
            response = Application.call(
                # 此处填写RAG应用的app_id
                app_id='xxx',
                prompt=f'介绍一下{InstanceType[i]}',
                # 从环境变量中获取Dashscope的API Key
                api_key=os.getenv("DASHSCOPE_API_KEY"))
            result += response.output.text
        return result

class Billing:
    # 无需输入,返回为阿里云账户余额信息
    @classmethod
    def get_balance(cls):
        # 创建客户端
        config = open_api_models.Config(
            # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID。,
            access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
            # 必填,请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。,
            access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
        )
        # Endpoint 请参考 https://api.aliyun.com/product/BssOpenApi
        config.endpoint = f'business.aliyuncs.com'
        runtime = util_models.RuntimeOptions()
        client = BssOpenApi20171214Client(config)
        balance_info = client.query_account_balance_with_options(runtime).body.data
        return f"""币种为:{balance_info.currency},可用额度为{balance_info.available_amount},信控余额为{balance_info.credit_amount},
        网商余额为{balance_info.mybank_credit_amount},现金余额为{balance_info.available_cash_amount},生态客户Quota限额为{balance_info.quota_limit}。"""

if __name__ == '__main__':
    print(ECS.query_source('cn-hangzhou'))

main.py(用于创建Agent并定义交互方式)

main.py主要作用为:

  1. 创建Agent

  2. 定义消息传递函数

  3. 定义Agent之间交互方式并获得回复

  4. 前端展示界面

其整体代码可见:main.py整体代码

引入依赖

from dashscope import Assistants, Messages, Runs, Threads
import json
# 从tools.py导入工具函数
from tools import ECS,Billing
# 引入前端界面展示依赖
import gradio as gr

# 将列表形式的字符串解析为列表形式的数据,例如:"['a','b']"-->['a','b']。
# 用于将plannerassistant的输出解析为元素为assistant的列表
import ast

创建Agent

本教程共包含五个Agent,集成的工具函数与功能请见下表:

Agent名称

集成的工具函数

功能

PlannerAssistant

Multi Agent进行编排。

ChatAssistant

如果无需使用工具,则使用该Agent进行回答。

AliyunInfoAssistant

query_sourceget_balance

查询阿里云的资源信息,包括ECS实例与阿里云余额。

InstanceTypeDetailAssistant

call_agent_app

查询指定实例规格的指标数据。

SummaryAssistant

结合前序Agent的输出,对用户问题进行全面、完整的回复。

五个Agent的详情如下:

说明

Agent中的大模型通过Assistants.create方法的model参数定义;Agent的功能由Assistants.create方法中的tools参数定义;您可以通过Assistants.create中的instructions指引Agent使用工具的方式,以及Agent的输出格式等,例如让AgentJSON格式输出字符串。具体实现代码请参考以下五个Agent的代码实现。

PlannerAssistant

PlannerAssistant负责根据用户的输入与其它agent的功能,编排Multi Agent的工作方式,是Multi Agent的核心。在得到PlannerAssistant输出后,需要在后续程序中进行字符串解析,将其解析成列表形式的数据。代码如下:

# 决策级别的agent,决定使用哪些agent,以及它们的运行顺序
PlannerAssistant = Assistants.create(
    # 因为该Agent作用比较重要,因此建议选择性能较强的大模型:qwen-max
    model="qwen-max",
    # 定义Agent的名称
    name='流程编排机器人',
    # 定义Agent的功能描述
    description='你是团队的leader,你的手下有很多assistant,你需要根据用户的输入,决定要以怎样的顺序去使用这些assistant',
    # 定义对Agent的指示语句,Agent会按照指示语句进行工具的调用并返回结果。
    instructions="""你的团队中有以下assistant。AliyunInfoAssistant:可以查询用户指定区域的阿里云ecs实例信息,或者查询用户的阿里云余额;InstanceTypeDetailAssistant:可以查询指定阿里云ecs实例规格的详细信息,比如cpu核数、内存大小等,可以一次查询多个实例规格信息,因此无需多次调用;
    ChatAssistant:如果用户的问题无需以上两个assistant,则调用该assistant。你需要根据用户的问题,判断要以什么顺序使用这些assistant,你的返回形式是一个列表,不能返回其它信息。比如:["AliyunInfoAssistant", "AliyunInfoAssistant","InstanceTypeDetailAssistant"]或者["ChatAssistant"],列表中的元素只能为上述的assistant"""
)

ChatAssistant

ChatAssistant的功能是回复日常问题,可以使用成本较低的模型(如qwen-turbo)作为agent的底座模型。

# 功能是回复日常问题。对于日常问题来说,可以使用价格较为低廉的模型作为agent的基座
ChatAssistant = Assistants.create(
    # 因为该Agent对大模型性能要求不高,因此使用成本较低的qwen-turbo模型
    model="qwen-turbo",
    name='回答日常问题的机器人',
    description='一个智能助手,解答用户的问题',
    instructions='请礼貌地回答用户的问题'
)

AliyunInfoAssistant

AliyunInfoAssistant的功能是查询阿里云的资源信息。目前有ecs实例查询与阿里云余额查询两个功能。代码详情为:

# 功能是查询阿里云的资源信息。目前有ecs实例查询与阿里云余额查询两个功能
AliyunInfoAssistant = Assistants.create(
    model="qwen-max",
    name='阿里云资源信息查询机器人',
    description='一个智能助手,根据用户的查询去调用工具并返回查询到的阿里云资源结果',
    instructions='你是一个智能助手,你有两个功能,分别是阿里云ecs实例信息查询和阿里云余额查询。请准确判断调用哪个工具,并礼貌地回答用户的问题。',
    # 定义Agent使用的工具,您可以根据您的业务场景在tools列表中定义一个或多个Agent可能会使用的工具。
    tools=[
        {
            'type': 'function',
            'function': {
                # 工具函数的名称,可通过下文代码中的function_mapper将name映射到函数本体
                'name': 'ecs实例信息查询',
                # 工具函数的描述
                'description': '当需要查询阿里云ecs实例信息时非常有用,比如实例id,实例规格,收费信息等',
                # 工具函数的入参
                'parameters': {
                    'type': 'object',
                    'properties': {
                        # 该工具需要用户输入地域信息
                        'RegionID': {
                            'type': 'str',
                            # 参数的描述信息
                            'description': '用户想要查询实例所属的地域id,如果是杭州,则为cn-hangzhou,如果是上海,则为cn-shanghai,如果是北京,则为cn-beijing'
                        },
                    },
                    'required': ['RegionID']},
            }
        },
        {
            'type': 'function',
            'function': {
                'name': '阿里云余额查询',
                # 工具函数的描述
                'description': '当需要查询阿里云账户信息时非常有用',
                # 工具函数的入参,余额查询无需入参,因此为空
                'parameters': {}
            }
        }
    ]
)

InstanceTypeDetailAssistant

InstanceTypeDetailAssistant接收实例规格列表信息,并通过call_agent_app函数将每一种实例规格对RAG应用进行查询,将每种实例规格返回的vCPU个数、内存大小等指标数据汇总作为输出。

# 功能是通过在百炼平台创建的RAG应用查询实例规格的详细信息
InstanceTypeDetailAssistant = Assistants.create(
    model="qwen-max",
    name='ecs实例规格介绍机器人',
    description='一个智能助手,可以通过用户提供的实例规格,调用已有的插件能力给用户介绍实例规格信息。',
    instructions='你是一个智能助手,你需要从输入中精确提取出实例规格信息,如[ecs.e-c1m1.large],或[ecs.u1-c1m4.xlarge,ecs.e-c1m1.large]。将实例规格列表输入工具中,获得它们的详细信息',
    tools=[
        {
            'type': 'function',
            'function': {
                'name': 'ecs实例规格介绍',
                'description': '返回客户查询指定ecs实例规格的信息',
                'parameters': {
                    'type': 'object',
                    'properties': {
                        'InstanceType': {
                            'type': 'list',
                            'InstanceType': '用户想要查询的实例规格,有可能是一个,有可能是多个,如:[ecs.e-c1m1.large],或[ecs.u1-c1m4.xlarge,ecs.e-c1m1.large]'
                        },
                    },
                    'required': ['InstanceType']},
            }
        }
    ]
)

SummaryAssistant

每一个Agent都会有输出信息,因此需要一个用于总结的Agent将前序的Agent输出信息进行总结,从而对用户的问题进行全面、完整的回答。代码详情如下:

# 在Multi Agent场景下,定义一个用于总结的Agent,该Agent会根据用户的问题与之前Agent输出的参考信息,全面、完整地回答用户问题
SummaryAssistant = Assistants.create(
    model="qwen-max",
    name='总结机器人',
    description='一个智能助手,根据用户的问题与参考信息,全面、完整地回答用户问题',
    instructions='你是一个智能助手,根据用户的问题与参考信息,全面、完整地回答用户问题'
)

定义字符串与函数、字符串与Agent本体的映射

由于大模型生成的是字符串形式的结果,因此我们在程序中需要对大模型生成的字符串进行解析、映射等操作,以达到和外界交互的功能。

# 将工具函数的name映射到函数本体
function_mapper = {
    "ecs实例信息查询": ECS.query_source,
    "ecs实例规格介绍":ECS.call_agent_app,
    "阿里云余额查询":Billing.get_balance
}
# 将Agent的name映射到Agent本体
assistant_mapper = {
    "ChatAssistant": ChatAssistant,
    "AliyunInfoAssistant":AliyunInfoAssistant,
    "InstanceTypeDetailAssistant":InstanceTypeDetailAssistant
}

定义消息传递函数

消息传递函数get_agent_response接收assistantmessage两个参数,用于获得指定Agent在接收到输入message时的输出信息。

# 输入message信息,输出为指定Agent的回复
def get_agent_response(assistant, message=''):
    # 打印出输入Agent的信息
    print(f"Query: {message}")
    thread = Threads.create()
    message = Messages.create(thread.id, content=message)
    run = Runs.create(thread.id, assistant_id=assistant.id)
    run_status = Runs.wait(run.id, thread_id=thread.id)
    # 如果响应失败,会打印出run failed
    if run_status.status == 'failed':
        print('run failed:')
    # 如果需要工具来辅助大模型输出,则进行以下流程
    if run_status.required_action:
        f = run_status.required_action.submit_tool_outputs.tool_calls[0].function
        # 获得function name
        func_name = f['name']
        # 获得function 的入参
        param = json.loads(f['arguments'])
        # 打印出工具信息
        print("function is",f)
        # 根据function name,通过function_mapper映射到函数,并将参数输入工具函数得到output输出
        if func_name in function_mapper:
            output = function_mapper[func_name](**param)
        else:    
            output = ""
        tool_outputs = [{
            'output':
                output
        }]
        run = Runs.submit_tool_outputs(run.id,
                                       thread_id=thread.id,
                                       tool_outputs=tool_outputs)
        run_status = Runs.wait(run.id, thread_id=thread.id)
    run_status = Runs.get(run.id, thread_id=thread.id)
    msgs = Messages.list(thread.id)
    # 将Agent的输出返回
    return msgs['data'][0]['content'][0]['text']['value']

定义Agent之间交互方式并获得回复

Agent之间的交互步骤根据PlannerAssistant进行编排,为了适配Gradio的前端界面展示,输入输出的参数需要与Gradio中的组件进行对齐。代码如下:

说明

使用yield关键字而不是return,可以迭代地生成和返回中间结果。这样,中间结果可以逐步传递给前端界面,实现实时显示,而不必等到所有结果生成后再显示。

# 获得Multi Agent的回复,输入与输出需要与Gradio前端展示界面中的参数对齐
def get_multi_agent_response(query,history):
    # 处理输入为空的情况
    if len(query) == 0:
        return "",history+[("","")],"",""
    # 获取Agent的运行顺序
    assistant_order = get_agent_response(PlannerAssistant,query)
    try:
        order_stk = ast.literal_eval(assistant_order)
        cur_query = query
        # 依次运行Agent
        for i in range(len(order_stk)):
            yield "----->".join(order_stk),history+[(query,"multi agent正在努力工作中...")],f"{order_stk[i]}正在处理信息...",""
            cur_assistant = assistant_mapper[order_stk[i]]
            response = get_agent_response(cur_assistant,cur_query)
            yield "----->".join(order_stk),history+[(query,"multi agent正在努力工作中...")],response,""
            # 如果当前Agent为最后一个Agent,则将其输出作为Multi Agent的输出
            if i == len(order_stk)-1:
                yield "----->".join(order_stk),history+[(query,response)],"assistant已处理完毕",""
            # 如果当前Agent不是最后一个Agent,则将上一个Agent的输出response添加到下一轮的query中,作为参考信息
            else:
                # 在参考信息前后加上特殊标识符,可以防止大模型混淆参考信息与提问
                cur_query = f"你可以参考已知的信息:\n{response}\n你要完整地回答用户的问题。问题是:{query}。"
    # 兜底策略,如果上述程序运行失败,则直接调用ChatAssistant
    except Exception as e:
        yield "ChatAssistant",[(query,get_agent_response(ChatAssistant,query))],"",""

输入参数为queryhistory。其中query为用户的提问(字符串形式),history为用户与Multi Agent的对话记录(列表形式)。

第一个输出参数为Agent的编排信息(字符串形式),第二个输出参数为用户与Multi Agent的对话历史(列表形式),第三个输出参数为当前运行Agent的状态(字符串形式),为了适配Gradio组件,第四个输出参数为用户的输入框(字符串形式,设为""以达到用户发起提问后将输入框清空的效果)。

前端展示界面

本教程使用gradio作为前端展示工具。gradio可以快速帮助机器学习工作者创建模型效果展示界面,代码详情如下:

# 前端界面展示
with gr.Blocks() as demo:
    # 在界面中央展示标题
    gr.HTML('<center><h1>欢迎使用阿里云资源查询bot</h1></center>')
    gr.HTML('<center><h3>支持的功能有指定区域的ecs实例查询、余额查询、实例规格详情查询。您可以在tools.py中添加您需要的工具,并在main.py中配置相关的agent</h3></center>')
    with gr.Row():
        with gr.Column(scale=10):
            chatbot = gr.Chatbot(value=[["hello","很高兴见到您!您想问关于阿里云资源的哪些问题呢?"]],height=600)
        with gr.Column(scale=4):
            text1 = gr.Textbox(label="assistant选择")
            text2 = gr.Textbox(label="当前assistant状态",lines=22)
    with gr.Row():
        msg = gr.Textbox(label="输入",placeholder="您想了解什么呢?")
    # 一些示例问题
    with gr.Row():
        examples = gr.Examples(examples=[
            '我的阿里云余额还有多少钱啊',
            '我在杭州有哪些ecs实例,把它的实例id,价钱以及实例规格详情告诉我',
            '我想了解ecs.u1-c1m4.xlarge和ecs.gn6i-c4g1.xlarge的指标'],inputs=[msg])
    clear = gr.ClearButton([text1,chatbot,text2,msg])
    msg.submit(get_multi_agent_response, [msg,chatbot], [text1,chatbot,text2,msg])

main.py整体代码

from dashscope import Assistants, Messages, Runs, Threads
import json
# 从tools.py导入工具函数
from tools import ECS,Billing
# 引入前端界面展示依赖
import gradio as gr
import ast

# 决策级别的agent,决定使用哪些agent,以及它们的运行顺序
PlannerAssistant = Assistants.create(
    # 因为该Agent作用比较重要,因此建议选择性能较强的大模型:qwen-max
    model="qwen-max",
    # 定义Agent的名称
    name='流程编排机器人',
    # 定义Agent的功能描述
    description='你是团队的leader,你的手下有很多assistant,你需要根据用户的输入,决定要以怎样的顺序去使用这些assistant',
    # 定义对Agent的指示语句,Agent会按照指示语句进行工具的调用并返回结果。
    instructions="""你的团队中有以下assistant。AliyunInfoAssistant:可以查询用户指定区域的阿里云ecs实例信息,或者查询用户的阿里云余额;InstanceTypeDetailAssistant:可以查询指定阿里云ecs实例规格的详细信息,比如cpu核数、内存大小等,可以一次查询多个实例规格信息,因此无需多次调用;
    ChatAssistant:如果用户的问题无需以上两个assistant,则调用该assistant。你需要根据用户的问题,判断要以什么顺序使用这些assistant,你的返回形式是一个列表,不能返回其它信息。比如:["AliyunInfoAssistant", "AliyunInfoAssistant","InstanceTypeDetailAssistant"]或者["ChatAssistant"],列表中的元素只能为上述的assistant"""
)

# 功能是回复日常问题。对于日常问题来说,可以使用价格较为低廉的模型作为agent的基座
ChatAssistant = Assistants.create(
    # 因为该Agent对大模型性能要求不高,因此使用成本较低的qwen-turbo模型
    model="qwen-turbo",
    name='回答日常问题的机器人',
    description='一个智能助手,解答用户的问题',
    instructions='请礼貌地回答用户的问题'
)

# 功能是查询阿里云的资源信息。目前有ECS实例查询与阿里云余额查询两个功能
AliyunInfoAssistant = Assistants.create(
    model="qwen-max",
    name='阿里云资源信息查询机器人',
    description='一个智能助手,根据用户的查询去调用工具并返回查询到的阿里云资源结果',
    instructions='你是一个智能助手,你有两个功能,分别是阿里云ecs实例信息查询和阿里云余额查询。请准确判断调用哪个工具,并礼貌地回答用户的问题。',
    # 定义Agent使用的工具,您可以根据您的业务场景在tools列表中定义一个或多个Agent可能会使用的工具。
    tools=[
        {
            'type': 'function',
            'function': {
                # 工具函数的名称,可通过下文代码中的function_mapper将name映射到函数本体
                'name': 'ecs实例信息查询',
                # 工具函数的描述
                'description': '当需要查询阿里云ecs实例信息时非常有用,比如实例id,实例规格,收费信息等',
                # 工具函数的入参
                'parameters': {
                    'type': 'object',
                    'properties': {
                        # 该工具需要用户输入地域信息
                        'RegionID': {
                            'type': 'str',
                            # 参数的描述信息
                            'description': '用户想要查询实例所属的地域id,如果是杭州,则为cn-hangzhou,如果是上海,则为cn-shanghai,如果是北京,则为cn-beijing'
                        },
                    },
                    'required': ['RegionID']},
            }
        },
        {
            'type': 'function',
            'function': {
                'name': '阿里云余额查询',
                # 工具函数的描述
                'description': '当需要查询阿里云账户信息时非常有用',
                # 工具函数的入参,余额查询无需入参,因此为空
                'parameters': {}
            }
        }
    ]
)

# 功能是通过在百炼平台创建的RAG应用查询实例规格的详细信息
InstanceTypeDetailAssistant = Assistants.create(
    model="qwen-max",
    name='ecs实例规格介绍机器人',
    description='一个智能助手,可以通过用户提供的输入,精确识别提到的实例规格。调用已有的插件能力给用户介绍实例规格信息。',
    instructions='你是一个智能助手,你需要从用户的输入中精确识别提取出阿里云的实例规格信息,如[ecs.e-c1m1.large],或[ecs.u1-c1m4.xlarge,ecs.e-c1m1.large]。将实例规格列表输入工具中,获得它们的详细信息',
    tools=[
        {
            'type': 'function',
            'function': {
                'name': 'ecs实例规格介绍',
                'description': '返回客户查询指定ecs实例规格的信息',
                'parameters': {
                    'type': 'object',
                    'properties': {
                        'InstanceType': {
                            'type': 'list',
                            'InstanceType': '用户想要查询的实例规格,有可能是一个,有可能是多个,如:[ecs.e-c1m1.large],或[ecs.u1-c1m4.xlarge,ecs.e-c1m1.large]'
                        },
                    },
                    'required': ['InstanceType']},
            }
        }
    ]
)

# 在Multi Agent场景下,定义一个用于总结的Agent,该Agent会根据用户的问题与之前Agent输出的参考信息,全面、完整地回答用户问题
SummaryAssistant = Assistants.create(
    model="qwen-max",
    name='总结机器人',
    description='一个智能助手,根据用户的问题与参考信息,全面、完整地回答用户问题',
    instructions='你是一个智能助手,根据用户的问题与参考信息,全面、完整地回答用户问题'
)

# 将工具函数的name映射到函数本体
function_mapper = {
    "ecs实例信息查询": ECS.query_source,
    "ecs实例规格介绍":ECS.call_agent_app,
    "阿里云余额查询":Billing.get_balance
}
# 将Agent的name映射到Agent本体
assistant_mapper = {
    "ChatAssistant": ChatAssistant,
    "AliyunInfoAssistant":AliyunInfoAssistant,
    "InstanceTypeDetailAssistant":InstanceTypeDetailAssistant
}

# 输入message信息,输出为指定Agent的回复
def get_agent_response(assistant, message=''):
    # 打印出输入Agent的信息
    print(f"Query: {message}")
    thread = Threads.create()
    message = Messages.create(thread.id, content=message)
    run = Runs.create(thread.id, assistant_id=assistant.id)
    run_status = Runs.wait(run.id, thread_id=thread.id)
    # 如果响应失败,会打印出run failed
    if run_status.status == 'failed':
        print('run failed:')
    # 如果需要工具来辅助大模型输出,则进行以下流程
    if run_status.required_action:
        f = run_status.required_action.submit_tool_outputs.tool_calls[0].function
        # 获得function name
        func_name = f['name']
        # 获得function 的入参
        param = json.loads(f['arguments'])
        # 打印出工具信息
        print("function is",f)
        # 根据function name,通过function_mapper映射到函数,并将参数输入工具函数得到output输出
        if func_name in function_mapper:
            output = function_mapper[func_name](**param)
        else:    
            output = ""
        tool_outputs = [{
            'output':
                output
        }]
        run = Runs.submit_tool_outputs(run.id,
                                       thread_id=thread.id,
                                       tool_outputs=tool_outputs)
        run_status = Runs.wait(run.id, thread_id=thread.id)
    run_status = Runs.get(run.id, thread_id=thread.id)
    msgs = Messages.list(thread.id)
    # 将Agent的输出返回
    return msgs['data'][0]['content'][0]['text']['value']

# 获得Multi Agent的回复,输入与输出需要与Gradio前端展示界面中的参数对齐
def get_multi_agent_response(query,history):
    # 处理输入为空的情况
    if len(query) == 0:
        return "",history+[("","")],"",""
    # 获取Agent的运行顺序
    assistant_order = get_agent_response(PlannerAssistant,query)
    try:
        order_stk = ast.literal_eval(assistant_order)
        cur_query = query
        Agent_Message = ""
        # 依次运行Agent
        for i in range(len(order_stk)):
            yield "----->".join(order_stk),history+[(query,"multi agent正在努力工作中...")],Agent_Message+'\n'+f"*{order_stk[i]}*正在处理中...",""
            cur_assistant = assistant_mapper[order_stk[i]]
            response = get_agent_response(cur_assistant,cur_query)
            Agent_Message += f"*{order_stk[i]}*的回复为:{response}\n\n"
            yield "----->".join(order_stk),history+[(query,"multi agent正在努力工作中...")],Agent_Message,""
            # 如果当前Agent为最后一个Agent,则将其输出作为Multi Agent的输出
            if i == len(order_stk)-1:
                prompt = f"请参考已知的信息:{Agent_Message},回答用户的问题:{query}。"
                multi_agent_response = get_agent_response(SummaryAssistant,prompt)
                yield "----->".join(order_stk),history+[(query,multi_agent_response)],Agent_Message,""
            # 如果当前Agent不是最后一个Agent,则将上一个Agent的输出response添加到下一轮的query中,作为参考信息
            else:
                # 在参考信息前后加上特殊标识符,可以防止大模型混淆参考信息与提问
                cur_query = f"你可以参考已知的信息:{response}你要完整地回答用户的问题。问题是:{query}。"
    # 兜底策略,如果上述程序运行失败,则直接调用ChatAssistant
    except Exception as e:
        yield "ChatAssistant",[(query,get_agent_response(ChatAssistant,query))],"",""


# 前端界面展示
with gr.Blocks() as demo:
    # 在界面中央展示标题
    gr.HTML('<center><h1>欢迎使用阿里云资源查询bot</h1></center>')
    gr.HTML('<center><h3>支持的功能有指定区域的ecs实例查询、余额查询、实例规格详情查询。您可以在tools.py中添加您需要的工具,并在main.py中配置相关的agent</h3></center>')
    with gr.Row():
        with gr.Column(scale=10):
            chatbot = gr.Chatbot(value=[["hello","很高兴见到您!您想问关于阿里云资源的哪些问题呢?"]],height=600)
        with gr.Column(scale=4):
            text1 = gr.Textbox(label="assistant选择")
            text2 = gr.Textbox(label="当前assistant状态",lines=22)
    with gr.Row():
        msg = gr.Textbox(label="输入",placeholder="您想了解什么呢?")
    # 一些示例问题
    with gr.Row():
        examples = gr.Examples(examples=[
            '我的阿里云余额还有多少钱啊',
            '我在杭州有哪些ecs实例,把它的实例id,价钱以及实例规格详情告诉我',
            '我想了解ecs.u1-c1m4.xlarge和ecs.gn6i-c4g1.xlarge的指标'],inputs=[msg])
    clear = gr.ClearButton([text1,chatbot,text2,msg])
    msg.submit(get_multi_agent_response, [msg,chatbot], [text1,chatbot,text2,msg])

if __name__ == '__main__':
    demo.launch()

运行效果

请您在配置ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRETDASHSCOPE_API_KEY到环境变量后,运行main.py文件。终端页面会有Running on local URL: 的输出,访问对应URL,进入交互界面。输入:我想知道我在杭州的ecs实例,还有我的阿里云余额,或者单击Examples中的示例问题,将其添加到输入框中,并单击Enter,等待结果的生成。您可以观察当前assistant状态框来查看Agent的实时状态。image

总结

通过本教程,您可以了解到将百炼RAG应用与阿里云的OpenAPI能力集成到Agent中的方式,以及使用百炼平台的Assistants API进行Multi Agent开发的步骤,并最终通过基于Gradio的前端界面展示出来。

您可以通过修改Agent中的提示词、细化Agent之间的交互方式、修改工具函数等方法,将Multi Agent应用到您的业务中。