Das Agent 数据流接口,可以进行知识问答,性能诊断等功能。注意,该接口是付费接口,会按照输出字符进行计费,具体可参考 DAS Agent计费说明。
JAVA示例
import com.aliyun.auth.credentials.Credential;
import com.aliyun.auth.credentials.provider.StaticCredentialProvider;
import com.aliyun.sdk.gateway.pop.Configuration;
import com.aliyun.sdk.gateway.pop.auth.SignatureVersion;
import com.aliyun.sdk.service.das20200116.models.GetDasAgentSSERequest;
import com.aliyun.sdk.service.das20200116.models.GetDasAgentSSEResponseBody;
import com.aliyun.sdk.service.das20200116.models.*;
import com.aliyun.sdk.service.das20200116.*;
import darabonba.core.ResponseIterable;
import darabonba.core.client.ClientOverrideConfiguration;
import java.util.*;
import java.io.*;
public class GetDasAgentSSESample {
public static void main(String[] args) throws Exception {
StaticCredentialProvider provider = StaticCredentialProvider.create(Credential.builder()
.accessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
.accessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
.build());
AsyncClient client = AsyncClient.builder()
.region("cn-shanghai")
.credentialsProvider(provider)
.serviceConfiguration(Configuration.create().setSignatureVersion(SignatureVersion.V3))
.overrideConfiguration(ClientOverrideConfiguration.create().setProtocol("HTTPS")
.setEndpointOverride("das.cn-shanghai.aliyuncs.com"))
.build();
GetDasAgentSSERequest request = GetDasAgentSSERequest.builder()
.query("麻烦介绍下DAS Agent,相关文字在1000字左右").build();
ResponseIterable<GetDasAgentSSEResponseBody> x = client.getDasAgentSSEWithResponseIterable(request);
for (GetDasAgentSSEResponseBody event : x) {
System.out.printf(event.getAnswer());
}
client.close();
}
}Python示例
# -*- coding: utf-8 -*-
import os
import sys
import json
import asyncio
from typing import List
from alibabacloud_das20200116.client import Client as DAS20200116Client
from alibabacloud_credentials.client import Client as CredentialClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_das20200116 import models as das20200116_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class GetDasAgentSseSample:
def __init__(self):
pass
@staticmethod
def create_client() -> DAS20200116Client:
"""
使用凭据初始化账号Client
@return: Client
@throws Exception
"""
credential = CredentialClient()
config = open_api_models.Config(
credential=credential
)
config.endpoint = f'das.cn-shanghai.aliyuncs.com'
return DAS20200116Client(config)
@staticmethod
def create_api_info() -> open_api_models.Params:
params = open_api_models.Params(
action='GetDasAgentSSE',
version='2020-01-16',
protocol='HTTPS',
pathname='/',
method='POST',
auth_type='AK',
style='RPC',
req_body_type='json',
body_type='sse'
)
return params
@staticmethod
def build_open_api_request(request_obj) -> open_api_models.OpenApiRequest:
query_dict = {}
for attr_name in dir(request_obj):
if attr_name.startswith('_'):
continue
if callable(getattr(request_obj, attr_name, None)):
continue
try:
value = getattr(request_obj, attr_name)
except AttributeError:
continue
if UtilClient.is_unset(value):
continue
api_key = ''.join(word.capitalize() for word in attr_name.split('_'))
query_dict[api_key] = value
return open_api_models.OpenApiRequest(
query=query_dict,
headers={}
)
@staticmethod
def run() -> None:
client = GetDasAgentSseSample.create_client()
params = GetDasAgentSseSample.create_api_info()
runtime = util_models.RuntimeOptions()
# 创建请求对象(可以通过命令行参数传递 query)
get_das_agent_sserequest = das20200116_models.GetDasAgentSSERequest(
query='麻烦介绍下DAS Agent,相关文字在1000字左右'
)
# 转换为 OpenApiRequest 格式
open_api_request = GetDasAgentSseSample.build_open_api_request(get_das_agent_sserequest)
response = client.call_sseapi(params, open_api_request, runtime)
for res in response:
answer = None
if hasattr(res.event, 'data'):
# event.data 可能是字符串或字典
event_data = res.event.data
if isinstance(event_data, str):
try:
event_data = json.loads(event_data)
except json.JSONDecodeError:
pass
if isinstance(event_data, dict):
answer = event_data.get('Answer')
if answer:
print(answer, end='', flush=True)
if __name__ == '__main__':
GetDasAgentSseSample.run()该文章对您有帮助吗?