3.1 定制传输层
传输层是MCP系统的关键组成部分,负责在客户端与服务器之间传输消息。本章将详细介绍MCP传输协议以及如何创建自定义传输实现。
MCP传输协议概述
MCP传输协议定义了客户端和服务器之间通信的标准方式。MCP支持多种传输方式,每种都有其特点和适用场景。
标准传输类型
MCP内置支持以下传输类型:
- 标准输入/输出 (stdio) - 最简单的传输类型,通过标准输入和输出流通信,适用于本地进程间通信。
- 服务器发送事件 (SSE) - 基于HTTP的单向通信机制,适用于服务器向客户端推送事件。
- WebSocket - 提供全双工通信通道,适用于需要实时双向通信的场景。
- HTTP/REST - 基于HTTP请求和响应的通信方式,适用于简单的请求-响应模式。
消息格式
所有MCP传输协议都使用JSON作为基本消息格式。一个典型的MCP消息结构如下:
{
"id": "msg_123456", // 消息唯一标识符
"type": "request", // 消息类型:request, response, event, error
"method": "execute_tool", // 操作方法
"params": { // 参数对象
"tool_name": "example",
"arguments": {
"param1": "value1",
"param2": 42
}
}
}
消息类型
MCP定义了四种基本消息类型:
- 请求 (request) - 客户端向服务器发送的请求
- 响应 (response) - 服务器对请求的响应
- 事件 (event) - 服务器主动发送的通知
- 错误 (error) - 错误信息
实现自定义传输层
自定义传输层的基本结构
创建自定义传输层需要实现服务器和客户端两部分:
- 服务器传输实现 - 负责接收客户端请求并发送响应
- 客户端传输实现 - 负责向服务器发送请求并接收响应
下面我们将逐步实现这两部分。
服务器传输层实现
创建服务器传输层需要实现BaseTransport接口:
from typing import Any, Dict, Optional, AsyncGenerator
from mcp.server.transport import BaseTransport
class CustomTransport(BaseTransport):
"""自定义传输层实现"""
async def startup(self):
"""启动传输层"""
# 初始化传输层资源
print("启动自定义传输层...")
async def shutdown(self):
"""关闭传输层"""
# 释放传输层资源
print("关闭自定义传输层...")
async def run(self, handler):
"""运行传输层,处理消息"""
# 接收消息并调用处理器
# handler是一个异步函数,接收请求并返回响应
while True:
# 1. 接收请求
request = await self._receive_request()
if request is None:
# 传输结束
break
# 2. 处理请求
response = await handler(request)
# 3. 发送响应
await self._send_response(response)
async def _receive_request(self) -> Optional[Dict[str, Any]]:
"""接收请求消息
实际实现中需要根据具体传输机制接收消息
"""
# 示例:从某个消息源接收消息
# 实际实现可能涉及网络请求、消息队列等
# 返回None表示传输结束
# 返回字典表示接收到的消息
pass
async def _send_response(self, response: Dict[str, Any]):
"""发送响应消息
实际实现中需要根据具体传输机制发送消息
"""
# 示例:向某个目标发送消息
# 实际实现可能涉及网络响应、消息队列等
pass
客户端传输层实现
创建客户端传输层需要实现ClientTransport接口:
from typing import Any, Dict, Optional, AsyncGenerator
from mcp.client.transport import ClientTransport
class CustomClientTransport(ClientTransport):
"""自定义客户端传输层实现"""
async def connect(self):
"""连接到服务器"""
# 建立与服务器的连接
print("连接到服务器...")
async def disconnect(self):
"""断开与服务器的连接"""
# 释放连接资源
print("断开服务器连接...")
async def send_request(self, request: Dict[str, Any]) -> AsyncGenerator[Dict[str, Any], None]:
"""发送请求并接收响应
参数:
request: 请求消息字典
返回:
响应消息生成器
"""
# 1. 发送请求
await self._send_message(request)
# 2. 接收响应
async for response in self._receive_responses(request["id"]):
yield response
async def _send_message(self, message: Dict[str, Any]):
"""发送消息
实际实现中需要根据具体传输机制发送消息
"""
# 示例:向服务器发送消息
# 实际实现可能涉及网络请求、消息队列等
pass
async def _receive_responses(self, request_id: str) -> AsyncGenerator[Dict[str, Any], None]:
"""接收与请求ID对应的所有响应
参数:
request_id: 请求ID
返回:
响应消息生成器
"""
# 接收响应直到遇到最终响应或错误
while True:
# 接收下一个响应
response = await self._receive_next_response(request_id)
if response is None:
# 没有更多响应
break
# 产生响应
yield response
# 检查是否为最终响应
if self._is_final_response(response):
break
async def _receive_next_response(self, request_id: str) -> Optional[Dict[str, Any]]:
"""接收下一个响应消息
实际实现中需要根据具体传输机制接收消息
"""
# 示例:从服务器接收消息
# 实际实现可能涉及网络响应、消息队列等
pass
def _is_final_response(self, response: Dict[str, Any]) -> bool:
"""检查是否为最终响应"""
# 如果是最终响应或错误,返回True
return response.get("type") in ("response", "error") and not response.get("partial", False)
实现基于ZeroMQ的自定义传输层
下面是一个使用ZeroMQ(一个高性能分布式消息队列库)实现的自定义传输层示例:
import json
import uuid
import asyncio
import zmq
import zmq.asyncio
from typing import Any, Dict, Optional, AsyncGenerator
from mcp.server.transport import BaseTransport
from mcp.client.transport import ClientTransport
from mcp.errors import TransportError
# 服务器传输层实现
class ZeroMQTransport(BaseTransport):
"""基于ZeroMQ的MCP服务器传输层"""
def __init__(self, bind_address="tcp://*:5555"):
"""初始化ZeroMQ传输层
参数:
bind_address: ZeroMQ绑定地址
"""
self.bind_address = bind_address
self.context = None
self.socket = None
async def startup(self):
"""启动传输层"""
# 创建ZeroMQ上下文和套接字
self.context = zmq.asyncio.Context()
self.socket = self.context.socket(zmq.ROUTER) # 使用ROUTER模式支持多客户端
self.socket.bind(self.bind_address)
print(f"ZeroMQ传输层已启动,监听于 {self.bind_address}")
async def shutdown(self):
"""关闭传输层"""
if self.socket:
self.socket.close()
if self.context:
self.context.term()
print("ZeroMQ传输层已关闭")
async def run(self, handler):
"""运行传输层,处理消息"""
if not self.socket:
raise TransportError("传输层未启动")
try:
while True:
# 接收消息:[client_id, empty, message]
client_id, empty, message_bytes = await self.socket.recv_multipart()
# 解析消息
try:
request = json.loads(message_bytes.decode('utf-8'))
except json.JSONDecodeError as e:
# 发送错误响应
error_response = {
"id": str(uuid.uuid4()),
"type": "error",
"error": {
"code": "parse_error",
"message": f"无法解析JSON消息: {str(e)}"
}
}
await self.socket.send_multipart([
client_id,
b'',
json.dumps(error_response).encode('utf-8')
])
continue
# 处理请求
response_generator = handler(request)
# 发送响应
async for response in response_generator:
await self.socket.send_multipart([
client_id,
b'',
json.dumps(response).encode('utf-8')
])
except asyncio.CancelledError:
# 传输层被取消,正常退出
print("ZeroMQ传输层运行被取消")
except Exception as e:
# 其他错误
print(f"ZeroMQ传输层运行错误: {e}")
raise
# 客户端传输层实现
class ZeroMQClientTransport(ClientTransport):
"""基于ZeroMQ的MCP客户端传输层"""
def __init__(self, server_address="tcp://localhost:5555"):
"""初始化ZeroMQ客户端传输层
参数:
server_address: 服务器地址
"""
self.server_address = server_address
self.context = None
self.socket = None
self.connected = False
async def connect(self):
"""连接到服务器"""
# 创建ZeroMQ上下文和套接字
self.context = zmq.asyncio.Context()
self.socket = self.context.socket(zmq.DEALER) # 使用DEALER模式匹配服务器的ROUTER
# 设置随机标识符
self.socket.setsockopt(zmq.IDENTITY, str(uuid.uuid4()).encode('utf-8'))
# 连接到服务器
self.socket.connect(self.server_address)
self.connected = True
print(f"已连接到MCP服务器: {self.server_address}")
async def disconnect(self):
"""断开与服务器的连接"""
if self.socket:
self.socket.close()
if self.context:
self.context.term()
self.connected = False
print("已断开与MCP服务器的连接")
async def send_request(self, request: Dict[str, Any]) -> AsyncGenerator[Dict[str, Any], None]:
"""发送请求并接收响应"""
if not self.connected or not self.socket:
raise TransportError("未连接到服务器")
# 获取请求ID
request_id = request["id"]
# 发送请求
await self.socket.send_multipart([
b'', # 空帧,匹配ROUTER-DEALER模式
json.dumps(request).encode('utf-8')
])
# 接收响应
while True:
# 接收响应消息
try:
empty, response_bytes = await self.socket.recv_multipart()
response = json.loads(response_bytes.decode('utf-8'))
# 检查响应ID是否匹配
if response.get("id") != request_id:
print(f"警告: 收到ID不匹配的响应,预期 {request_id},实际 {response.get('id')}")
continue
# 产生响应
yield response
# 检查是否为最终响应
if response.get("type") in ("response", "error") and not response.get("partial", False):
break
except json.JSONDecodeError as e:
# 解析错误
error_response = {
"id": request_id,
"type": "error",
"error": {
"code": "parse_error",
"message": f"无法解析响应JSON: {str(e)}"
}
}
yield error_response
break
except Exception as e:
# 其他错误
error_response = {
"id": request_id,
"type": "error",
"error": {
"code": "transport_error",
"message": f"传输错误: {str(e)}"
}
}
yield error_response
break
使用自定义传输层
在服务器中使用自定义传输层
from mcp.server.fastmcp import FastMCP
# 创建MCP服务器
mcp = FastMCP("使用自定义传输层的服务器")
# 定义资源和工具
@mcp.resource("example://resource")
def get_example_resource():
return "这是一个示例资源"
@mcp.tool()
def example_tool(param1: str, param2: int = 0):
"""示例工具函数"""
return f"执行工具,参数: {param1}, {param2}"
# 创建自定义传输层
zeromq_transport = ZeroMQTransport(bind_address="tcp://*:5555")
# 使用自定义传输层运行服务器
if __name__ == "__main__":
mcp.run(transport=zeromq_transport)
在客户端中使用自定义传输层
import asyncio
from mcp.client.base import MCPClient
async def main():
# 创建使用自定义传输层的客户端
client_transport = ZeroMQClientTransport(server_address="tcp://localhost:5555")
# 创建MCP客户端
client = MCPClient(transport=client_transport)
# 连接到服务器
await client.connect()
try:
# 调用工具
result = await client.execute_tool(
"example_tool",
{"param1": "测试", "param2": 42}
)
print(f"工具执行结果: {result}")
# 获取资源
resource = await client.get_resource("example://resource")
print(f"资源内容: {resource}")
finally:
# 断开连接
await client.disconnect()
if __name__ == "__main__":
asyncio.run(main())
实现基于Redis的发布/订阅传输层
下面是使用Redis发布/订阅机制的另一个自定义传输层示例,适用于需要跨网络、高可用性的场景:
import json
import uuid
import asyncio
import redis.asyncio as redis
from typing import Any, Dict, Optional, AsyncGenerator
from mcp.server.transport import BaseTransport
from mcp.client.transport import ClientTransport
from mcp.errors import TransportError
# 服务器传输层实现
class RedisTransport(BaseTransport):
"""基于Redis的MCP服务器传输层"""
def __init__(self, redis_url="redis://localhost:6379/0", channel_prefix="mcp"):
"""初始化Redis传输层
参数:
redis_url: Redis连接URL
channel_prefix: 通道前缀
"""
self.redis_url = redis_url
self.channel_prefix = channel_prefix
self.redis_client = None
self.pubsub = None
self.request_channel = f"{channel_prefix}:requests"
self.running = False
async def startup(self):
"""启动传输层"""
# 连接到Redis
self.redis_client = redis.from_url(self.redis_url)
# 创建发布/订阅对象
self.pubsub = self.redis_client.pubsub()
# 订阅请求通道
await self.pubsub.subscribe(self.request_channel)
self.running = True
print(f"Redis传输层已启动,监听通道 {self.request_channel}")
async def shutdown(self):
"""关闭传输层"""
self.running = False
if self.pubsub:
await self.pubsub.unsubscribe()
if self.redis_client:
await self.redis_client.close()
print("Redis传输层已关闭")
async def run(self, handler):
"""运行传输层,处理消息"""
if not self.running or not self.redis_client:
raise TransportError("传输层未启动")
try:
# 监听请求通道
while self.running:
# 接收消息
message = await self.pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
if not message:
# 没有消息,继续等待
await asyncio.sleep(0.01)
continue
# 解析消息
try:
data = message.get("data")
if isinstance(data, bytes):
data = data.decode("utf-8")
request = json.loads(data)
# 确保请求包含客户端ID和响应通道
client_id = request.get("client_id")
response_channel = request.get("response_channel")
if not client_id or not response_channel:
print("警告: 收到无效请求,缺少client_id或response_channel")
continue
# 移除传输层特定字段,传递给处理器
transport_request = request.copy()
transport_request.pop("client_id", None)
transport_request.pop("response_channel", None)
# 处理请求
response_generator = handler(transport_request)
# 发送响应
async for response in response_generator:
# 发布响应到客户端特定通道
await self.redis_client.publish(
response_channel,
json.dumps(response)
)
except json.JSONDecodeError as e:
print(f"错误: 无法解析JSON消息: {e}")
except Exception as e:
print(f"错误: 处理请求时发生异常: {e}")
except asyncio.CancelledError:
# 传输层被取消,正常退出
print("Redis传输层运行被取消")
except Exception as e:
# 其他错误
print(f"Redis传输层运行错误: {e}")
raise
# 客户端传输层实现
class RedisClientTransport(ClientTransport):
"""基于Redis的MCP客户端传输层"""
def __init__(self, redis_url="redis://localhost:6379/0", channel_prefix="mcp"):
"""初始化Redis客户端传输层
参数:
redis_url: Redis连接URL
channel_prefix: 通道前缀
"""
self.redis_url = redis_url
self.channel_prefix = channel_prefix
self.redis_client = None
self.pubsub = None
self.request_channel = f"{channel_prefix}:requests"
self.client_id = str(uuid.uuid4())
self.connected = False
async def connect(self):
"""连接到服务器"""
# 连接到Redis
self.redis_client = redis.from_url(self.redis_url)
# 创建发布/订阅对象
self.pubsub = self.redis_client.pubsub()
self.connected = True
print(f"已连接到Redis: {self.redis_url}")
async def disconnect(self):
"""断开与服务器的连接"""
self.connected = False
if self.pubsub:
await self.pubsub.unsubscribe()
if self.redis_client:
await self.redis_client.close()
print("已断开与Redis的连接")
async def send_request(self, request: Dict[str, Any]) -> AsyncGenerator[Dict[str, Any], None]:
"""发送请求并接收响应"""
if not self.connected or not self.redis_client:
raise TransportError("未连接到服务器")
# 获取请求ID
request_id = request["id"]
# 创建响应通道
response_channel = f"{self.channel_prefix}:responses:{self.client_id}:{request_id}"
# 订阅响应通道
await self.pubsub.subscribe(response_channel)
try:
# 构建传输请求
transport_request = request.copy()
transport_request["client_id"] = self.client_id
transport_request["response_channel"] = response_channel
# 发布请求
await self.redis_client.publish(
self.request_channel,
json.dumps(transport_request)
)
# 等待并处理响应
while True:
# 接收响应
message = await self.pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
if not message:
# 没有消息,继续等待
await asyncio.sleep(0.01)
continue
try:
data = message.get("data")
if isinstance(data, bytes):
data = data.decode("utf-8")
response = json.loads(data)
# 确认响应ID匹配
if response.get("id") != request_id:
print(f"警告: 收到ID不匹配的响应,预期 {request_id},实际 {response.get('id')}")
continue
# 产生响应
yield response
# 检查是否为最终响应
if response.get("type") in ("response", "error") and not response.get("partial", False):
break
except json.JSONDecodeError as e:
# 解析错误
error_response = {
"id": request_id,
"type": "error",
"error": {
"code": "parse_error",
"message": f"无法解析响应JSON: {str(e)}"
}
}
yield error_response
break
except Exception as e:
# 其他错误
error_response = {
"id": request_id,
"type": "error",
"error": {
"code": "transport_error",
"message": f"传输错误: {str(e)}"
}
}
yield error_response
break
finally:
# 取消订阅响应通道
await self.pubsub.unsubscribe(response_channel)
传输层选择指南
在为MCP应用选择或设计传输层时,考虑以下因素:
-
性能需求
- 延迟敏感度
- 吞吐量需求
- 并发连接数
-
网络环境
- 内部网络或互联网
- 防火墙或代理限制
- 带宽限制
-
部署架构
- 单机部署或分布式
- 容器化或无服务器
- 高可用性需求
-
安全需求
- 传输加密
- 身份验证
- 访问控制
下表提供了不同传输类型的特性比较:
| 传输类型 | 优点 | 缺点 | 最佳使用场景 |
|---|---|---|---|
| stdio | 简单、零依赖、低延迟 | 仅支持单进程通信 | 本地开发、子进程通信 |
| SSE | 简单实现、易于调试、通过HTTP防火墙 | 单向通信、连接数限制 | 服务器到客户端的事件通知 |
| WebSocket | 双向通信、低延迟、支持标准库 | 可能被防火墙阻止、需要心跳机制 | 实时交互应用、聊天应用 |
| HTTP/REST | 广泛支持、易于理解、无状态 | 请求开销高、不适合频繁小消息 | 简单集成、RESTful服务 |
| ZeroMQ | 高性能、灵活模式、跨语言 | 额外依赖、复杂配置 | 高性能系统、微服务架构 |
| Redis | 可靠性高、支持集群、内置消息持久化 | 额外依赖、增加部署复杂性 | 分布式系统、需要消息可靠性的场景 |
小结
在本章中,我们探讨了MCP传输协议和自定义传输层实现:
- MCP传输协议的基础知识,包括消息格式和类型
- 如何实现自定义传输层的服务器和客户端部分
- 基于ZeroMQ和Redis的传输层实现示例
- 使用自定义传输层的服务器和客户端代码
- 传输层选择的考虑因素
掌握传输层的设计和实现,可以使您根据特定需求优化MCP应用的通信效率和可靠性。在下一章中,我们将探讨高性能MCP服务的构建,包括并发和异步处理模型。