DAS Agent交互最佳实践(集成Chat API)

更新时间:
复制为 MD 格式

Chat接口是DAS大模型能力的异步逻辑接口,支持知识问答、性能诊断、上下文对话等能力,以SSE(Server-Sent Events)数据流形式返回Agent的思考过程与最终回答。本文介绍如何通过Java、Python、Go SDK集成Chat API,包括SSE事件解析和多轮对话的完整示例。

前提条件

  • 已开通DAS Agent,并确认运维实例所在地域与DAS Agent所在国家或地区一致且已完成绑定。

  • 使用阿里云DAS SDK时,建议使用最新版本。

  • 调用DAS服务时,地域需指定为cn-shanghai,Endpointdas.cn-shanghai.aliyuncs.com

  • 已配置ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET环境变量,或使用阿里云默认凭据链。

说明

Chat接口为付费接口,按输入/输出字符计费。详情请参见DAS Agent计量规则

核心事件描述

SSE数据流中的事件遵循ag-ui协议,主要类型如下:

事件类型(Type)

关键字段

说明

RUN_STARTED

RunId

任务启动,标记本次Chat开始。

RUN_FINISHED

RunId

任务结束,后续不再产出事件。

TEXT_MESSAGE_START

MessageIdRole

文本消息开始。Role=user表示用户回显(可忽略),Role=assistant表示模型输出。

TEXT_MESSAGE_CONTENT

MessageIdDelta

消息正文增量片段,相同MessageId的多条事件按顺序拼接即可得到完整文本。

TEXT_MESSAGE_END

MessageId

该条文本消息结束。

ACTIVITY_DELTA

ActivityTypePatch

Agent心跳或状态事件,如waiting_for_agent_thinking,一般可忽略。

TOOL_CALL_START

ToolCallIdToolCallNameParentMessageId

Agent开始发起一次工具调用(如das_api)。

TOOL_CALL_ARGS

ToolCallIdDelta

工具参数按JSON文本流式下发,同一ToolCallIdDelta顺序拼接后即为完整参数。

TOOL_CALL_END

ToolCallId

工具参数下发完毕,即将执行。

TOOL_CALL_RESULT

ToolCallIdContentMessageId

工具执行完成,Content为返回结果文本。

典型时序示例

以"为实例rm-uf63bopu77b*******执行SQL限流"为例,说明完整的SSE事件时序。

1. 任务启动

服务端收到请求后下发RUN_STARTED,标记本轮会话开始,客户端可据此开启计时或初始化UI。

{"Type":"RUN_STARTED","RunId":"58abc22e-5742-4e9b-802e-5f060a0ca2e3"}

2. 用户输入回显(可忽略)

服务端会将本轮用户消息以Role=user的文本消息返回。客户端通常无需展示,按Role过滤即可。

{"Type":"TEXT_MESSAGE_START","Role":"user","MessageId":"20d2bc27-..."}
{"Type":"TEXT_MESSAGE_CONTENT","MessageId":"20d2bc27-...","Delta":"为实例 rm-uf63bopu77b*******执行SQL限流"}
{"Type":"TEXT_MESSAGE_END","MessageId":"20d2bc27-..."}

3. Agent心跳(可忽略)

模型思考阶段会穿插ACTIVITY_DELTA作为心跳事件,可直接跳过。

{"Type":"ACTIVITY_DELTA","ActivityType":"waiting_for_agent_thinking","Patch":[],"MessageId":""}

4. Agent分析输出(Role=assistant)

模型思考结果通过流式TEXT_MESSAGE_CONTENT.Delta下发,相同MessageIdDelta顺序拼接即为完整文本。

{"Type":"TEXT_MESSAGE_START","Role":"assistant","MessageId":"36aaafdb-..."}
{"Type":"TEXT_MESSAGE_CONTENT","MessageId":"36aaafdb-...","Delta":"我需要先查看该实例的SQL执行情况..."}
{"Type":"TEXT_MESSAGE_END","MessageId":"36aaafdb-..."}

5. Agent工具调用

Agent决定调用外部工具(如das_api)时,按TOOL_CALL_START → 多条TOOL_CALL_ARGSTOOL_CALL_ENDTOOL_CALL_RESULT的顺序下发事件。

调用开始

{"Type":"TOOL_CALL_START","ToolCallId":"call_0fd4d0...","ToolCallName":"das_api","ParentMessageId":"36aaafdb-..."}

参数流式下发

多条TOOL_CALL_ARGS.Delta需按同一ToolCallId顺序拼接,聚合后解析为完整JSON参数:

