2.4_客户端高级功能

2.4 客户端高级功能

在前面的章节中,我们学习了MCP客户端的基础知识。本章将探讨更高级的客户端功能,包括错误处理、重试策略、流式数据处理和自定义传输实现。

错误处理和重试策略

构建健壮的MCP客户端应用需要妥善处理各种错误情况和实现合适的重试策略。

全面的错误处理

有效的错误处理应考虑以下几类错误:

from mcp import ClientSession
from mcp.errors import (
    ConnectionError,           # 连接错误
    TimeoutError,              # 超时错误
    ResourceNotFoundError,     # 资源未找到
    ResourcePermissionError,   # 资源权限错误
    ToolExecutionError,        # 工具执行错误
    ToolInputError,            # 工具输入错误
    ProtocolError,             # 协议错误
    ServerError                # 服务器内部错误
)

async def robust_client_operation():
    try:
        # 执行操作...
        result = await session.call_tool("example_tool")
        
    except ConnectionError as e:
        print(f"连接错误: {e}")
        # 处理连接问题
        
    except TimeoutError as e:
        print(f"超时错误: {e}")
        # 处理超时
        
    except ResourceNotFoundError as e:
        print(f"资源未找到: {e}")
        # 处理缺失资源
        
    except ResourcePermissionError as e:
        print(f"资源权限错误: {e}")
        # 处理权限问题
        
    except ToolExecutionError as e:
        print(f"工具执行错误: {e}")
        # 处理工具执行问题
        
    except ToolInputError as e:
        print(f"工具输入错误: {e}")
        # 处理输入验证问题
        
    except ProtocolError as e:
        print(f"协议错误: {e}")
        # 处理协议问题
        
    except ServerError as e:
        print(f"服务器错误: {e}")
        # 处理服务器内部错误
        
    except Exception as e:
        print(f"未预期的错误: {e}")
        # 处理其他未预期错误

实现重试策略

对于临时性错误,实现重试策略是很有用的:

import asyncio
from functools import wraps

# 使用装饰器实现重试
def with_retry(max_retries=3, base_delay=1.0, backoff_factor=2.0):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            retries = 0
            last_exception = None
            
            while retries <= max_retries:
                try:
                    return await func(*args, **kwargs)
                except (ConnectionError, TimeoutError) as e:
                    last_exception = e
                    retries += 1
                    
                    if retries <= max_retries:
                        # 计算退避延迟
                        delay = base_delay * (backoff_factor ** (retries - 1))
                        print(f"操作失败 ({e}), {retries}/{max_retries}次重试, 等待{delay}秒...")
                        await asyncio.sleep(delay)
                    else:
                        print(f"达到最大重试次数({max_retries}),操作失败")
                        raise
                except Exception as e:
                    # 对于其他错误不重试
                    print(f"遇到不可重试的错误: {e}")
                    raise
            
            raise last_exception
        
        return wrapper
    
    return decorator

# 使用重试装饰器
@with_retry(max_retries=3, base_delay=2.0)
async def call_with_retry(session, tool_name, **tool_args):
    return await session.call_tool(tool_name, arguments=tool_args)

错误分类和处理策略

不同类型的错误需要不同的处理策略:

错误类型 可重试? 处理策略
ConnectionError 重试连接,使用指数退避
TimeoutError 重试,可能增加超时时间
ResourceNotFoundError 报告错误,检查资源参数
ResourcePermissionError 报告权限问题,可能需要提升权限
ToolExecutionError 视情况 检查错误详情决定是否重试
ToolInputError 修正输入参数后重试
ProtocolError 视情况 如果是版本不兼容则需升级客户端
ServerError 可以重试,但频繁出现则报告问题

流式数据处理

MCP支持流式数据处理,这对于处理大型资源或长时间运行的工具非常有用。

流式读取资源

对于大型资源,可以使用流式处理避免一次性加载全部内容:

async def stream_large_resource(session, resource_uri):
    """流式处理大型资源"""
    chunk_size = 1024 * 1024  # 1MB 块大小
    
    # 开始流式请求
    async with session.stream_resource(resource_uri) as stream:
        # 读取元数据
        mime_type = stream.mime_type
        
        total_read = 0
        # 分块读取内容
        async for chunk in stream.iter_content(chunk_size):
            total_read += len(chunk)
            print(f"已读取: {total_read} 字节")
            
            # 处理数据块
            process_chunk(chunk)

