在您使用MCP官方源码开发符合OAuth 2.0规范的授权流程,并准备在Agent中集成OpenAPI MCP Server时,本文将为您提供全面且详尽的指导。
自定义OAuth授权流程
示例代码基于MCP官方源码实现了OAuth授权流程。如需修改OAuth授权流程,请调整 handle_redirect 和 handle_callback 中的相关代码。
重要
在生产环境中,应根据实际情况妥善存储OAuth Token。本文以InMemoryTokenStorage为例进行演示:Token仅保存在进程内存中,进程退出后即丢失,下次运行需要重新授权。
# oauth_handler.py
import asyncio
import webbrowser
from http.server import BaseHTTPRequestHandler, HTTPServer
import threading
from mcp.client.auth import OAuthClientProvider, TokenStorage
from mcp.shared.auth import OAuthToken, OAuthClientInformationFull
from urllib.parse import parse_qs, urlparse
class InMemoryTokenStorage(TokenStorage):
    """Demo token storage that keeps everything in process memory.

    Nothing is persisted: the stored tokens and client registration exist
    only as attributes of this object.
    """

    def __init__(self):
        # Both slots start empty and are filled through the setters below.
        self.client_info: OAuthClientInformationFull | None = None
        self.tokens: OAuthToken | None = None

    async def get_tokens(self) -> OAuthToken | None:
        """Return the tokens last passed to ``set_tokens`` (``None`` if never set)."""
        return self.tokens

    async def set_tokens(self, tokens: OAuthToken) -> None:
        """Remember ``tokens``, replacing any previously stored value."""
        self.tokens = tokens

    async def get_client_info(self) -> OAuthClientInformationFull | None:
        """Return the client information last stored (``None`` if never set)."""
        return self.client_info

    async def set_client_info(self, client_info: OAuthClientInformationFull) -> None:
        """Remember ``client_info``, replacing any previously stored value."""
        self.client_info = client_info
class CallbackHandler(BaseHTTPRequestHandler):
    """HTTP handler for the OAuth redirect received by the local callback server.

    The outcome of a callback request is written onto the ``callback_server``
    object passed to the constructor: ``auth_code`` / ``auth_state`` on
    success, ``auth_error`` on failure, and ``auth_received = True`` in both
    cases.
    """

    def __init__(self, callback_server, *args, **kwargs):
        # Assign before delegating: the base-class constructor already
        # processes the request, so do_GET needs this attribute by then.
        self.callback_server = callback_server
        super().__init__(*args, **kwargs)

    def _send_html(self, status, page):
        """Send ``page`` as a UTF-8 HTML response with HTTP status ``status``."""
        self.send_response(status)
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.end_headers()
        self.wfile.write(page.encode('utf-8'))

    def _send_not_found(self):
        """Answer a request that is not an OAuth callback with a 404 page."""
        not_found_html = """<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>未找到</title>
</head>
<body>
<h1>未找到</h1>
<p>该请求不是有效的OAuth回调。</p>
</body>
</html>
"""
        try:
            self._send_html(404, not_found_html)
        except OSError:
            # Best effort only: a client that hung up on an unrelated request
            # (e.g. a favicon probe) must not abort the pending authorization.
            pass

    def do_GET(self):
        """Handle the GET request the authorization server redirects to.

        * ``code`` present: store the code and optional ``state``, reply 200.
        * ``error`` present: store ``"<error>: <description>"``, reply 400.
        * neither present (e.g. ``/favicon.ico``): reply 404 and leave the
          callback state untouched.
        * any exception while handling: store its text as the error and
          reply 500.
        """
        # Local import so this class does not depend on a new file-level import.
        from html import escape

        try:
            # Parse the query parameters of the callback URL.
            params = parse_qs(urlparse(self.path).query)
            if 'code' in params:
                # Publish the result; the received flag is set last.
                self.callback_server.auth_code = params['code'][0]
                self.callback_server.auth_state = params.get('state', [None])[0]
                self.callback_server.auth_received = True
                success_html = """<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>授权成功</title>
</head>
<body>
<h1>授权成功</h1>
<p>您已完成授权,可以返回应用继续使用。</p>
<p>窗口将在 <span id="countdown">3</span> 秒后自动关闭。</p>
<button onclick="window.close()">立即关闭</button>
<script>
let count = 3;
const el = document.getElementById('countdown');
const timer = setInterval(() => {
count--;
el.textContent = count;
if (count <= 0) {
clearInterval(timer);
window.close();
}
}, 1000);
</script>
</body>
</html>
"""
                self._send_html(200, success_html)
            elif 'error' in params:
                error = params['error'][0]
                error_description = params.get('error_description', ['Unknown error'])[0]
                self.callback_server.auth_error = f"{error}: {error_description}"
                self.callback_server.auth_received = True
                # Both values come straight from the request URL, so they are
                # HTML-escaped before being echoed into the page.
                error_html = f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>授权失败</title>
