1.4_客户端基础

1.4 客户端基础

在前一章中,我们学习了如何创建MCP服务器。在本章中,我们将深入探讨MCP客户端的基础知识,包括如何创建客户端、连接到服务器、执行基本操作以及处理会话生命周期。

创建MCP客户端

MCP客户端是与MCP服务器交互的程序,它负责发送请求、接收响应,并将功能暴露给应用程序或大语言模型。创建MCP客户端的第一步是选择合适的传输方式。

选择传输方式

MCP支持多种传输方式,最常用的是:

  1. stdio - 通过标准输入/输出流通信,适用于本地运行的服务器
  2. SSE (Server-Sent Events) - 通过HTTP连接通信,适用于远程服务器
  3. WebSocket - 通过WebSocket协议通信,适用于需要高性能双向通信的场景

下面是创建不同传输方式客户端的基本模式:

Stdio客户端

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def create_stdio_client():
    # 定义服务器参数
    server_params = StdioServerParameters(
        command="python",  # 可执行命令
        args=["path/to/server.py"],  # 命令参数
        env=None,  # 可选的环境变量
    )
    
    # 创建客户端连接
    async with stdio_client(server_params) as (read, write):
        # 创建会话
        async with ClientSession(read, write) as session:
            # 使用会话执行操作
            await session.initialize()
            # ... 其他操作 ...

SSE客户端

from mcp import ClientSession
from mcp.client.sse import sse_client

async def create_sse_client():
    # 服务器URL
    server_url = "http://localhost:8000/sse"
    
    # 可选的HTTP头信息
    headers = {
        "Authorization": "Bearer your-token",
    }
    
    # 创建客户端连接
    async with sse_client(server_url, headers=headers) as (read, write):
        # 创建会话
        async with ClientSession(read, write) as session:
            # 使用会话执行操作
            await session.initialize()
            # ... 其他操作 ...

WebSocket客户端

from mcp import ClientSession
from mcp.client.websocket import websocket_client

async def create_websocket_client():
    # 服务器URL
    server_url = "ws://localhost:8000/ws"
    
    # 可选的HTTP头信息
    headers = {
        "Authorization": "Bearer your-token",
    }
    
    # 创建客户端连接
    async with websocket_client(server_url, headers=headers) as (read, write):
        # 创建会话
        async with ClientSession(read, write) as session:
            # 使用会话执行操作
            await session.initialize()
            # ... 其他操作 ...

ClientSession参数

除了基本的读写流,ClientSession构造函数还接受多个可选参数:

async with ClientSession(
    read,
    write,
    sampling_callback=None,  # 用于创建消息的回调函数
    request_id_generator=None,  # 自定义请求ID生成器
    client_id=None,  # 客户端ID
) as session:
    # 使用会话

其中,sampling_callback是一个特殊的回调函数,用于在服务器请求生成内容时调用。这在集成大语言模型时特别有用。

连接到服务器

一旦创建了客户端,下一步是连接到服务器并初始化会话。

初始化连接

初始化是客户端与服务器交互的第一步。在这个阶段,客户端和服务器交换基本信息,包括协议版本、支持的功能等。

# 初始化连接
await session.initialize()

# 如果需要,可以指定自定义选项
await session.initialize(
    client_name="My Client",
    client_version="1.0.0",
)

初始化过程会返回服务器支持的功能列表,但通常不需要直接处理这些信息,因为ClientSession会自动管理功能兼容性。

连接错误处理

连接到服务器可能会遇到各种错误,应当妥善处理:

try:
    # 尝试连接和初始化
    await session.initialize()
    print("连接成功!")
except ConnectionError as e:
    print(f"连接错误: {e}")
except TimeoutError as e:
    print(f"连接超时: {e}")
except Exception as e:
    print(f"其他错误: {e}")

特别是对于SSE和WebSocket连接,可能需要设置超时和重试策略:

# SSE连接超时设置
async with sse_client(server_url, timeout=10, sse_read_timeout=60) as (read, write):
    # ...

基本操作

一旦成功连接到服务器,客户端可以执行各种操作,包括列出资源/工具/提示、读取资源、调用工具和获取提示。

列出资源

# 列出所有可用资源
resources = await session.list_resources()
print(f"可用资源: {resources}")

# 分页列出资源(适用于资源较多的情况)
first_page = await session.list_resources(limit=10)
cursor = first_page.nextCursor
if cursor:
    next_page = await session.list_resources(cursor=cursor, limit=10)

列出工具

# 列出所有可用工具
tools = await session.list_tools()
print(f"可用工具: {tools}")

