2.4 客户端高级功能
在前面的章节中,我们学习了MCP客户端的基础知识。本章将探讨更高级的客户端功能,包括错误处理、重试策略、流式数据处理和自定义传输实现。
错误处理和重试策略
构建健壮的MCP客户端应用需要妥善处理各种错误情况和实现合适的重试策略。
全面的错误处理
有效的错误处理应考虑以下几类错误:
from mcp import ClientSession
from mcp.errors import (
ConnectionError, # 连接错误
TimeoutError, # 超时错误
ResourceNotFoundError, # 资源未找到
ResourcePermissionError, # 资源权限错误
ToolExecutionError, # 工具执行错误
ToolInputError, # 工具输入错误
ProtocolError, # 协议错误
ServerError # 服务器内部错误
)
async def robust_client_operation():
try:
# 执行操作...
result = await session.call_tool("example_tool")
except ConnectionError as e:
print(f"连接错误: {e}")
# 处理连接问题
except TimeoutError as e:
print(f"超时错误: {e}")
# 处理超时
except ResourceNotFoundError as e:
print(f"资源未找到: {e}")
# 处理缺失资源
except ResourcePermissionError as e:
print(f"资源权限错误: {e}")
# 处理权限问题
except ToolExecutionError as e:
print(f"工具执行错误: {e}")
# 处理工具执行问题
except ToolInputError as e:
print(f"工具输入错误: {e}")
# 处理输入验证问题
except ProtocolError as e:
print(f"协议错误: {e}")
# 处理协议问题
except ServerError as e:
print(f"服务器错误: {e}")
# 处理服务器内部错误
except Exception as e:
print(f"未预期的错误: {e}")
# 处理其他未预期错误
实现重试策略
对于临时性错误,实现重试策略是很有用的:
import asyncio
from functools import wraps
# 使用装饰器实现重试
def with_retry(max_retries=3, base_delay=1.0, backoff_factor=2.0):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
retries = 0
last_exception = None
while retries <= max_retries:
try:
return await func(*args, **kwargs)
except (ConnectionError, TimeoutError) as e:
last_exception = e
retries += 1
if retries <= max_retries:
# 计算退避延迟
delay = base_delay * (backoff_factor ** (retries - 1))
print(f"操作失败 ({e}), {retries}/{max_retries}次重试, 等待{delay}秒...")
await asyncio.sleep(delay)
else:
print(f"达到最大重试次数({max_retries}),操作失败")
raise
except Exception as e:
# 对于其他错误不重试
print(f"遇到不可重试的错误: {e}")
raise
raise last_exception
return wrapper
return decorator
# 使用重试装饰器
@with_retry(max_retries=3, base_delay=2.0)
async def call_with_retry(session, tool_name, **tool_args):
return await session.call_tool(tool_name, arguments=tool_args)
错误分类和处理策略
不同类型的错误需要不同的处理策略:
| 错误类型 | 可重试? | 处理策略 |
|---|---|---|
| ConnectionError | ✅ | 重试连接,使用指数退避 |
| TimeoutError | ✅ | 重试,可能增加超时时间 |
| ResourceNotFoundError | ❌ | 报告错误,检查资源参数 |
| ResourcePermissionError | ❌ | 报告权限问题,可能需要提升权限 |
| ToolExecutionError | 视情况 | 检查错误详情决定是否重试 |
| ToolInputError | ❌ | 修正输入参数后重试 |
| ProtocolError | 视情况 | 如果是版本不兼容则需升级客户端 |
| ServerError | ✅ | 可以重试,但频繁出现则报告问题 |
流式数据处理
MCP支持流式数据处理,这对于处理大型资源或长时间运行的工具非常有用。
流式读取资源
对于大型资源,可以使用流式处理避免一次性加载全部内容:
async def stream_large_resource(session, resource_uri):
"""流式处理大型资源"""
chunk_size = 1024 * 1024 # 1MB 块大小
# 开始流式请求
async with session.stream_resource(resource_uri) as stream:
# 读取元数据
mime_type = stream.mime_type
total_read = 0
# 分块读取内容
async for chunk in stream.iter_content(chunk_size):
total_read += len(chunk)
print(f"已读取: {total_read} 字节")
# 处理数据块
process_chunk(chunk)
处理长时间运行的工具
对于长时间运行的工具,可以使用流式处理获取实时进度:
async def monitor_long_running_tool(session, tool_name, **tool_args):
"""监控长时间运行的工具执行"""
# 启动工具执行
async with session.stream_tool_execution(tool_name, arguments=tool_args) as stream:
# 处理进度更新
async for update in stream.iter_updates():
if update.type == "progress":
print(f"进度: {update.progress}%")
print(f"状态: {update.status}")
elif update.type == "partial_result":
print(f"部分结果: {update.content}")
# 获取最终结果
final_result = stream.result
print(f"最终结果: {final_result}")
流式提示处理
处理大型提示模板:
async def process_streaming_prompt(session, prompt_name, **prompt_args):
"""处理流式提示"""
async with session.stream_prompt(prompt_name, arguments=prompt_args) as stream:
async for message in stream.iter_messages():
print(f"角色: {message.role}")
print(f"内容: {message.content.text}")
实现自定义传输层
MCP允许实现自定义传输层,以支持特定的通信需求。
基本传输接口
创建自定义传输需要实现读写接口:
from typing import Any, AsyncGenerator, Dict, Optional
from mcp.transport.base import Reader, Writer, Message
class CustomReader(Reader):
"""自定义读取器实现"""
def __init__(self, connection):
self.connection = connection
async def read(self) -> AsyncGenerator[Message, None]:
"""读取消息的生成器"""
while True:
# 从连接获取数据
data = await self.connection.receive()
if not data:
break
# 解析数据为消息
message = Message.parse(data)
yield message
class CustomWriter(Writer):
"""自定义写入器实现"""
def __init__(self, connection):
self.connection = connection
async def write(self, message: Message) -> None:
"""写入消息"""
# 序列化消息
data = message.serialize()
# 发送到连接
await self.connection.send(data)
完整传输实现示例
一个基于WebSocket的自定义传输实现:
import json
import websockets
from typing import AsyncGenerator, Dict, Optional, Tuple
from mcp.transport.base import Reader, Writer, Message
class WebSocketTransport:
"""WebSocket传输实现"""
def __init__(self, uri: str, headers: Optional[Dict[str, str]] = None):
self.uri = uri
self.headers = headers or {}
self.connection = None
async def __aenter__(self) -> Tuple[Reader, Writer]:
"""建立连接并返回读写器"""
# 连接到WebSocket服务器
self.connection = await websockets.connect(
self.uri,
extra_headers=self.headers
)
# 创建读写器
reader = WebSocketReader(self.connection)
writer = WebSocketWriter(self.connection)
return reader, writer
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""关闭连接"""
if self.connection:
await self.connection.close()
class WebSocketReader(Reader):
"""WebSocket读取器"""
def __init__(self, connection):
self.connection = connection
async def read(self) -> AsyncGenerator[Message, None]:
"""从WebSocket读取消息"""
try:
while True:
# 接收WebSocket消息
data = await self.connection.recv()
# 解析JSON消息
if isinstance(data, str):
json_data = json.loads(data)
message = Message(
id=json_data.get("id"),
method=json_data.get("method"),
params=json_data.get("params"),
result=json_data.get("result"),
error=json_data.get("error")
)
yield message
except websockets.exceptions.ConnectionClosed:
# 连接关闭
return
class WebSocketWriter(Writer):
"""WebSocket写入器"""
def __init__(self, connection):
self.connection = connection
async def write(self, message: Message) -> None:
"""向WebSocket写入消息"""
# 构建JSON-RPC消息
json_data = {
"jsonrpc": "2.0",
"id": message.id
}
if message.method:
json_data["method"] = message.method
if message.params:
json_data["params"] = message.params
else:
if message.result is not None:
json_data["result"] = message.result
if message.error:
json_data["error"] = message.error
# 发送JSON消息
await self.connection.send(json.dumps(json_data))
# 使用自定义传输
async def use_custom_transport():
async with WebSocketTransport("ws://example.com/mcp") as (reader, writer):
async with ClientSession(reader, writer) as session:
await session.initialize()
# 使用会话...
处理不同格式的返回值
MCP可以返回各种格式的数据,客户端需要能够正确处理这些不同格式。
文本内容处理
from mcp.types import TextContent
async def handle_text_result(session, tool_name):
result = await session.call_tool(tool_name)
for item in result.content:
if isinstance(item, TextContent):
print(f"文本内容: {item.text}")
# 处理文本上的注释(如果有)
if item.annotations:
for annotation in item.annotations:
print(f"注释: {annotation.text} (位置: {annotation.span})")
结构化数据处理
import json
from mcp.types import JSONContent
async def handle_json_result(session, tool_name):
result = await session.call_tool(tool_name)
for item in result.content:
if hasattr(item, 'type') and item.type == 'json':
# 处理JSON数据
data = item.data
print(f"JSON数据: {json.dumps(data, indent=2)}")
# 可以直接访问JSON字段
if 'items' in data:
for i, item in enumerate(data['items']):
print(f"项目 {i+1}: {item['name']}")
图像和二进制数据处理
import aiohttp
import io
from PIL import Image
from mcp.types import ImageContent, FileContent
async def handle_image_result(session, tool_name):
result = await session.call_tool(tool_name)
for item in result.content:
if hasattr(item, 'type'):
if item.type == 'image':
# 处理图像URL
image_url = item.url
# 可能需要下载图像
if image_url.startswith('http'):
async with aiohttp.ClientSession() as http_session:
async with http_session.get(image_url) as response:
image_data = await response.read()
# 使用PIL处理图像
image = Image.open(io.BytesIO(image_data))
print(f"图像大小: {image.size}")
# 还可以访问alt文本
print(f"图像描述: {item.alt_text}")
elif item.type == 'file':
# 处理文件内容
filename = item.filename
content_type = item.content_type
file_data = item.content
print(f"文件名: {filename}")
print(f"内容类型: {content_type}")
print(f"文件大小: {len(file_data)} 字节")
# 保存文件
with open(f"downloaded_{filename}", "wb") as f:
f.write(file_data)
客户端与LLM集成的最佳实践
将MCP客户端与大语言模型集成时,有一些最佳实践可以遵循。
提供结构化上下文
async def provide_context_to_llm(session, model, query):
"""为LLM提供结构化上下文"""
# 收集相关资源
try:
user_data, _ = await session.read_resource("user://profile")
documentation, _ = await session.read_resource("docs://relevant-section")
# 构建上下文
context = f"""
用户信息:
{user_data}
相关文档:
{documentation}
"""
# 将上下文提供给模型
response = await model.generate(context + "\n\n" + query)
return response
except Exception as e:
print(f"获取上下文出错: {e}")
# 降级处理:仅使用查询
return await model.generate(query)
处理工具调用
import json
import re
async def handle_tool_calls(session, model_response):
"""处理模型响应中的工具调用"""
# 匹配工具调用格式
# 假设模型输出类似: "使用工具: tool_name(arg1="value1", arg2="value2")"
tool_pattern = r'使用工具:\s+(\w+)\(([^)]+)\)'
matches = re.findall(tool_pattern, model_response)
results = []
for tool_name, args_str in matches:
# 解析参数
args = {}
arg_pairs = args_str.split(',')
for pair in arg_pairs:
if '=' in pair:
key, value = pair.split('=', 1)
key = key.strip()
value = value.strip().strip('"\'')
args[key] = value
try:
# 调用工具
result = await session.call_tool(tool_name, arguments=args)
# 格式化结果
formatted_result = f"工具 {tool_name} 结果:\n"
for item in result.content:
if hasattr(item, 'text'):
formatted_result += item.text
results.append(formatted_result)
except Exception as e:
results.append(f"工具 {tool_name} 调用失败: {e}")
# 返回所有工具结果
return "\n\n".join(results)
流式输出集成
async def streaming_llm_with_mcp(session, model, query):
"""将MCP与流式LLM输出集成"""
# 创建一个处理器来拦截潜在的工具调用
async def process_stream(stream):
buffer = ""
tool_pattern = r'使用工具:\s+(\w+)\(([^)]+)\)'
async for chunk in stream:
buffer += chunk
# 检查是否有完整的工具调用
match = re.search(tool_pattern, buffer)
if match:
tool_part = match.group(0)
tool_name = match.group(1)
args_str = match.group(2)
# 解析参数
args = {}
arg_pairs = args_str.split(',')
for pair in arg_pairs:
if '=' in pair:
key, value = pair.split('=', 1)
args[key] = value.strip().strip('"\'')
try:
# 调用工具
result = await session.call_tool(tool_name, arguments=args)
# 格式化结果
tool_result = ""
for item in result.content:
if hasattr(item, 'text'):
tool_result += item.text
# 替换工具调用为结果
buffer = buffer.replace(tool_part, f"{tool_part}\n结果: {tool_result}")
except Exception as e:
# 标记错误
buffer = buffer.replace(tool_part, f"{tool_part}\n错误: {e}")
# 输出当前缓冲区
yield buffer
buffer = ""
# 获取模型流式输出
model_stream = await model.generate_stream(query)
# 处理流式输出
return process_stream(model_stream)
进阶示例:AI助手与MCP工具集成
下面是一个将MCP客户端与AI助手模型集成的综合示例:
import asyncio
import json
import re
from typing import Dict, List, Any, AsyncGenerator, Optional
from mcp import ClientSession
from mcp.client.sse import sse_client
class MCPEnabledAIAssistant:
"""支持MCP工具的AI助手"""
def __init__(self, llm_client, mcp_url: str):
self.llm_client = llm_client # LLM客户端(如OpenAI, Anthropic等)
self.mcp_url = mcp_url
self.session = None
self.available_tools = []
self.available_resources = []
async def initialize(self):
"""初始化MCP会话和工具列表"""
try:
# 连接到MCP服务器
reader, writer = await sse_client(self.mcp_url).__aenter__()
self.session = ClientSession(reader, writer)
await self.session.__aenter__()
# 初始化连接
await self.session.initialize()
# 获取可用工具和资源
tools_response = await self.session.list_tools()
if hasattr(tools_response, 'tools'):
self.available_tools = tools_response.tools
resources_response = await self.session.list_resources()
if hasattr(resources_response, 'resources'):
self.available_resources = resources_response.resources
# 构建系统提示
self._system_prompt = await self._build_system_prompt()
return True
except Exception as e:
print(f"初始化错误: {e}")
return False
async def _build_system_prompt(self) -> str:
"""构建包含工具和资源信息的系统提示"""
tools_description = "可用工具:\n"
for tool in self.available_tools:
tools_description += f"- {tool.name}: {tool.description}\n"
# 添加参数信息
if hasattr(tool, 'arguments') and tool.arguments:
if 'properties' in tool.arguments:
tools_description += " 参数:\n"
for param_name, param_info in tool.arguments['properties'].items():
param_type = param_info.get('type', 'any')
param_desc = param_info.get('description', '无描述')
tools_description += f" - {param_name} ({param_type}): {param_desc}\n"
resources_description = "可用资源:\n"
for resource in self.available_resources:
resources_description += f"- {resource}\n"
system_prompt = f"""
你是一个智能助手,可以调用外部工具和访问资源来帮助用户。
当你需要使用工具时,使用以下格式:
使用工具: tool_name(param1="value1", param2="value2")
当你需要访问资源时,使用以下格式:
访问资源: resource://path
{tools_description}
{resources_description}
请尽可能使用这些工具和资源来回答用户问题。
"""
return system_prompt
async def chat(self, user_message: str) -> AsyncGenerator[str, None]:
"""与用户聊天,支持工具调用"""
if not self.session:
yield "错误: MCP会话未初始化"
return
try:
# 创建LLM请求
messages = [
{"role": "system", "content": self._system_prompt},
{"role": "user", "content": user_message}
]
# 获取LLM流式响应
buffer = ""
async for chunk in self.llm_client.chat.completions.create(
model="gpt-4", # 或其他模型
messages=messages,
stream=True
):
if hasattr(chunk.choices[0].delta, 'content') and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
buffer += content
# 检查是否有工具调用
tool_match = re.search(r'使用工具:\s+(\w+)\(([^)]+)\)', buffer)
if tool_match:
tool_part = tool_match.group(0)
tool_name = tool_match.group(1)
args_str = tool_match.group(2)
# 解析工具参数
args = {}
arg_pairs = args_str.split(',')
for pair in arg_pairs:
if '=' in pair:
key, value = pair.split('=', 1)
key = key.strip()
value = value.strip().strip('"\'')
args[key] = value
try:
# 调用工具
tool_result = await self.session.call_tool(tool_name, arguments=args)
# 格式化结果
result_text = ""
for item in tool_result.content:
if hasattr(item, 'text'):
result_text += item.text
# 替换缓冲区中的工具调用
buffer = buffer.replace(tool_part, f"{tool_part}\n结果: {result_text}")
except Exception as e:
# 标记错误
buffer = buffer.replace(tool_part, f"{tool_part}\n错误: {str(e)}")
# 检查是否有资源访问
resource_match = re.search(r'访问资源:\s+([\w:/\-{}.]+)', buffer)
if resource_match:
resource_part = resource_match.group(0)
resource_uri = resource_match.group(1)
try:
# 读取资源
resource_content, _ = await self.session.read_resource(resource_uri)
# 替换缓冲区中的资源访问
buffer = buffer.replace(resource_part, f"{resource_part}\n内容: {resource_content}")
except Exception as e:
# 标记错误
buffer = buffer.replace(resource_part, f"{resource_part}\n错误: {str(e)}")
# 返回当前缓冲区
yield buffer
buffer = ""
except Exception as e:
yield f"处理消息时出错: {str(e)}"
async def close(self):
"""关闭MCP会话"""
if self.session:
await self.session.__aexit__(None, None, None)
# 使用示例
async def main():
# 初始化LLM客户端(示例使用OpenAI,实际应用可以替换)
import openai
openai_client = openai.AsyncClient(api_key="your-api-key")
# 创建AI助手
assistant = MCPEnabledAIAssistant(
llm_client=openai_client,
mcp_url="http://localhost:8000/sse"
)
# 初始化
success = await assistant.initialize()
if not success:
print("初始化失败")
return
# 处理用户输入
user_input = "我需要查询明天北京的天气,并帮我找一下关于人工智能的最新研究"
async for response_chunk in assistant.chat(user_input):
print(response_chunk, end="", flush=True)
# 关闭会话
await assistant.close()
if __name__ == "__main__":
asyncio.run(main())
这个示例展示了:
- 如何将MCP客户端集成到AI助手中
- 如何在LLM响应中检测和处理工具调用
- 如何处理资源引用
- 如何在流式输出中实时更新结果
小结
在本章中,我们探讨了MCP客户端的高级功能:
- 错误处理和重试策略
- 流式数据处理方法
- 实现自定义传输层
- 处理不同格式的返回值
- 客户端与LLM集成的最佳实践
- 通过AI助手示例,展示了MCP工具与大语言模型的集成
掌握这些高级功能将帮助您构建更强大、更灵活的MCP客户端应用。在下一章中,我们将探讨服务器生命周期和上下文管理。