{
  "command": "execute",
  "api_name": "getdassqlloghotdata",
  "parameters": {
    "instance_id": "rm-uf63bopu77b*******",
    "start": "2026-03-05T15:54:16+08:00",
    "end": "2026-03-05T16:54:16+08:00",
    "max_records_per_page": 10,
    "include_fields": ["sql_text", "execution_count", "avg_consume"],
    "security_risk": "LOW"
  }
}

参数结束并返回执行结果

{"Type":"TOOL_CALL_END","ToolCallId":"call_0fd4d0..."}
{"Type":"TOOL_CALL_RESULT","ToolCallId":"call_0fd4d0...","Content":"API执行成功。响应:..."}

6. 任务结束

收到RUN_FINISHED表示本轮SSE流结束,客户端可停止计时、关闭连接。

{"Type":"RUN_FINISHED","RunId":"58abc22e-5742-4e9b-802e-5f060a0ca2e3"}

SDK示例

Java示例

Maven依赖

<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>alibabacloud-das20200116</artifactId>
    <version>2.0.0</version>
</dependency>

示例代码

import com.aliyun.auth.credentials.Credential;
import com.aliyun.auth.credentials.provider.StaticCredentialProvider;
import com.aliyun.sdk.gateway.pop.Configuration;
import com.aliyun.sdk.gateway.pop.auth.SignatureVersion;
import com.aliyun.sdk.service.das20200116.AsyncClient;
import com.aliyun.sdk.service.das20200116.models.ChatRequest;
import com.aliyun.sdk.service.das20200116.models.ChatResponseBody;
import darabonba.core.ResponseIterable;
import darabonba.core.client.ClientOverrideConfiguration;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class ChatSample {

    private static AsyncClient createClient() {
        StaticCredentialProvider provider = StaticCredentialProvider.create(Credential.builder()
                .accessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
                .accessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
                .build());

        return AsyncClient.builder()
                .region("cn-shanghai")
                .credentialsProvider(provider)
                .serviceConfiguration(Configuration.create().setSignatureVersion(SignatureVersion.V3))
                .overrideConfiguration(ClientOverrideConfiguration.create().setProtocol("HTTPS")
                        .setEndpointOverride("das.cn-shanghai.aliyuncs.com"))
                .build();
    }

    private static String buildMessage(String text) {
        String escaped = text.replace("\\", "\\\\").replace("\"", "\\\"");
        return String.format(
                "{\"id\":\"%s\",\"role\":\"user\",\"content\":[{\"type\":\"text\",\"text\":\"%s\"}]}",
                UUID.randomUUID(), escaped);
    }

    private static ChatRequest buildRequest(String text, String sessionId, String agentId, String summary) {
        ChatRequest.Builder builder = ChatRequest.builder().message(buildMessage(text));
        if (sessionId != null && !sessionId.isEmpty()) {
            builder.sessionId(sessionId);
        }
        if (agentId != null && !agentId.isEmpty()) {
            builder.agentId(agentId);
        }
        if (summary != null && !summary.isEmpty()) {
            builder.summary(summary);
        }
        return builder.build();
    }

    private static void run(String text, String sessionId, String agentId, String summary) throws Exception {
        AsyncClient client = createClient();
        ChatRequest request = buildRequest(text, sessionId, agentId, summary);
        ResponseIterable<ChatResponseBody> iterable = client.chatWithResponseIterable(request);
        for (ChatResponseBody event : iterable) {
            String delta = event.getDelta();
            String content = event.getContent();
            String activity = event.getActivityType();
            String extName = event.getName();
            Object extValue = event.getValue();
            if (delta != null && !delta.isEmpty()) {
                System.out.print(delta);
            } else if (content != null && !content.isEmpty()) {
                System.out.println();
                System.out.println("[Content] " + content);
            } else if (activity != null && !activity.isEmpty()) {
                System.out.println();
                System.out.println("[Activity] " + activity);
                System.out.println();
            }
            if ("summary".equals(extName) && extValue != null) {
                System.out.println();
                System.out.println("[Summary] " + extValue);
            }
        }
        System.out.println();
        client.close();
    }

    private static class Args {
        String query = "Please introduce DAS Agent in about 1000 words.";
        String sessionId;
        String agentId;
        String summary;
    }

    private static Args parseArgs(String[] argv) {
        Args args = new Args();
        List<String> positional = new ArrayList<>();
        for (int i = 0; i < argv.length; i++) {
            switch (argv[i]) {
                case "--session-id":
                    args.sessionId = argv[++i];
                    break;
                case "--agent-id":
                    args.agentId = argv[++i];
                    break;
                case "--summary":
                    args.summary = argv[++i];
                    break;
                default:
                    positional.add(argv[i]);
                    break;
            }
        }
        if (!positional.isEmpty()) {
            args.query = String.join(" ", positional);
        }
        return args;
    }

    public static void main(String[] argv) throws Exception {
        Args args = parseArgs(argv);
        run(args.query, args.sessionId, args.agentId, args.summary);
    }
}