# 分页列出工具
first_page = await session.list_tools(limit=10)
cursor = first_page.nextCursor
if cursor:
    next_page = await session.list_tools(cursor=cursor, limit=10)

列出提示

# 列出所有可用提示
prompts = await session.list_prompts()
print(f"可用提示: {prompts}")

# 分页列出提示
first_page = await session.list_prompts(limit=10)
cursor = first_page.nextCursor
if cursor:
    next_page = await session.list_prompts(cursor=cursor, limit=10)

读取资源

# 读取资源(返回内容和MIME类型)
content, mime_type = await session.read_resource("resource://path")

# 读取带参数的资源
content, mime_type = await session.read_resource("user://{user_id}", {"user_id": "123"})

对于资源URI中的参数,有两种方式处理:

  1. 在URI中直接替换:"user://123"
  2. 使用参数字典:"user://{user_id}" + {"user_id": "123"}

调用工具

# 调用不带参数的工具
result = await session.call_tool("tool-name")

# 调用带参数的工具
result = await session.call_tool("calculate", arguments={
    "operation": "add",
    "a": 5,
    "b": 3
})

# 处理工具结果
if result.isError:
    print(f"工具调用错误: {result.content}")
else:
    # 工具可能返回不同类型的内容
    for content_item in result.content:
        if content_item.type == "text":
            print(f"文本结果: {content_item.text}")
        elif content_item.type == "image":
            print(f"图片结果: {content_item.url}")

获取提示

# 获取不带参数的提示
prompt = await session.get_prompt("prompt-name")

# 获取带参数的提示
prompt = await session.get_prompt("welcome-message", arguments={
    "user_name": "张三",
    "time_of_day": "上午"
})

# 处理提示内容
for message in prompt.messages:
    print(f"角色: {message.role}")
    content = message.content
    if hasattr(content, "text"):
        print(f"内容: {content.text}")

处理会话和连接生命周期

MCP客户端会话和连接有明确的生命周期,正确管理这些生命周期对于资源利用和错误处理至关重要。

会话生命周期

MCP会话的标准生命周期如下:

  1. 创建会话 - 初始化ClientSession对象
  2. 连接到服务器 - 调用initialize()方法
  3. 执行操作 - 列出资源/工具/提示,读取资源,调用工具等
  4. 关闭会话 - 释放资源

使用异步上下文管理器可以自动处理会话的创建和关闭:

async with ClientSession(read, write) as session:
    # 会话在这里有效
    await session.initialize()
    # ... 执行操作 ...
# 会话在这里自动关闭

连接断开处理

在长时间运行的应用中,可能需要处理连接断开和重新连接:

import asyncio

async def run_with_reconnect():
    max_retries = 3
    retry_count = 0
    
    while retry_count < max_retries:
        try:
            async with sse_client(server_url) as (read, write):
                async with ClientSession(read, write) as session:
                    await session.initialize()
                    print("连接成功!")
                    
                    # 执行操作
                    while True:
                        try:
                            # 执行周期性操作
                            result = await session.call_tool("heartbeat")
                            print("心跳成功")
                            await asyncio.sleep(60)  # 每分钟检查一次
                        except Exception as e:
                            print(f"操作错误,尝试恢复: {e}")
                            break
            
        except Exception as e:
            retry_count += 1
            wait_time = 2 ** retry_count  # 指数退避
            print(f"连接断开: {e}")
            print(f"将在 {wait_time} 秒后尝试重新连接...")
            await asyncio.sleep(wait_time)
        else:
            # 正常退出循环
            break
    
    if retry_count >= max_retries:
        print("达到最大重试次数,放弃连接")

资源清理

确保在任何情况下都能正确清理资源:

async def safe_client_run():
    client = None
    session = None
    
    try:
        # 创建客户端和会话
        client_resources = await stdio_client(server_params).__aenter__()
        read, write = client_resources
        
        session = ClientSession(read, write)
        await session.__aenter__()
        
        # 执行操作
        await session.initialize()
        # ... 其他操作 ...
        
    except Exception as e:
        print(f"发生错误: {e}")
    finally:
        # 确保资源被清理
        if session:
            await session.__aexit__(None, None, None)
        if client:
            await client.__aexit__(None, None, None)

不过,通常推荐使用异步上下文管理器(async with)来自动处理这些清理工作。

示例:构建简单命令行客户端

让我们将所学知识结合起来,构建一个简单的命令行客户端,用于与MCP服务器交互:

#!/usr/bin/env python3
import asyncio
import argparse
import sys
from typing import Optional, List, Dict, Any

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.sse import sse_client

class MCPClient:
    def __init__(self):
        self.session: Optional[ClientSession] = None
        self.read = None
        self.write = None
    
    async def connect(self, transport: str, server_path: Optional[str] = None, server_url: Optional[str] = None):
        """连接到MCP服务器"""
        if transport == "stdio":
            if not server_path:
                raise ValueError("使用stdio传输时必须提供server_path")
            
            server_params = StdioServerParameters(
                command="python",
                args=[server_path],
            )
            
            self.read, self.write = await stdio_client(server_params).__aenter__()
        
        elif transport == "sse":
            if not server_url:
                raise ValueError("使用SSE传输时必须提供server_url")
            
            self.read, self.write = await sse_client(server_url).__aenter__()
        
        else:
            raise ValueError(f"不支持的传输方式: {transport}")
        
        self.session = await ClientSession(self.read, self.write).__aenter__()
        await self.session.initialize()
        print("已连接到服务器")
    
    async def disconnect(self):
        """断开与服务器的连接"""
        if self.session:
            await self.session.__aexit__(None, None, None)
            self.session = None
            print("已断开连接")
    
    async def list_resources(self):
        """列出可用资源"""
        if not self.session:
            print("错误: 未连接到服务器")
            return
        
        resources = await self.session.list_resources()
        print("\n可用资源:")
        if resources and hasattr(resources, 'resources'):
            for resource in resources.resources:
                print(f"  - {resource}")
        else:
            print("  (没有可用资源)")
    
    async def list_tools(self):
        """列出可用工具"""
        if not self.session:
            print("错误: 未连接到服务器")
            return
        
        tools = await self.session.list_tools()
        print("\n可用工具:")
        if tools and hasattr(tools, 'tools'):
            for tool in tools.tools:
                print(f"  - {tool.name}: {tool.description}")
        else:
            print("  (没有可用工具)")
    
    async def list_prompts(self):
        """列出可用提示"""
        if not self.session:
            print("错误: 未连接到服务器")
            return
        
        prompts = await self.session.list_prompts()
        print("\n可用提示:")
        if prompts and hasattr(prompts, 'prompts'):
            for prompt in prompts.prompts:
                print(f"  - {prompt.name}: {prompt.description}")
        else:
            print("  (没有可用提示)")
    
    async def read_resource(self, uri: str, params: Optional[Dict[str, Any]] = None):
        """读取资源"""
        if not self.session:
            print("错误: 未连接到服务器")
            return
        
        try:
            content, mime_type = await self.session.read_resource(uri, params)
            print(f"\n资源: {uri}")
            print(f"MIME类型: {mime_type}")
            print("内容:")
            print(content)
        except Exception as e:
            print(f"读取资源错误: {e}")
    
    async def call_tool(self, name: str, args: Optional[Dict[str, Any]] = None):
        """调用工具"""
        if not self.session:
            print("错误: 未连接到服务器")
            return
        
        try:
            result = await self.session.call_tool(name, arguments=args or {})
            print(f"\n工具: {name}")
            if args:
                print(f"参数: {args}")
            
            if result.isError:
                print(f"错误: {result.content}")
            else:
                print("结果:")
                for item in result.content:
                    if hasattr(item, 'text'):
                        print(item.text)
                    else:
                        print(f"(不支持的内容类型: {item.type})")
        except Exception as e:
            print(f"调用工具错误: {e}")
    
    async def get_prompt(self, name: str, args: Optional[Dict[str, Any]] = None):
        """获取提示"""
        if not self.session:
            print("错误: 未连接到服务器")
            return
        
        try:
            prompt = await self.session.get_prompt(name, arguments=args or {})
            print(f"\n提示: {name}")
            if args:
                print(f"参数: {args}")
            
            print("消息:")
            for msg in prompt.messages:
                print(f"- {msg.role}: ", end="")
                if hasattr(msg.content, 'text'):
                    print(msg.content.text)
                else:
                    print(f"(不支持的内容类型)")
        except Exception as e:
            print(f"获取提示错误: {e}")