</head>
<body>
<h1>授权失败</h1>
<p>在授权过程中发生错误。</p>
<p><strong>错误代码:</strong>{escape(error)}</p>
<p><strong>错误描述:</strong>{escape(error_description)}</p>
<button onclick="window.close()">关闭窗口</button>
</body>
</html>
"""
                self._send_html(400, error_html)
            else:
                # Without this branch the browser would get no response at all.
                self._send_not_found()
        except Exception as e:
            self.callback_server.auth_error = str(e)
            self.callback_server.auth_received = True
            # The exception text may contain request data; escape it as well.
            internal_error_html = f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>服务器错误</title>
</head>
<body>
<h1>服务器内部错误</h1>
<p>抱歉,服务器遇到了一个内部错误,无法完成您的请求。</p>
<pre>{escape(str(e))}</pre>
<button onclick="window.close()">关闭窗口</button>
</body>
</html>
"""
            self._send_html(500, internal_error_html)

    def log_message(self, format, *args):
        """Suppress the default per-request log output."""
        pass
class CallbackServer:
    """Local HTTP server that receives the OAuth callback.

    The request handler records the outcome on this object (``auth_code``,
    ``auth_state``, ``auth_error``) and sets ``auth_received``;
    ``wait_for_callback`` polls that flag.
    """

    def __init__(self, port=3000):
        self.port = port
        self.server = None  # HTTPServer instance while running, else None
        self.thread = None  # thread running serve_forever, else None
        self.auth_code = None
        self.auth_state = None
        self.auth_error = None
        self.auth_received = False

    def start(self):
        """Start serving on ``localhost:<port>`` in a daemon thread.

        Calling this while the server is already running does nothing, so a
        repeated call cannot attempt a second bind on the same port.
        """
        if self.server is not None:
            return

        def handler(*args, **kwargs):
            # Hand every handler instance a reference back to this server.
            return CallbackHandler(self, *args, **kwargs)

        self.server = HTTPServer(('localhost', self.port), handler)
        self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
        self.thread.start()
        print(f"OAuth 回调服务器已启动,监听端口 {self.port}")

    def stop(self):
        """Stop the server and forget it, so ``start`` can be called again."""
        if self.server:
            self.server.shutdown()
            self.server.server_close()
            self.server = None
        if self.thread:
            self.thread.join(timeout=1)
            self.thread = None
        print("OAuth 回调服务器已停止")

    async def wait_for_callback(self, timeout=300):
        """Wait until a callback has been recorded and return its result.

        Args:
            timeout: Maximum number of seconds to wait.

        Returns:
            The tuple ``(auth_code, auth_state)``.

        Raises:
            TimeoutError: No callback was recorded within ``timeout`` seconds.
            Exception: The callback recorded an error (``auth_error`` is set).
        """
        # This is a coroutine, so a running loop is guaranteed to exist.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while not self.auth_received:
            if loop.time() > deadline:
                raise TimeoutError("等待OAuth回调超时")
            # Poll: the flag is set by the handler on the server thread.
            await asyncio.sleep(0.1)
        if self.auth_error:
            raise Exception(f"OAuth授权失败: {self.auth_error}")
        return self.auth_code, self.auth_state
# Module-level callback server instance used by handle_redirect and
# handle_callback; it stays None until handle_redirect creates it.
_callback_server = None
async def handle_redirect(auth_url: str) -> None:
    """Open the browser on the OAuth authorization URL.

    The local callback server is started first (on port 3000) if it is not
    running yet; on later calls the existing server is reused.

    Args:
        auth_url: Authorization URL to open in the browser.
    """
    global _callback_server
    if _callback_server is None:
        server = CallbackServer(port=3000)
        server.start()
        # Register the server only after start() succeeded; otherwise a
        # failed bind would leave a non-listening server in the global and
        # every later attempt would just wait for the timeout.
        _callback_server = server
    else:
        # Drop any result left over from an earlier attempt (for example a
        # callback that arrived after that attempt had already timed out).
        _callback_server.auth_code = None
        _callback_server.auth_state = None
        _callback_server.auth_error = None
        _callback_server.auth_received = False
    print("正在打开浏览器进行OAuth授权...")
    print(f"授权URL: {auth_url}")
    # webbrowser.open reports failure through its return value.
    if not webbrowser.open(auth_url):
        print("无法自动打开浏览器,请手动访问上述授权URL")