运行命令

mvn -q exec:java -Dexec.mainClass=ChatSample -Dexec.args="'麻烦介绍下 DAS Agent,1000 字左右'"

Python示例

安装SDK

pip3 install alibabacloud_das20200116==3.0.0

示例代码

import argparse
import json
import sys
import uuid

from alibabacloud_credentials.client import Client as CredentialClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_openapi.client import Client as OpenApiClient
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class ChatSample:

    @staticmethod
    def create_client() -> OpenApiClient:
        credential = CredentialClient()
        config = open_api_models.Config(credential=credential)
        config.endpoint = 'das.cn-shanghai.aliyuncs.com'
        return OpenApiClient(config)

    @staticmethod
    def create_api_info() -> open_api_models.Params:
        return open_api_models.Params(
            action='Chat',
            version='2020-01-16',
            protocol='HTTPS',
            pathname='/chat',
            method='POST',
            auth_type='AK',
            style='RPC',
            req_body_type='json',
            body_type='sse',
        )

    @staticmethod
    def build_message(text: str) -> str:
        message = {
            'id': str(uuid.uuid4()),
            'role': 'user',
            'content': [{'type': 'text', 'text': text}],
        }
        return json.dumps(message, ensure_ascii=False)

    @staticmethod
    def build_request(
        text: str,
        session_id: str = None,
        agent_id: str = None,
        summary: str = None,
    ) -> open_api_models.OpenApiRequest:
        query = {'Message': ChatSample.build_message(text)}
        if not UtilClient.is_unset(session_id):
            query['SessionId'] = session_id
        if not UtilClient.is_unset(agent_id):
            query['AgentId'] = agent_id
        if not UtilClient.is_unset(summary):
            query['Summary'] = summary
        return open_api_models.OpenApiRequest(query=query, headers={})

    @staticmethod
    def parse_event_data(event) -> dict:
        if not hasattr(event, 'data'):
            return {}
        data = event.data
        if isinstance(data, str):
            try:
                data = json.loads(data)
            except json.JSONDecodeError:
                return {}
        return data if isinstance(data, dict) else {}

    @staticmethod
    def run(
        text: str,
        session_id: str = None,
        agent_id: str = None,
        summary: str = None,
    ) -> None:
        client = ChatSample.create_client()
        params = ChatSample.create_api_info()
        runtime = util_models.RuntimeOptions()
        request = ChatSample.build_request(text, session_id, agent_id, summary)

        response = client.call_sseapi(params, request, runtime)

        full_content = []
        for res in response:
            data = ChatSample.parse_event_data(res.event)
            if not data:
                continue

            activity = data.get('ActivityType')
            delta = data.get('Delta')
            content = data.get('Content')
            ext_name = data.get('Name')
            ext_value = data.get('Value')

            if delta:
                print(delta, end='', flush=True)
                full_content.append(delta)
            elif content and not delta:
                print(f"\n[Content] {content}", flush=True)
            elif activity:
                print(f"\n[Activity] {activity}", file=sys.stderr, flush=True)

            if ext_name == 'summary' and ext_value:
                print(f"\n[Summary] {ext_value}", flush=True)

        if full_content:
            print()


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description='DAS Chat API sample')
    parser.add_argument(
        'query',
        nargs='?',
        default='Please introduce DAS Agent in about 1000 words.',
        help='user query',
    )
    parser.add_argument('--session-id', dest='session_id', help='session id (UUID)')
    parser.add_argument('--agent-id', dest='agent_id', help='agent id')
    parser.add_argument('--summary', choices=['true', 'false'], help='whether to output summary')
    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()
    ChatSample.run(
        text=args.query,
        session_id=args.session_id,
        agent_id=args.agent_id,
        summary=args.summary,
    )

运行命令

python3 chat_sample.py "麻烦介绍下 DAS Agent,1000 字左右"
python3 chat_sample.py "继续介绍下慢 SQL 分析" --session-id <UUID> --agent-id <AgentId> --summary true

Go示例

安装SDK

go mod init das_agent_chat_demo
go get github.com/alibabacloud-go/darabonba-openapi/v2/client
go get github.com/alibabacloud-go/tea-utils/v2/service
go get github.com/alibabacloud-go/tea/tea
go get github.com/google/uuid

示例代码

package main

import (
	"encoding/json"
	"flag"
	"fmt"
	"os"
	"strings"

	openapi "github.com/alibabacloud-go/darabonba-openapi/v2/client"
	openapiutil "github.com/alibabacloud-go/darabonba-openapi/v2/utils"
	"github.com/alibabacloud-go/tea/dara"
	"github.com/alibabacloud-go/tea/tea"
	"github.com/google/uuid"
)