处理长时间运行的工具

对于长时间运行的工具,可以使用流式处理获取实时进度:

async def monitor_long_running_tool(session, tool_name, **tool_args):
    """监控长时间运行的工具执行"""
    
    # 启动工具执行
    async with session.stream_tool_execution(tool_name, arguments=tool_args) as stream:
        # 处理进度更新
        async for update in stream.iter_updates():
            if update.type == "progress":
                print(f"进度: {update.progress}%")
                print(f"状态: {update.status}")
            elif update.type == "partial_result":
                print(f"部分结果: {update.content}")
    
    # 获取最终结果
    final_result = stream.result
    print(f"最终结果: {final_result}")

流式提示处理

处理大型提示模板:

async def process_streaming_prompt(session, prompt_name, **prompt_args):
    """处理流式提示"""
    
    async with session.stream_prompt(prompt_name, arguments=prompt_args) as stream:
        async for message in stream.iter_messages():
            print(f"角色: {message.role}")
            print(f"内容: {message.content.text}")

实现自定义传输层

MCP允许实现自定义传输层,以支持特定的通信需求。

基本传输接口

创建自定义传输需要实现读写接口:

from typing import Any, AsyncGenerator, Dict, Optional
from mcp.transport.base import Reader, Writer, Message

class CustomReader(Reader):
    """自定义读取器实现"""
    
    def __init__(self, connection):
        self.connection = connection
    
    async def read(self) -> AsyncGenerator[Message, None]:
        """读取消息的生成器"""
        while True:
            # 从连接获取数据
            data = await self.connection.receive()
            
            if not data:
                break
            
            # 解析数据为消息
            message = Message.parse(data)
            yield message

class CustomWriter(Writer):
    """自定义写入器实现"""
    
    def __init__(self, connection):
        self.connection = connection
    
    async def write(self, message: Message) -> None:
        """写入消息"""
        # 序列化消息
        data = message.serialize()
        
        # 发送到连接
        await self.connection.send(data)

完整传输实现示例

一个基于WebSocket的自定义传输实现:

import json
import websockets
from typing import AsyncGenerator, Dict, Optional, Tuple
from mcp.transport.base import Reader, Writer, Message

class WebSocketTransport:
    """WebSocket传输实现"""
    
    def __init__(self, uri: str, headers: Optional[Dict[str, str]] = None):
        self.uri = uri
        self.headers = headers or {}
        self.connection = None
    
    async def __aenter__(self) -> Tuple[Reader, Writer]:
        """建立连接并返回读写器"""
        # 连接到WebSocket服务器
        self.connection = await websockets.connect(
            self.uri,
            extra_headers=self.headers
        )
        
        # 创建读写器
        reader = WebSocketReader(self.connection)
        writer = WebSocketWriter(self.connection)
        
        return reader, writer
    
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """关闭连接"""
        if self.connection:
            await self.connection.close()

class WebSocketReader(Reader):
    """WebSocket读取器"""
    
    def __init__(self, connection):
        self.connection = connection
    
    async def read(self) -> AsyncGenerator[Message, None]:
        """从WebSocket读取消息"""
        try:
            while True:
                # 接收WebSocket消息
                data = await self.connection.recv()
                
                # 解析JSON消息
                if isinstance(data, str):
                    json_data = json.loads(data)
                    message = Message(
                        id=json_data.get("id"),
                        method=json_data.get("method"),
                        params=json_data.get("params"),
                        result=json_data.get("result"),
                        error=json_data.get("error")
                    )
                    yield message
        except websockets.exceptions.ConnectionClosed:
            # 连接关闭
            return

class WebSocketWriter(Writer):
    """WebSocket写入器"""
    
    def __init__(self, connection):
        self.connection = connection
    
    async def write(self, message: Message) -> None:
        """向WebSocket写入消息"""
        # 构建JSON-RPC消息
        json_data = {
            "jsonrpc": "2.0",
            "id": message.id
        }
        
        if message.method:
            json_data["method"] = message.method
            if message.params:
                json_data["params"] = message.params
        else:
            if message.result is not None:
                json_data["result"] = message.result
            if message.error:
                json_data["error"] = message.error
        
        # 发送JSON消息
        await self.connection.send(json.dumps(json_data))

