import requests
import json
import time

# Configuration
BASE_URL_CONTROL = "https://agentrun.{regionid}.aliyuncs.com"
BASE_URL_DATA = "https://{accountid}.agentrun-data.{regionid}.aliyuncs.com"

headers = {
    "Content-Type": "application/json",
    "Authorization": "Bearer YOUR_TOKEN"
}
# 1. Create a code interpreter
def create_interpreter():
    payload = {
        "codeInterpreterName": "data-analysis-interpreter",
        "description": "Code interpreter for data analysis",
        "networkConfiguration": {
            "networkMode": "PUBLIC"
        },
        "cpu": 4,
        "memory": 8192,
        "sessionIdleTimeoutSeconds": 3600
    }
    response = requests.post(
        f"{BASE_URL_CONTROL}/agents/code-interpreters",
        headers=headers,
        data=json.dumps(payload)
    )
    result = response.json()
    interpreter_id = result["data"]["codeInterpreterId"]

    # Poll until creation completes
    while True:
        status_response = requests.get(
            f"{BASE_URL_CONTROL}/agents/code-interpreters/{interpreter_id}",
            headers=headers
        )
        status = status_response.json()["data"]["status"]
        if status == "READY":
            break
        elif status in ["CREATE_FAILED", "DELETE_FAILED"]:
            raise Exception(f"Creation failed: {status}")
        time.sleep(10)

    return interpreter_id
# 2. Start a session
def start_session(interpreter_id):
    payload = {
        "sessionIdleTimeoutSeconds": 3600
    }
    response = requests.post(
        f"{BASE_URL_CONTROL}/agents/code-interpreters/{interpreter_id}/sessions",
        headers=headers,
        data=json.dumps(payload)
    )
    result = response.json()
    return result["data"]["sessionId"]
# 3. Create an execution context
def create_context(interpreter_id, session_id):
    headers_with_session = headers.copy()
    headers_with_session["X-AgentRun-Session-ID"] = session_id
    payload = {
        "name": "data-analysis",
        "config": {}
    }
    response = requests.post(
        f"{BASE_URL_DATA}/agents/code-interpreters/{interpreter_id}/contexts",
        headers=headers_with_session,
        data=json.dumps(payload)
    )
    result = response.json()
    return result["data"]["id"]
# 4. Upload a data file
def upload_data_file(interpreter_id, session_id, file_path):
    # Do not set Content-Type here; requests generates the multipart boundary itself
    headers_with_session = {
        "X-AgentRun-Session-ID": session_id,
        "Authorization": headers["Authorization"]
    }
    with open(file_path, 'rb') as f:
        files = {'file': f}
        response = requests.post(
            f"{BASE_URL_DATA}/agents/code-interpreters/{interpreter_id}/files",
            headers=headers_with_session,
            files=files,
            params={'path': '/workspace/data.csv'}
        )
    return response.json()
# 5. Execute data analysis code
def execute_analysis(interpreter_id, session_id, context_id):
    headers_with_session = headers.copy()
    headers_with_session["X-AgentRun-Session-ID"] = session_id

    analysis_code = """
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

# Load the data
df = pd.read_csv('/workspace/data.csv')
print(f"Data shape: {df.shape}")
print(f"Columns: {df.columns.tolist()}")

# Summary statistics
print("\\nSummary statistics:")
print(df.describe())

# Check for missing values
print("\\nMissing values:")
print(df.isnull().sum())

# Visualization
if len(df.columns) >= 2:
    plt.figure(figsize=(10, 6))
    # If there are at least two numeric columns, create a scatter plot
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) >= 2:
        plt.scatter(df[numeric_cols[0]], df[numeric_cols[1]])
        plt.xlabel(numeric_cols[0])
        plt.ylabel(numeric_cols[1])
        plt.title('Data scatter plot')
        plt.savefig('/workspace/analysis_plot.png', dpi=150, bbox_inches='tight')
        print("\\nScatter plot saved: /workspace/analysis_plot.png")
    plt.close()

# Save the processed data
if 'processed_data' not in locals():
    processed_data = df.copy()
# Example cleaning step: drop rows with missing values
processed_data = processed_data.dropna()
processed_data.to_csv('/workspace/processed_data.csv', index=False)
print("\\nProcessed data saved: /workspace/processed_data.csv")
print(f"Processed data shape: {processed_data.shape}")
"""
    payload = {
        "code": analysis_code,
        "timeout": 120
    }
    response = requests.post(
        f"{BASE_URL_DATA}/agents/code-interpreters/{interpreter_id}/contexts/{context_id}/execute",
        headers=headers_with_session,
        data=json.dumps(payload)
    )
    result = response.json()
    print("Code execution result:")
    print(result["data"])
    return result
# 6. Download a result file
def download_result(interpreter_id, session_id, file_path, local_path):
    headers_with_session = {
        "X-AgentRun-Session-ID": session_id,
        "Authorization": headers["Authorization"]
    }
    response = requests.get(
        f"{BASE_URL_DATA}/agents/code-interpreters/{interpreter_id}/files",
        headers=headers_with_session,
        params={'path': file_path}
    )
    if response.status_code == 200:
        with open(local_path, 'wb') as f:
            f.write(response.content)
        print(f"File downloaded to: {local_path}")
    else:
        print(f"Download failed: {response.status_code}")
# 7. Clean up resources
def cleanup(interpreter_id, session_id, context_id=None):
    headers_with_session = headers.copy()
    headers_with_session["X-AgentRun-Session-ID"] = session_id

    # Delete the context (only if one was created)
    if context_id:
        requests.delete(
            f"{BASE_URL_DATA}/agents/code-interpreters/{interpreter_id}/contexts/{context_id}",
            headers=headers_with_session
        )

    # Stop the session
    requests.delete(
        f"{BASE_URL_CONTROL}/agents/code-interpreters/{interpreter_id}/sessions/{session_id}",
        headers=headers
    )

    # Delete the code interpreter (optional)
    # requests.delete(
    #     f"{BASE_URL_CONTROL}/agents/code-interpreters/{interpreter_id}",
    #     headers=headers
    # )
# Main workflow
def main():
    interpreter_id = session_id = context_id = None
    try:
        # 1. Create a code interpreter
        print("Creating code interpreter...")
        interpreter_id = create_interpreter()
        print(f"Code interpreter created: {interpreter_id}")

        # 2. Start a session
        print("Starting session...")
        session_id = start_session(interpreter_id)
        print(f"Session started: {session_id}")

        # 3. Create an execution context
        print("Creating execution context...")
        context_id = create_context(interpreter_id, session_id)
        print(f"Context created: {context_id}")

        # 4. Upload the data file
        print("Uploading data file...")
        upload_data_file(interpreter_id, session_id, "local_data.csv")
        print("File uploaded")

        # 5. Run the data analysis
        print("Running data analysis...")
        execute_analysis(interpreter_id, session_id, context_id)

        # 6. Download the result files
        print("Downloading result files...")
        download_result(interpreter_id, session_id, "/workspace/processed_data.csv", "processed_data.csv")
        download_result(interpreter_id, session_id, "/workspace/analysis_plot.png", "analysis_plot.png")

        print("Data analysis workflow completed!")
    except Exception as e:
        print(f"Workflow failed: {str(e)}")
    finally:
        # 7. Clean up resources (context_id may still be None if an earlier step failed)
        if interpreter_id and session_id:
            print("Cleaning up resources...")
            cleanup(interpreter_id, session_id, context_id)
            print("Resource cleanup finished")

if __name__ == "__main__":
    main()
The example above demonstrates:

- Full lifecycle management of a code interpreter
- Uploading and downloading data files
- Executing a non-trivial data analysis script
- Handling result files
- Appropriate error handling and resource cleanup (see the sketch below for a stricter variant)
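Note that the example parses every response with .json() without first checking the HTTP status code, so a 4xx/5xx error would surface as a confusing KeyError. Below is a minimal sketch of a stricter request helper; call_api is a hypothetical wrapper name (not part of the AgentRun API) and relies only on the standard requests library and the headers dict defined above.

import requests

# Minimal sketch of a stricter request helper.
# Assumption: reuses the `headers` dict from the example; `call_api` is a
# hypothetical helper, not an AgentRun API.
def call_api(method, url, *, headers, timeout=30, **kwargs):
    """Send one HTTP request, fail fast on HTTP errors, and return the parsed JSON body."""
    response = requests.request(method, url, headers=headers, timeout=timeout, **kwargs)
    response.raise_for_status()  # raises requests.HTTPError on 4xx/5xx responses
    return response.json()

# Example usage, mirroring start_session() above:
# result = call_api(
#     "POST",
#     f"{BASE_URL_CONTROL}/agents/code-interpreters/{interpreter_id}/sessions",
#     headers=headers,
#     json={"sessionIdleTimeoutSeconds": 3600},
# )
# session_id = result["data"]["sessionId"]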