async def handle_callback() -> tuple[str, str | None]:
"""自动处理OAuth回调"""
global _callback_server
if _callback_server is None:
raise Exception("回调服务器未启动")
print("等待OAuth授权完成...")
try:
# 等待回调
code, state = await _callback_server.wait_for_callback()
print("OAuth授权成功!")
return code, state
except Exception as e:
print(f"OAuth授权失败: {e}")
raise
finally:
# 清理服务器状态,但保持服务器运行以便重用
_callback_server.auth_code = None
_callback_server.auth_state = None
_callback_server.auth_error = None
_callback_server.auth_received = False
在Agent中集成MCP
本文将利用主流Agent框架,通过OAuth认证实现与OpenAPI MCP Server的连接,并结合大模型与MCP Tools完成对阿里云资源的操作。
AgentScope
AgentScope是阿里巴巴开源的Agent框架,支持智能体工具管理、智能体长期记忆控制和智能化RAG等。
# -*- coding: utf-8 -*-
"""The main entry point of the ReAct agent example."""
import asyncio
import os
from agentscope.agent import ReActAgent, UserAgent
from agentscope.formatter import DashScopeChatFormatter
from agentscope.memory import InMemoryMemory
from agentscope.model import DashScopeChatModel
from agentscope.tool import (
Toolkit,
execute_shell_command,
execute_python_code,
view_text_file,
)
from agentscope.mcp import HttpStatelessClient
from mcp.client.auth import OAuthClientProvider, OAuthClientInformationFull, OAuthClientMetadata, OAuthToken
from pydantic import AnyUrl
from oauth_handler import InMemoryTokenStorage, handle_redirect, handle_callback
# Load settings from a local .env file into the process environment
# (main() reads DASHSCOPE_API_KEY from os.environ).
# The import sits here because this script's import block does not provide
# load_dotenv; without it the call below fails with NameError.
from dotenv import load_dotenv

load_dotenv()

# Endpoint of the OpenAPI MCP Server (masked placeholder; replace with your own).
server_url = "https://openapi-mcp.cn-hangzhou.aliyuncs.com/accounts/14******/custom/****/id/KXy******/mcp"
memory_token_storage = InMemoryTokenStorage()

# OAuth provider: handle_redirect opens the browser, handle_callback waits for
# the redirect on http://localhost:3000/callback.
oauth_provider = OAuthClientProvider(
    server_url=server_url,
    client_metadata=OAuthClientMetadata(
        client_name="AgentScopeExampleClient",
        redirect_uris=[AnyUrl("http://localhost:3000/callback")],
        grant_types=["authorization_code", "refresh_token"],
        response_types=["code"],
        scope=None,
    ),
    storage=memory_token_storage,
    redirect_handler=handle_redirect,
    callback_handler=handle_callback,
)

stateless_client = HttpStatelessClient(
    # Name used to identify this MCP client.
    name="mcp_services_stateless",
    transport="streamable_http",
    url=server_url,
    auth=oauth_provider,
)
async def main() -> None:
    """Run the interactive ReAct agent example until the user types "exit"."""
    toolkit = Toolkit()
    # Optional built-in tools, left disabled in this example:
    # toolkit.register_tool_function(execute_shell_command)
    # toolkit.register_tool_function(execute_python_code)
    # toolkit.register_tool_function(view_text_file)
    await toolkit.register_mcp_client(stateless_client)

    chat_model = DashScopeChatModel(
        api_key=os.environ.get("DASHSCOPE_API_KEY"),
        model_name="qwen3-max-preview",
        enable_thinking=False,
        stream=True,
    )
    agent = ReActAgent(
        name="AlibabaCloudOpsAgent",
        sys_prompt="你是阿里云运维助手,善于使用各种阿里云产品如 ECS、RDS、VPC 等,完成我的需求。",
        model=chat_model,
        formatter=DashScopeChatFormatter(),
        toolkit=toolkit,
        memory=InMemoryMemory(),
    )
    user = UserAgent("User")

    # Alternate user and agent turns; the user always speaks first.
    msg = await user(None)
    while msg.get_text_content() != "exit":
        msg = await agent(msg)
        msg = await user(msg)