# 使用自定义传输
async def use_custom_transport():
    async with WebSocketTransport("ws://example.com/mcp") as (reader, writer):
        async with ClientSession(reader, writer) as session:
            await session.initialize()
            # 使用会话...

处理不同格式的返回值

MCP可以返回各种格式的数据,客户端需要能够正确处理这些不同格式。

文本内容处理

from mcp.types import TextContent

async def handle_text_result(session, tool_name):
    result = await session.call_tool(tool_name)
    
    for item in result.content:
        if isinstance(item, TextContent):
            print(f"文本内容: {item.text}")
            
            # 处理文本上的注释(如果有)
            if item.annotations:
                for annotation in item.annotations:
                    print(f"注释: {annotation.text} (位置: {annotation.span})")

结构化数据处理

import json
from mcp.types import JSONContent

async def handle_json_result(session, tool_name):
    result = await session.call_tool(tool_name)
    
    for item in result.content:
        if hasattr(item, 'type') and item.type == 'json':
            # 处理JSON数据
            data = item.data
            print(f"JSON数据: {json.dumps(data, indent=2)}")
            
            # 可以直接访问JSON字段
            if 'items' in data:
                for i, item in enumerate(data['items']):
                    print(f"项目 {i+1}: {item['name']}")

图像和二进制数据处理

import aiohttp
import io
from PIL import Image
from mcp.types import ImageContent, FileContent

async def handle_image_result(session, tool_name):
    result = await session.call_tool(tool_name)
    
    for item in result.content:
        if hasattr(item, 'type'):
            if item.type == 'image':
                # 处理图像URL
                image_url = item.url
                
                # 可能需要下载图像
                if image_url.startswith('http'):
                    async with aiohttp.ClientSession() as http_session:
                        async with http_session.get(image_url) as response:
                            image_data = await response.read()
                            
                            # 使用PIL处理图像
                            image = Image.open(io.BytesIO(image_data))
                            print(f"图像大小: {image.size}")
                
                # 还可以访问alt文本
                print(f"图像描述: {item.alt_text}")
            
            elif item.type == 'file':
                # 处理文件内容
                filename = item.filename
                content_type = item.content_type
                file_data = item.content
                
                print(f"文件名: {filename}")
                print(f"内容类型: {content_type}")
                print(f"文件大小: {len(file_data)} 字节")
                
                # 保存文件
                with open(f"downloaded_{filename}", "wb") as f:
                    f.write(file_data)

客户端与LLM集成的最佳实践

将MCP客户端与大语言模型集成时,有一些最佳实践可以遵循。

提供结构化上下文

async def provide_context_to_llm(session, model, query):
    """为LLM提供结构化上下文"""
    
    # 收集相关资源
    try:
        user_data, _ = await session.read_resource("user://profile")
        documentation, _ = await session.read_resource("docs://relevant-section")
        
        # 构建上下文
        context = f"""
        用户信息:
        {user_data}
        
        相关文档:
        {documentation}
        """
        
        # 将上下文提供给模型
        response = await model.generate(context + "\n\n" + query)
        return response
    
    except Exception as e:
        print(f"获取上下文出错: {e}")
        # 降级处理:仅使用查询
        return await model.generate(query)

处理工具调用

import json
import re

async def handle_tool_calls(session, model_response):
    """处理模型响应中的工具调用"""
    
    # 匹配工具调用格式
    # 假设模型输出类似: "使用工具: tool_name(arg1="value1", arg2="value2")"
    tool_pattern = r'使用工具:\s+(\w+)\(([^)]+)\)'
    matches = re.findall(tool_pattern, model_response)
    
    results = []
    for tool_name, args_str in matches:
        # 解析参数
        args = {}
        arg_pairs = args_str.split(',')
        for pair in arg_pairs:
            if '=' in pair:
                key, value = pair.split('=', 1)
                key = key.strip()
                value = value.strip().strip('"\'')
                args[key] = value
        
        try:
            # 调用工具
            result = await session.call_tool(tool_name, arguments=args)
            
            # 格式化结果
            formatted_result = f"工具 {tool_name} 结果:\n"
            for item in result.content:
                if hasattr(item, 'text'):
                    formatted_result += item.text
            
            results.append(formatted_result)
        
        except Exception as e:
            results.append(f"工具 {tool_name} 调用失败: {e}")
    
    # 返回所有工具结果
    return "\n\n".join(results)