func createClient() (*openapi.Client, error) {
	config := &openapi.Config{
		AccessKeyId:     tea.String(os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")),
		AccessKeySecret: tea.String(os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")),
		Endpoint:        tea.String("das.cn-shanghai.aliyuncs.com"),
	}
	return openapi.NewClient(config)
}

func createAPIInfo() *openapi.Params {
	return &openapi.Params{
		Action:      tea.String("Chat"),
		Version:     tea.String("2020-01-16"),
		Protocol:    tea.String("HTTPS"),
		Pathname:    tea.String("/chat"),
		Method:      tea.String("POST"),
		AuthType:    tea.String("AK"),
		Style:       tea.String("RPC"),
		ReqBodyType: tea.String("json"),
		BodyType:    tea.String("sse"),
	}
}

func buildMessage(text string) (string, error) {
	payload := map[string]interface{}{
		"id":   uuid.NewString(),
		"role": "user",
		"content": []map[string]string{
			{"type": "text", "text": text},
		},
	}
	bs, err := json.Marshal(payload)
	if err != nil {
		return "", err
	}
	return string(bs), nil
}

func buildRequest(text, sessionID, agentID, summary string) (*openapi.OpenApiRequest, error) {
	msg, err := buildMessage(text)
	if err != nil {
		return nil, err
	}
	query := map[string]interface{}{
		"Message": msg,
	}
	if sessionID != "" {
		query["SessionId"] = sessionID
	}
	if agentID != "" {
		query["AgentId"] = agentID
	}
	if summary != "" {
		query["Summary"] = summary
	}
	return &openapi.OpenApiRequest{
		Query:   openapiutil.Query(query),
		Headers: map[string]*string{},
	}, nil
}

func parseEventData(raw *string) map[string]interface{} {
	if raw == nil || *raw == "" {
		return nil
	}
	var data map[string]interface{}
	if err := json.Unmarshal([]byte(*raw), &data); err != nil {
		return nil
	}
	return data
}

func run(text, sessionID, agentID, summary string) error {
	client, err := createClient()
	if err != nil {
		return err
	}
	params := createAPIInfo()
	request, err := buildRequest(text, sessionID, agentID, summary)
	if err != nil {
		return err
	}
	runtime := &dara.RuntimeOptions{}

	sseChan := make(chan *openapi.SSEResponse, 100)
	errChan := make(chan error, 1)
	go client.CallSSEApi(params, request, runtime, sseChan, errChan)

	for event := range sseChan {
		if event.Event == nil {
			continue
		}
		data := parseEventData(event.Event.Data)
		if data == nil {
			continue
		}

		delta, _ := data["Delta"].(string)
		content, _ := data["Content"].(string)
		activity, _ := data["ActivityType"].(string)
		extName, _ := data["Name"].(string)
		extValue := data["Value"]

		switch {
		case delta != "":
			fmt.Print(delta)
		case content != "":
			fmt.Println()
			fmt.Println("[Content]", content)
		case activity != "":
			fmt.Fprintln(os.Stderr, "[Activity]", activity)
		}

		if extName == "summary" && extValue != nil {
			fmt.Println()
			fmt.Println("[Summary]", extValue)
		}
	}
	fmt.Println()
	if err := <-errChan; err != nil {
		return err
	}
	return nil
}

func main() {
	fs := flag.NewFlagSet("chat_sample", flag.ExitOnError)
	sessionID := fs.String("session-id", "", "session id (UUID)")
	agentID := fs.String("agent-id", "", "agent id")
	summary := fs.String("summary", "", "whether to output summary (true/false)")
	fs.Usage = func() {
		fmt.Fprintf(os.Stderr, "Usage: %s [query] [--session-id ID] [--agent-id ID] [--summary true|false]\n", os.Args[0])
		fs.PrintDefaults()
	}
	if err := fs.Parse(os.Args[1:]); err != nil {
		os.Exit(2)
	}

	query := "Please introduce DAS Agent in about 1000 words."
	if fs.NArg() > 0 {
		query = strings.Join(fs.Args(), " ")
	}

	if err := run(query, *sessionID, *agentID, *summary); err != nil {
		fmt.Fprintln(os.Stderr, "request failed:", err)
		os.Exit(1)
	}
}

运行命令

go run chat_sample.go "麻烦介绍下 DAS Agent,1000 字左右"
go run chat_sample.go "继续介绍下慢 SQL 分析" --session-id <UUID> --agent-id <AgentId> --summary true

注意事项

  • 多轮对话务必传入同一个SessionId,否则模型无法记忆上下文。

  • SSE事件流中会穿插心跳事件(ACTIVITY_DELTA),客户端处理时可直接跳过此类事件。

  • Chat接口按输入/输出字符计费,调试阶段建议先发起简单测试以避免误计费。