1.4 客户端基础
在前一章中,我们学习了如何创建MCP服务器。在本章中,我们将深入探讨MCP客户端的基础知识,包括如何创建客户端、连接到服务器、执行基本操作以及处理会话生命周期。
创建MCP客户端
MCP客户端是与MCP服务器交互的程序,它负责发送请求、接收响应,并将功能暴露给应用程序或大语言模型。创建MCP客户端的第一步是选择合适的传输方式。
选择传输方式
MCP支持多种传输方式,最常用的是:
- stdio - 通过标准输入/输出流通信,适用于本地运行的服务器
- SSE (Server-Sent Events) - 通过HTTP连接通信,适用于远程服务器
- WebSocket - 通过WebSocket协议通信,适用于需要高性能双向通信的场景
下面是创建不同传输方式客户端的基本模式:
Stdio客户端
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
async def create_stdio_client():
# 定义服务器参数
server_params = StdioServerParameters(
command="python", # 可执行命令
args=["path/to/server.py"], # 命令参数
env=None, # 可选的环境变量
)
# 创建客户端连接
async with stdio_client(server_params) as (read, write):
# 创建会话
async with ClientSession(read, write) as session:
# 使用会话执行操作
await session.initialize()
# ... 其他操作 ...
SSE客户端
from mcp import ClientSession
from mcp.client.sse import sse_client
async def create_sse_client():
# 服务器URL
server_url = "http://localhost:8000/sse"
# 可选的HTTP头信息
headers = {
"Authorization": "Bearer your-token",
}
# 创建客户端连接
async with sse_client(server_url, headers=headers) as (read, write):
# 创建会话
async with ClientSession(read, write) as session:
# 使用会话执行操作
await session.initialize()
# ... 其他操作 ...
WebSocket客户端
from mcp import ClientSession
from mcp.client.websocket import websocket_client
async def create_websocket_client():
# 服务器URL
server_url = "ws://localhost:8000/ws"
# 可选的HTTP头信息
headers = {
"Authorization": "Bearer your-token",
}
# 创建客户端连接
async with websocket_client(server_url, headers=headers) as (read, write):
# 创建会话
async with ClientSession(read, write) as session:
# 使用会话执行操作
await session.initialize()
# ... 其他操作 ...
ClientSession参数
除了基本的读写流,ClientSession构造函数还接受多个可选参数:
async with ClientSession(
read,
write,
sampling_callback=None, # 用于创建消息的回调函数
request_id_generator=None, # 自定义请求ID生成器
client_id=None, # 客户端ID
) as session:
# 使用会话
其中,sampling_callback是一个特殊的回调函数,用于在服务器请求生成内容时调用。这在集成大语言模型时特别有用。
连接到服务器
一旦创建了客户端,下一步是连接到服务器并初始化会话。
初始化连接
初始化是客户端与服务器交互的第一步。在这个阶段,客户端和服务器交换基本信息,包括协议版本、支持的功能等。
# 初始化连接
await session.initialize()
# 如果需要,可以指定自定义选项
await session.initialize(
client_name="My Client",
client_version="1.0.0",
)
初始化过程会返回服务器支持的功能列表,但通常不需要直接处理这些信息,因为ClientSession会自动管理功能兼容性。
连接错误处理
连接到服务器可能会遇到各种错误,应当妥善处理:
try:
# 尝试连接和初始化
await session.initialize()
print("连接成功!")
except ConnectionError as e:
print(f"连接错误: {e}")
except TimeoutError as e:
print(f"连接超时: {e}")
except Exception as e:
print(f"其他错误: {e}")
特别是对于SSE和WebSocket连接,可能需要设置超时和重试策略:
# SSE连接超时设置
async with sse_client(server_url, timeout=10, sse_read_timeout=60) as (read, write):
# ...
基本操作
一旦成功连接到服务器,客户端可以执行各种操作,包括列出资源/工具/提示、读取资源、调用工具和获取提示。
列出资源
# 列出所有可用资源
resources = await session.list_resources()
print(f"可用资源: {resources}")
# 分页列出资源(适用于资源较多的情况)
first_page = await session.list_resources(limit=10)
cursor = first_page.nextCursor
if cursor:
next_page = await session.list_resources(cursor=cursor, limit=10)
列出工具
# 列出所有可用工具
tools = await session.list_tools()
print(f"可用工具: {tools}")
# 分页列出工具
first_page = await session.list_tools(limit=10)
cursor = first_page.nextCursor
if cursor:
next_page = await session.list_tools(cursor=cursor, limit=10)
列出提示
# 列出所有可用提示
prompts = await session.list_prompts()
print(f"可用提示: {prompts}")
# 分页列出提示
first_page = await session.list_prompts(limit=10)
cursor = first_page.nextCursor
if cursor:
next_page = await session.list_prompts(cursor=cursor, limit=10)
读取资源
# 读取资源(返回内容和MIME类型)
content, mime_type = await session.read_resource("resource://path")
# 读取带参数的资源
content, mime_type = await session.read_resource("user://{user_id}", {"user_id": "123"})
对于资源URI中的参数,有两种方式处理:
- 在URI中直接替换:
"user://123" - 使用参数字典:
"user://{user_id}"+{"user_id": "123"}
调用工具
# 调用不带参数的工具
result = await session.call_tool("tool-name")
# 调用带参数的工具
result = await session.call_tool("calculate", arguments={
"operation": "add",
"a": 5,
"b": 3
})
# 处理工具结果
if result.isError:
print(f"工具调用错误: {result.content}")
else:
# 工具可能返回不同类型的内容
for content_item in result.content:
if content_item.type == "text":
print(f"文本结果: {content_item.text}")
elif content_item.type == "image":
print(f"图片结果: {content_item.url}")
获取提示
# 获取不带参数的提示
prompt = await session.get_prompt("prompt-name")
# 获取带参数的提示
prompt = await session.get_prompt("welcome-message", arguments={
"user_name": "张三",
"time_of_day": "上午"
})
# 处理提示内容
for message in prompt.messages:
print(f"角色: {message.role}")
content = message.content
if hasattr(content, "text"):
print(f"内容: {content.text}")
处理会话和连接生命周期
MCP客户端会话和连接有明确的生命周期,正确管理这些生命周期对于资源利用和错误处理至关重要。
会话生命周期
MCP会话的标准生命周期如下:
- 创建会话 - 初始化ClientSession对象
- 连接到服务器 - 调用initialize()方法
- 执行操作 - 列出资源/工具/提示,读取资源,调用工具等
- 关闭会话 - 释放资源
使用异步上下文管理器可以自动处理会话的创建和关闭:
async with ClientSession(read, write) as session:
# 会话在这里有效
await session.initialize()
# ... 执行操作 ...
# 会话在这里自动关闭
连接断开处理
在长时间运行的应用中,可能需要处理连接断开和重新连接:
import asyncio
async def run_with_reconnect():
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
async with sse_client(server_url) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
print("连接成功!")
# 执行操作
while True:
try:
# 执行周期性操作
result = await session.call_tool("heartbeat")
print("心跳成功")
await asyncio.sleep(60) # 每分钟检查一次
except Exception as e:
print(f"操作错误,尝试恢复: {e}")
break
except Exception as e:
retry_count += 1
wait_time = 2 ** retry_count # 指数退避
print(f"连接断开: {e}")
print(f"将在 {wait_time} 秒后尝试重新连接...")
await asyncio.sleep(wait_time)
else:
# 正常退出循环
break
if retry_count >= max_retries:
print("达到最大重试次数,放弃连接")
资源清理
确保在任何情况下都能正确清理资源:
async def safe_client_run():
client = None
session = None
try:
# 创建客户端和会话
client_resources = await stdio_client(server_params).__aenter__()
read, write = client_resources
session = ClientSession(read, write)
await session.__aenter__()
# 执行操作
await session.initialize()
# ... 其他操作 ...
except Exception as e:
print(f"发生错误: {e}")
finally:
# 确保资源被清理
if session:
await session.__aexit__(None, None, None)
if client:
await client.__aexit__(None, None, None)
不过,通常推荐使用异步上下文管理器(async with)来自动处理这些清理工作。
示例:构建简单命令行客户端
让我们将所学知识结合起来,构建一个简单的命令行客户端,用于与MCP服务器交互:
#!/usr/bin/env python3
import asyncio
import argparse
import sys
from typing import Optional, List, Dict, Any
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.sse import sse_client
class MCPClient:
def __init__(self):
self.session: Optional[ClientSession] = None
self.read = None
self.write = None
async def connect(self, transport: str, server_path: Optional[str] = None, server_url: Optional[str] = None):
"""连接到MCP服务器"""
if transport == "stdio":
if not server_path:
raise ValueError("使用stdio传输时必须提供server_path")
server_params = StdioServerParameters(
command="python",
args=[server_path],
)
self.read, self.write = await stdio_client(server_params).__aenter__()
elif transport == "sse":
if not server_url:
raise ValueError("使用SSE传输时必须提供server_url")
self.read, self.write = await sse_client(server_url).__aenter__()
else:
raise ValueError(f"不支持的传输方式: {transport}")
self.session = await ClientSession(self.read, self.write).__aenter__()
await self.session.initialize()
print("已连接到服务器")
async def disconnect(self):
"""断开与服务器的连接"""
if self.session:
await self.session.__aexit__(None, None, None)
self.session = None
print("已断开连接")
async def list_resources(self):
"""列出可用资源"""
if not self.session:
print("错误: 未连接到服务器")
return
resources = await self.session.list_resources()
print("\n可用资源:")
if resources and hasattr(resources, 'resources'):
for resource in resources.resources:
print(f" - {resource}")
else:
print(" (没有可用资源)")
async def list_tools(self):
"""列出可用工具"""
if not self.session:
print("错误: 未连接到服务器")
return
tools = await self.session.list_tools()
print("\n可用工具:")
if tools and hasattr(tools, 'tools'):
for tool in tools.tools:
print(f" - {tool.name}: {tool.description}")
else:
print(" (没有可用工具)")
async def list_prompts(self):
"""列出可用提示"""
if not self.session:
print("错误: 未连接到服务器")
return
prompts = await self.session.list_prompts()
print("\n可用提示:")
if prompts and hasattr(prompts, 'prompts'):
for prompt in prompts.prompts:
print(f" - {prompt.name}: {prompt.description}")
else:
print(" (没有可用提示)")
async def read_resource(self, uri: str, params: Optional[Dict[str, Any]] = None):
"""读取资源"""
if not self.session:
print("错误: 未连接到服务器")
return
try:
content, mime_type = await self.session.read_resource(uri, params)
print(f"\n资源: {uri}")
print(f"MIME类型: {mime_type}")
print("内容:")
print(content)
except Exception as e:
print(f"读取资源错误: {e}")
async def call_tool(self, name: str, args: Optional[Dict[str, Any]] = None):
"""调用工具"""
if not self.session:
print("错误: 未连接到服务器")
return
try:
result = await self.session.call_tool(name, arguments=args or {})
print(f"\n工具: {name}")
if args:
print(f"参数: {args}")
if result.isError:
print(f"错误: {result.content}")
else:
print("结果:")
for item in result.content:
if hasattr(item, 'text'):
print(item.text)
else:
print(f"(不支持的内容类型: {item.type})")
except Exception as e:
print(f"调用工具错误: {e}")
async def get_prompt(self, name: str, args: Optional[Dict[str, Any]] = None):
"""获取提示"""
if not self.session:
print("错误: 未连接到服务器")
return
try:
prompt = await self.session.get_prompt(name, arguments=args or {})
print(f"\n提示: {name}")
if args:
print(f"参数: {args}")
print("消息:")
for msg in prompt.messages:
print(f"- {msg.role}: ", end="")
if hasattr(msg.content, 'text'):
print(msg.content.text)
else:
print(f"(不支持的内容类型)")
except Exception as e:
print(f"获取提示错误: {e}")
async def interactive_mode(client: MCPClient):
"""交互模式"""
print("MCP 客户端 - 交互模式")
print("输入 'help' 查看命令列表,输入 'exit' 退出")
while True:
try:
command = input("\n> ").strip()
if command == "exit":
break
elif command == "help":
print("\n可用命令:")
print(" resources - 列出可用资源")
print(" tools - 列出可用工具")
print(" prompts - 列出可用提示")
print(" read <uri> - 读取资源")
print(" call <tool> - 调用工具")
print(" prompt <name> - 获取提示")
print(" exit - 退出程序")
print(" help - 显示此帮助")
elif command == "resources":
await client.list_resources()
elif command == "tools":
await client.list_tools()
elif command == "prompts":
await client.list_prompts()
elif command.startswith("read "):
uri = command[5:].strip()
# 解析参数(如有)
params = {}
if " " in uri:
parts = uri.split(" ", 1)
uri = parts[0]
try:
# 简单解析 key=value 格式的参数
param_str = parts[1]
for pair in param_str.split(","):
if "=" in pair:
k, v = pair.split("=", 1)
params[k.strip()] = v.strip()
except Exception:
print("参数格式错误。正确格式: read uri param1=value1,param2=value2")
continue
await client.read_resource(uri, params)
elif command.startswith("call "):
parts = command[5:].strip().split(" ", 1)
tool_name = parts[0]
args = {}
if len(parts) > 1:
try:
# 简单解析 key=value 格式的参数
for pair in parts[1].split(","):
if "=" in pair:
k, v = pair.split("=", 1)
v = v.strip()
# 简单类型转换
if v.isdigit():
v = int(v)
elif v.lower() == "true":
v = True
elif v.lower() == "false":
v = False
args[k.strip()] = v
except Exception:
print("参数格式错误。正确格式: call tool-name param1=value1,param2=value2")
continue
await client.call_tool(tool_name, args)
elif command.startswith("prompt "):
parts = command[7:].strip().split(" ", 1)
prompt_name = parts[0]
args = {}
if len(parts) > 1:
try:
# 简单解析 key=value 格式的参数
for pair in parts[1].split(","):
if "=" in pair:
k, v = pair.split("=", 1)
args[k.strip()] = v.strip()
except Exception:
print("参数格式错误。正确格式: prompt prompt-name param1=value1,param2=value2")
continue
await client.get_prompt(prompt_name, args)
else:
print(f"未知命令: {command}")
print("输入 'help' 查看可用命令")
except KeyboardInterrupt:
break
except EOFError:
break
except Exception as e:
print(f"错误: {e}")
print("\n再见!")
async def main():
parser = argparse.ArgumentParser(description="MCP 命令行客户端")
parser.add_argument("--transport", choices=["stdio", "sse"], default="stdio",
help="传输方式 (默认: stdio)")
parser.add_argument("--server", help="服务器脚本路径 (stdio模式)")
parser.add_argument("--url", help="服务器URL (SSE模式)")
args = parser.parse_args()
client = MCPClient()
try:
# 连接到服务器
await client.connect(
transport=args.transport,
server_path=args.server,
server_url=args.url
)
# 进入交互模式
await interactive_mode(client)
finally:
# 确保断开连接
await client.disconnect()
if __name__ == "__main__":
asyncio.run(main())
这个命令行客户端支持:
- 连接到stdio或SSE传输的MCP服务器
- 列出可用的资源、工具和提示
- 读取资源
- 调用工具
- 获取提示
- 提供交互式命令行界面
使用方法示例:
# 使用stdio连接到服务器
python mcp_client.py --transport stdio --server note_taking.py
# 使用SSE连接到服务器
python mcp_client.py --transport sse --url http://localhost:8000/sse
小结
在本章中,我们深入探讨了MCP客户端的基础知识,包括:
- 如何创建MCP客户端并选择合适的传输方式
- 如何连接到服务器并初始化会话
- 如何执行基本操作,如列出资源/工具/提示、读取资源、调用工具和获取提示
- 如何处理会话和连接生命周期
- 如何构建一个简单但功能完整的命令行客户端
理解这些基础知识对于构建更复杂的MCP应用程序至关重要。在下一章中,我们将深入探讨资源的高级用法,包括如何设计和使用复杂的资源模式。
相关实现文件
- example/example_client.py - 完整的MCP客户端示例
- test_client.py - 测试用的基础客户端
- test_mcp_client.py - 另一个MCP客户端测试实现
- mcp/client - MCP客户端库目录