流式输出集成

async def streaming_llm_with_mcp(session, model, query):
    """将MCP与流式LLM输出集成"""
    
    # 创建一个处理器来拦截潜在的工具调用
    async def process_stream(stream):
        buffer = ""
        tool_pattern = r'使用工具:\s+(\w+)\(([^)]+)\)'
        
        async for chunk in stream:
            buffer += chunk
            
            # 检查是否有完整的工具调用
            match = re.search(tool_pattern, buffer)
            if match:
                tool_part = match.group(0)
                tool_name = match.group(1)
                args_str = match.group(2)
                
                # 解析参数
                args = {}
                arg_pairs = args_str.split(',')
                for pair in arg_pairs:
                    if '=' in pair:
                        key, value = pair.split('=', 1)
                        args[key] = value.strip().strip('"\'')
                
                try:
                    # 调用工具
                    result = await session.call_tool(tool_name, arguments=args)
                    
                    # 格式化结果
                    tool_result = ""
                    for item in result.content:
                        if hasattr(item, 'text'):
                            tool_result += item.text
                    
                    # 替换工具调用为结果
                    buffer = buffer.replace(tool_part, f"{tool_part}\n结果: {tool_result}")
                
                except Exception as e:
                    # 标记错误
                    buffer = buffer.replace(tool_part, f"{tool_part}\n错误: {e}")
            
            # 输出当前缓冲区
            yield buffer
            buffer = ""
    
    # 获取模型流式输出
    model_stream = await model.generate_stream(query)
    
    # 处理流式输出
    return process_stream(model_stream)

进阶示例:AI助手与MCP工具集成

下面是一个将MCP客户端与AI助手模型集成的综合示例:

import asyncio
import json
import re
from typing import Dict, List, Any, AsyncGenerator, Optional
from mcp import ClientSession
from mcp.client.sse import sse_client