# Script entry point: run main() to completion with asyncio.run.
asyncio.run(main())
效果演示:
LangGraph
LangGraph是一个用于构建、管理和部署长期运行、有状态智能体的底层编排框架。
import asyncio
import sys
from dotenv import load_dotenv
import os
from langgraph.prebuilt import create_react_agent
from langchain.chat_models import init_chat_model
from langchain_mcp_adapters.client import MultiServerMCPClient
from mcp.client.auth import OAuthClientProvider, OAuthClientInformationFull, OAuthClientMetadata, OAuthToken
from pydantic import AnyUrl
from oauth_handler import InMemoryTokenStorage, handle_callback, handle_redirect
# Read settings from a local .env file; make_agent looks up OPENAI_MODEL,
# OPENAI_API_KEY and OPENAI_BASE_URL with os.getenv.
load_dotenv()
async def make_agent(server_url=None):
    """Build a ReAct agent whose tools come from an OAuth-protected MCP server.

    Args:
        server_url: URL of the MCP server. When omitted, the example URL
            hard-coded below is used, which matches the previous behavior.

    Returns:
        The agent created by ``create_react_agent``.
    """
    if server_url is None:
        server_url = "https://openapi-mcp.cn-hangzhou.aliyuncs.com/accounts/1025904068912955/custom/test-ecs/id/1kB196nPAhRIbH1z/mcp"

    model = init_chat_model(
        model=os.getenv("OPENAI_MODEL"),
        api_key=os.getenv("OPENAI_API_KEY"),
        base_url=os.getenv("OPENAI_BASE_URL"),
        model_provider='openai',
    )

    # The OAuth provider is given the same server URL as the MCP client below.
    oauth_provider = OAuthClientProvider(
        server_url=server_url,
        client_metadata=OAuthClientMetadata(
            client_name="Example MCP Client",
            redirect_uris=[AnyUrl("http://localhost:3000/callback")],
            grant_types=["authorization_code", "refresh_token"],
            response_types=["code"],
            scope=None,
        ),
        storage=InMemoryTokenStorage(),
        redirect_handler=handle_redirect,
        callback_handler=handle_callback,
    )
    mcp_client = MultiServerMCPClient(
        {
            "resourcecenter": {
                "url": server_url,
                "transport": "streamable_http",
                "auth": oauth_provider,
            }
        }
    )
    tools = await mcp_client.get_tools()
    agent = create_react_agent(
        model=model,
        tools=tools,
        prompt="You are a helpful assistant",
    )
    return agent
async def chat_loop():
    """Run an interactive console chat with the agent.

    The loop ends when the user enters ``quit``, ``exit`` or ``退出``,
    presses Ctrl-C, or when standard input is closed. Any other error is
    printed and the loop continues with the next prompt.
    """
    # Create the agent.
    print("正在初始化AI助手...")
    agent = await make_agent()
    print("AI助手已就绪!输入 'quit' 或 'exit' 退出\n")

    # Conversation history; holds completed user/assistant turns only.
    messages = []
    while True:
        try:
            user_input = input("用户: ").strip()

            # Exit commands.
            if user_input.lower() in ['quit', 'exit', '退出']:
                print("再见!")
                break

            # Ignore empty input.
            if not user_input:
                continue

            user_message = {"role": "user", "content": user_input}
            print("AI: ", end="", flush=True)

            # The new user message is sent with the history but committed to
            # it only after a reply arrived, so a failed turn does not leave
            # an unanswered message behind for the retry.
            response = await agent.ainvoke(
                {"messages": messages + [user_message]},
                {"recursion_limit": 50}
            )

            # The last message of the result is taken as the AI reply.
            ai_response = response["messages"][-1].content
            print(ai_response)

            messages.append(user_message)
            messages.append({"role": "assistant", "content": ai_response})
            print()  # blank line between turns
        except (KeyboardInterrupt, EOFError):
            # EOFError: stdin was closed. Treating it like any other error
            # would retry input() forever.
            print("\n\n再见!")
            break
        except Exception as e:
            print(f"错误: {e}")
            print("请重试...\n")
async def main():
    """Run the interactive chat loop."""
    await chat_loop()


# Run main() only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
效果演示:
该文章对您有帮助吗?