async def interactive_mode(client: MCPClient):
    """交互模式"""
    
    print("MCP 客户端 - 交互模式")
    print("输入 'help' 查看命令列表,输入 'exit' 退出")
    
    while True:
        try:
            command = input("\n> ").strip()
            
            if command == "exit":
                break
            
            elif command == "help":
                print("\n可用命令:")
                print("  resources       - 列出可用资源")
                print("  tools           - 列出可用工具")
                print("  prompts         - 列出可用提示")
                print("  read <uri>      - 读取资源")
                print("  call <tool>     - 调用工具")
                print("  prompt <name>   - 获取提示")
                print("  exit            - 退出程序")
                print("  help            - 显示此帮助")
            
            elif command == "resources":
                await client.list_resources()
            
            elif command == "tools":
                await client.list_tools()
            
            elif command == "prompts":
                await client.list_prompts()
            
            elif command.startswith("read "):
                uri = command[5:].strip()
                
                # 解析参数(如有)
                params = {}
                if " " in uri:
                    parts = uri.split(" ", 1)
                    uri = parts[0]
                    try:
                        # 简单解析 key=value 格式的参数
                        param_str = parts[1]
                        for pair in param_str.split(","):
                            if "=" in pair:
                                k, v = pair.split("=", 1)
                                params[k.strip()] = v.strip()
                    except Exception:
                        print("参数格式错误。正确格式: read uri param1=value1,param2=value2")
                        continue
                
                await client.read_resource(uri, params)
            
            elif command.startswith("call "):
                parts = command[5:].strip().split(" ", 1)
                tool_name = parts[0]
                
                args = {}
                if len(parts) > 1:
                    try:
                        # 简单解析 key=value 格式的参数
                        for pair in parts[1].split(","):
                            if "=" in pair:
                                k, v = pair.split("=", 1)
                                v = v.strip()
                                # 简单类型转换
                                if v.isdigit():
                                    v = int(v)
                                elif v.lower() == "true":
                                    v = True
                                elif v.lower() == "false":
                                    v = False
                                args[k.strip()] = v
                    except Exception:
                        print("参数格式错误。正确格式: call tool-name param1=value1,param2=value2")
                        continue
                
                await client.call_tool(tool_name, args)
            
            elif command.startswith("prompt "):
                parts = command[7:].strip().split(" ", 1)
                prompt_name = parts[0]
                
                args = {}
                if len(parts) > 1:
                    try:
                        # 简单解析 key=value 格式的参数
                        for pair in parts[1].split(","):
                            if "=" in pair:
                                k, v = pair.split("=", 1)
                                args[k.strip()] = v.strip()
                    except Exception:
                        print("参数格式错误。正确格式: prompt prompt-name param1=value1,param2=value2")
                        continue
                
                await client.get_prompt(prompt_name, args)
            
            else:
                print(f"未知命令: {command}")
                print("输入 'help' 查看可用命令")
        
        except KeyboardInterrupt:
            break
        except EOFError:
            break
        except Exception as e:
            print(f"错误: {e}")
    
    print("\n再见!")

async def main():
    parser = argparse.ArgumentParser(description="MCP 命令行客户端")
    parser.add_argument("--transport", choices=["stdio", "sse"], default="stdio",
                        help="传输方式 (默认: stdio)")
    parser.add_argument("--server", help="服务器脚本路径 (stdio模式)")
    parser.add_argument("--url", help="服务器URL (SSE模式)")
    
    args = parser.parse_args()
    
    client = MCPClient()
    
    try:
        # 连接到服务器
        await client.connect(
            transport=args.transport,
            server_path=args.server,
            server_url=args.url
        )
        
        # 进入交互模式
        await interactive_mode(client)
    
    finally:
        # 确保断开连接
        await client.disconnect()

if __name__ == "__main__":
    asyncio.run(main())

这个命令行客户端支持:

  1. 连接到stdio或SSE传输的MCP服务器
  2. 列出可用的资源、工具和提示
  3. 读取资源
  4. 调用工具
  5. 获取提示
  6. 提供交互式命令行界面

使用方法示例:

# 使用stdio连接到服务器
python mcp_client.py --transport stdio --server note_taking.py

# 使用SSE连接到服务器
python mcp_client.py --transport sse --url http://localhost:8000/sse

小结

在本章中,我们深入探讨了MCP客户端的基础知识,包括:

  • 如何创建MCP客户端并选择合适的传输方式
  • 如何连接到服务器并初始化会话
  • 如何执行基本操作,如列出资源/工具/提示、读取资源、调用工具和获取提示
  • 如何处理会话和连接生命周期
  • 如何构建一个简单但功能完整的命令行客户端

理解这些基础知识对于构建更复杂的MCP应用程序至关重要。在下一章中,我们将深入探讨资源的高级用法,包括如何设计和使用复杂的资源模式。

相关实现文件

使用 Hugo 构建
主题 StackJimmy 设计