class MCPEnabledAIAssistant:
    """支持MCP工具的AI助手"""
    
    def __init__(self, llm_client, mcp_url: str):
        self.llm_client = llm_client  # LLM客户端(如OpenAI, Anthropic等)
        self.mcp_url = mcp_url
        self.session = None
        self.available_tools = []
        self.available_resources = []
    
    async def initialize(self):
        """初始化MCP会话和工具列表"""
        try:
            # 连接到MCP服务器
            reader, writer = await sse_client(self.mcp_url).__aenter__()
            self.session = ClientSession(reader, writer)
            await self.session.__aenter__()
            
            # 初始化连接
            await self.session.initialize()
            
            # 获取可用工具和资源
            tools_response = await self.session.list_tools()
            if hasattr(tools_response, 'tools'):
                self.available_tools = tools_response.tools
            
            resources_response = await self.session.list_resources()
            if hasattr(resources_response, 'resources'):
                self.available_resources = resources_response.resources
            
            # 构建系统提示
            self._system_prompt = await self._build_system_prompt()
            
            return True
        
        except Exception as e:
            print(f"初始化错误: {e}")
            return False
    
    async def _build_system_prompt(self) -> str:
        """构建包含工具和资源信息的系统提示"""
        
        tools_description = "可用工具:\n"
        for tool in self.available_tools:
            tools_description += f"- {tool.name}: {tool.description}\n"
            
            # 添加参数信息
            if hasattr(tool, 'arguments') and tool.arguments:
                if 'properties' in tool.arguments:
                    tools_description += "  参数:\n"
                    for param_name, param_info in tool.arguments['properties'].items():
                        param_type = param_info.get('type', 'any')
                        param_desc = param_info.get('description', '无描述')
                        tools_description += f"  - {param_name} ({param_type}): {param_desc}\n"
        
        resources_description = "可用资源:\n"
        for resource in self.available_resources:
            resources_description += f"- {resource}\n"
        
        system_prompt = f"""
        你是一个智能助手,可以调用外部工具和访问资源来帮助用户。
        
        当你需要使用工具时,使用以下格式:
        使用工具: tool_name(param1="value1", param2="value2")
        
        当你需要访问资源时,使用以下格式:
        访问资源: resource://path
        
        {tools_description}
        
        {resources_description}
        
        请尽可能使用这些工具和资源来回答用户问题。
        """
        
        return system_prompt
    
    async def chat(self, user_message: str) -> AsyncGenerator[str, None]:
        """与用户聊天,支持工具调用"""
        if not self.session:
            yield "错误: MCP会话未初始化"
            return
        
        try:
            # 创建LLM请求
            messages = [
                {"role": "system", "content": self._system_prompt},
                {"role": "user", "content": user_message}
            ]
            
            # 获取LLM流式响应
            buffer = ""
            async for chunk in self.llm_client.chat.completions.create(
                model="gpt-4",  # 或其他模型
                messages=messages,
                stream=True
            ):
                if hasattr(chunk.choices[0].delta, 'content') and chunk.choices[0].delta.content:
                    content = chunk.choices[0].delta.content
                    buffer += content
                    
                    # 检查是否有工具调用
                    tool_match = re.search(r'使用工具:\s+(\w+)\(([^)]+)\)', buffer)
                    if tool_match:
                        tool_part = tool_match.group(0)
                        tool_name = tool_match.group(1)
                        args_str = tool_match.group(2)
                        
                        # 解析工具参数
                        args = {}
                        arg_pairs = args_str.split(',')
                        for pair in arg_pairs:
                            if '=' in pair:
                                key, value = pair.split('=', 1)
                                key = key.strip()
                                value = value.strip().strip('"\'')
                                args[key] = value
                        
                        try:
                            # 调用工具
                            tool_result = await self.session.call_tool(tool_name, arguments=args)
                            
                            # 格式化结果
                            result_text = ""
                            for item in tool_result.content:
                                if hasattr(item, 'text'):
                                    result_text += item.text
                            
                            # 替换缓冲区中的工具调用
                            buffer = buffer.replace(tool_part, f"{tool_part}\n结果: {result_text}")
                        
                        except Exception as e:
                            # 标记错误
                            buffer = buffer.replace(tool_part, f"{tool_part}\n错误: {str(e)}")
                    
                    # 检查是否有资源访问
                    resource_match = re.search(r'访问资源:\s+([\w:/\-{}.]+)', buffer)
                    if resource_match:
                        resource_part = resource_match.group(0)
                        resource_uri = resource_match.group(1)
                        
                        try:
                            # 读取资源
                            resource_content, _ = await self.session.read_resource(resource_uri)
                            
                            # 替换缓冲区中的资源访问
                            buffer = buffer.replace(resource_part, f"{resource_part}\n内容: {resource_content}")
                        
                        except Exception as e:
                            # 标记错误
                            buffer = buffer.replace(resource_part, f"{resource_part}\n错误: {str(e)}")
                    
                    # 返回当前缓冲区
                    yield buffer
                    buffer = ""
        
        except Exception as e:
            yield f"处理消息时出错: {str(e)}"
    
    async def close(self):
        """关闭MCP会话"""
        if self.session:
            await self.session.__aexit__(None, None, None)

# 使用示例
async def main():
    # 初始化LLM客户端(示例使用OpenAI,实际应用可以替换)
    import openai
    openai_client = openai.AsyncClient(api_key="your-api-key")
    
    # 创建AI助手
    assistant = MCPEnabledAIAssistant(
        llm_client=openai_client,
        mcp_url="http://localhost:8000/sse"
    )
    
    # 初始化
    success = await assistant.initialize()
    if not success:
        print("初始化失败")
        return
    
    # 处理用户输入
    user_input = "我需要查询明天北京的天气,并帮我找一下关于人工智能的最新研究"
    
    async for response_chunk in assistant.chat(user_input):
        print(response_chunk, end="", flush=True)
    
    # 关闭会话
    await assistant.close()

if __name__ == "__main__":
    asyncio.run(main())

这个示例展示了:

  1. 如何将MCP客户端集成到AI助手中
  2. 如何在LLM响应中检测和处理工具调用
  3. 如何处理资源引用
  4. 如何在流式输出中实时更新结果

小结

在本章中,我们探讨了MCP客户端的高级功能:

  • 错误处理和重试策略
  • 流式数据处理方法
  • 实现自定义传输层
  • 处理不同格式的返回值
  • 客户端与LLM集成的最佳实践
  • 通过AI助手示例,展示了MCP工具与大语言模型的集成

掌握这些高级功能将帮助您构建更强大、更灵活的MCP客户端应用。在下一章中,我们将探讨服务器生命周期和上下文管理。

使用 Hugo 构建
主题 StackJimmy 设计