3.1_定制传输层

3.1 定制传输层

传输层是MCP系统的关键组成部分,负责在客户端与服务器之间传输消息。本章将详细介绍MCP传输协议以及如何创建自定义传输实现。

MCP传输协议概述

MCP传输协议定义了客户端和服务器之间通信的标准方式。MCP支持多种传输方式,每种都有其特点和适用场景。

标准传输类型

MCP内置支持以下传输类型:

  1. 标准输入/输出 (stdio) - 最简单的传输类型,通过标准输入和输出流通信,适用于本地进程间通信。
  2. 服务器发送事件 (SSE) - 基于HTTP的单向通信机制,适用于服务器向客户端推送事件。
  3. WebSocket - 提供全双工通信通道,适用于需要实时双向通信的场景。
  4. HTTP/REST - 基于HTTP请求和响应的通信方式,适用于简单的请求-响应模式。

消息格式

所有MCP传输协议都使用JSON作为基本消息格式。一个典型的MCP消息结构如下:

{
  "id": "msg_123456",          // 消息唯一标识符
  "type": "request",           // 消息类型:request, response, event, error
  "method": "execute_tool",    // 操作方法
  "params": {                  // 参数对象
    "tool_name": "example",
    "arguments": {
      "param1": "value1",
      "param2": 42
    }
  }
}

消息类型

MCP定义了四种基本消息类型:

  1. 请求 (request) - 客户端向服务器发送的请求
  2. 响应 (response) - 服务器对请求的响应
  3. 事件 (event) - 服务器主动发送的通知
  4. 错误 (error) - 错误信息

实现自定义传输层

自定义传输层的基本结构

创建自定义传输层需要实现服务器和客户端两部分:

  1. 服务器传输实现 - 负责接收客户端请求并发送响应
  2. 客户端传输实现 - 负责向服务器发送请求并接收响应

下面我们将逐步实现这两部分。

服务器传输层实现

创建服务器传输层需要实现BaseTransport接口:

from typing import Any, Dict, Optional, AsyncGenerator
from mcp.server.transport import BaseTransport

class CustomTransport(BaseTransport):
    """自定义传输层实现"""
    
    async def startup(self):
        """启动传输层"""
        # 初始化传输层资源
        print("启动自定义传输层...")
    
    async def shutdown(self):
        """关闭传输层"""
        # 释放传输层资源
        print("关闭自定义传输层...")
    
    async def run(self, handler):
        """运行传输层,处理消息"""
        # 接收消息并调用处理器
        # handler是一个异步函数,接收请求并返回响应
        
        while True:
            # 1. 接收请求
            request = await self._receive_request()
            
            if request is None:
                # 传输结束
                break
            
            # 2. 处理请求
            response = await handler(request)
            
            # 3. 发送响应
            await self._send_response(response)
    
    async def _receive_request(self) -> Optional[Dict[str, Any]]:
        """接收请求消息
        
        实际实现中需要根据具体传输机制接收消息
        """
        # 示例:从某个消息源接收消息
        # 实际实现可能涉及网络请求、消息队列等
        
        # 返回None表示传输结束
        # 返回字典表示接收到的消息
        pass
    
    async def _send_response(self, response: Dict[str, Any]):
        """发送响应消息
        
        实际实现中需要根据具体传输机制发送消息
        """
        # 示例:向某个目标发送消息
        # 实际实现可能涉及网络响应、消息队列等
        pass

客户端传输层实现

创建客户端传输层需要实现ClientTransport接口:

from typing import Any, Dict, Optional, AsyncGenerator
from mcp.client.transport import ClientTransport

class CustomClientTransport(ClientTransport):
    """自定义客户端传输层实现"""
    
    async def connect(self):
        """连接到服务器"""
        # 建立与服务器的连接
        print("连接到服务器...")
    
    async def disconnect(self):
        """断开与服务器的连接"""
        # 释放连接资源
        print("断开服务器连接...")
    
    async def send_request(self, request: Dict[str, Any]) -> AsyncGenerator[Dict[str, Any], None]:
        """发送请求并接收响应
        
        参数:
            request: 请求消息字典
            
        返回:
            响应消息生成器
        """
        # 1. 发送请求
        await self._send_message(request)
        
        # 2. 接收响应
        async for response in self._receive_responses(request["id"]):
            yield response
    
    async def _send_message(self, message: Dict[str, Any]):
        """发送消息
        
        实际实现中需要根据具体传输机制发送消息
        """
        # 示例:向服务器发送消息
        # 实际实现可能涉及网络请求、消息队列等
        pass
    
    async def _receive_responses(self, request_id: str) -> AsyncGenerator[Dict[str, Any], None]:
        """接收与请求ID对应的所有响应
        
        参数:
            request_id: 请求ID
            
        返回:
            响应消息生成器
        """
        # 接收响应直到遇到最终响应或错误
        while True:
            # 接收下一个响应
            response = await self._receive_next_response(request_id)
            
            if response is None:
                # 没有更多响应
                break
            
            # 产生响应
            yield response
            
            # 检查是否为最终响应
            if self._is_final_response(response):
                break
    
    async def _receive_next_response(self, request_id: str) -> Optional[Dict[str, Any]]:
        """接收下一个响应消息
        
        实际实现中需要根据具体传输机制接收消息
        """
        # 示例:从服务器接收消息
        # 实际实现可能涉及网络响应、消息队列等
        pass
    
    def _is_final_response(self, response: Dict[str, Any]) -> bool:
        """检查是否为最终响应"""
        # 如果是最终响应或错误,返回True
        return response.get("type") in ("response", "error") and not response.get("partial", False)

实现基于ZeroMQ的自定义传输层

下面是一个使用ZeroMQ(一个高性能分布式消息队列库)实现的自定义传输层示例:

import json
import uuid
import asyncio
import zmq
import zmq.asyncio
from typing import Any, Dict, Optional, AsyncGenerator
from mcp.server.transport import BaseTransport
from mcp.client.transport import ClientTransport
from mcp.errors import TransportError

# 服务器传输层实现
class ZeroMQTransport(BaseTransport):
    """基于ZeroMQ的MCP服务器传输层"""
    
    def __init__(self, bind_address="tcp://*:5555"):
        """初始化ZeroMQ传输层
        
        参数:
            bind_address: ZeroMQ绑定地址
        """
        self.bind_address = bind_address
        self.context = None
        self.socket = None
    
    async def startup(self):
        """启动传输层"""
        # 创建ZeroMQ上下文和套接字
        self.context = zmq.asyncio.Context()
        self.socket = self.context.socket(zmq.ROUTER)  # 使用ROUTER模式支持多客户端
        self.socket.bind(self.bind_address)
        print(f"ZeroMQ传输层已启动,监听于 {self.bind_address}")
    
    async def shutdown(self):
        """关闭传输层"""
        if self.socket:
            self.socket.close()
        if self.context:
            self.context.term()
        print("ZeroMQ传输层已关闭")
    
    async def run(self, handler):
        """运行传输层,处理消息"""
        if not self.socket:
            raise TransportError("传输层未启动")
        
        try:
            while True:
                # 接收消息:[client_id, empty, message]
                client_id, empty, message_bytes = await self.socket.recv_multipart()
                
                # 解析消息
                try:
                    request = json.loads(message_bytes.decode('utf-8'))
                except json.JSONDecodeError as e:
                    # 发送错误响应
                    error_response = {
                        "id": str(uuid.uuid4()),
                        "type": "error",
                        "error": {
                            "code": "parse_error",
                            "message": f"无法解析JSON消息: {str(e)}"
                        }
                    }
                    await self.socket.send_multipart([
                        client_id,
                        b'',
                        json.dumps(error_response).encode('utf-8')
                    ])
                    continue
                
                # 处理请求
                response_generator = handler(request)
                
                # 发送响应
                async for response in response_generator:
                    await self.socket.send_multipart([
                        client_id,
                        b'',
                        json.dumps(response).encode('utf-8')
                    ])
        
        except asyncio.CancelledError:
            # 传输层被取消,正常退出
            print("ZeroMQ传输层运行被取消")
        except Exception as e:
            # 其他错误
            print(f"ZeroMQ传输层运行错误: {e}")
            raise

# 客户端传输层实现
class ZeroMQClientTransport(ClientTransport):
    """基于ZeroMQ的MCP客户端传输层"""
    
    def __init__(self, server_address="tcp://localhost:5555"):
        """初始化ZeroMQ客户端传输层
        
        参数:
            server_address: 服务器地址
        """
        self.server_address = server_address
        self.context = None
        self.socket = None
        self.connected = False
    
    async def connect(self):
        """连接到服务器"""
        # 创建ZeroMQ上下文和套接字
        self.context = zmq.asyncio.Context()
        self.socket = self.context.socket(zmq.DEALER)  # 使用DEALER模式匹配服务器的ROUTER
        
        # 设置随机标识符
        self.socket.setsockopt(zmq.IDENTITY, str(uuid.uuid4()).encode('utf-8'))
        
        # 连接到服务器
        self.socket.connect(self.server_address)
        self.connected = True
        print(f"已连接到MCP服务器: {self.server_address}")
    
    async def disconnect(self):
        """断开与服务器的连接"""
        if self.socket:
            self.socket.close()
        if self.context:
            self.context.term()
        self.connected = False
        print("已断开与MCP服务器的连接")
    
    async def send_request(self, request: Dict[str, Any]) -> AsyncGenerator[Dict[str, Any], None]:
        """发送请求并接收响应"""
        if not self.connected or not self.socket:
            raise TransportError("未连接到服务器")
        
        # 获取请求ID
        request_id = request["id"]
        
        # 发送请求
        await self.socket.send_multipart([
            b'',  # 空帧,匹配ROUTER-DEALER模式
            json.dumps(request).encode('utf-8')
        ])
        
        # 接收响应
        while True:
            # 接收响应消息
            try:
                empty, response_bytes = await self.socket.recv_multipart()
                response = json.loads(response_bytes.decode('utf-8'))
                
                # 检查响应ID是否匹配
                if response.get("id") != request_id:
                    print(f"警告: 收到ID不匹配的响应,预期 {request_id},实际 {response.get('id')}")
                    continue
                
                # 产生响应
                yield response
                
                # 检查是否为最终响应
                if response.get("type") in ("response", "error") and not response.get("partial", False):
                    break
                
            except json.JSONDecodeError as e:
                # 解析错误
                error_response = {
                    "id": request_id,
                    "type": "error",
                    "error": {
                        "code": "parse_error",
                        "message": f"无法解析响应JSON: {str(e)}"
                    }
                }
                yield error_response
                break
            except Exception as e:
                # 其他错误
                error_response = {
                    "id": request_id,
                    "type": "error",
                    "error": {
                        "code": "transport_error",
                        "message": f"传输错误: {str(e)}"
                    }
                }
                yield error_response
                break

使用自定义传输层

在服务器中使用自定义传输层

from mcp.server.fastmcp import FastMCP

# 创建MCP服务器
mcp = FastMCP("使用自定义传输层的服务器")

# 定义资源和工具
@mcp.resource("example://resource")
def get_example_resource():
    return "这是一个示例资源"

@mcp.tool()
def example_tool(param1: str, param2: int = 0):
    """示例工具函数"""
    return f"执行工具,参数: {param1}, {param2}"

# 创建自定义传输层
zeromq_transport = ZeroMQTransport(bind_address="tcp://*:5555")

# 使用自定义传输层运行服务器
if __name__ == "__main__":
    mcp.run(transport=zeromq_transport)

在客户端中使用自定义传输层

import asyncio
from mcp.client.base import MCPClient

async def main():
    # 创建使用自定义传输层的客户端
    client_transport = ZeroMQClientTransport(server_address="tcp://localhost:5555")
    
    # 创建MCP客户端
    client = MCPClient(transport=client_transport)
    
    # 连接到服务器
    await client.connect()
    
    try:
        # 调用工具
        result = await client.execute_tool(
            "example_tool",
            {"param1": "测试", "param2": 42}
        )
        print(f"工具执行结果: {result}")
        
        # 获取资源
        resource = await client.get_resource("example://resource")
        print(f"资源内容: {resource}")
    
    finally:
        # 断开连接
        await client.disconnect()

if __name__ == "__main__":
    asyncio.run(main())

实现基于Redis的发布/订阅传输层

下面是使用Redis发布/订阅机制的另一个自定义传输层示例,适用于需要跨网络、高可用性的场景:

import json
import uuid
import asyncio
import redis.asyncio as redis
from typing import Any, Dict, Optional, AsyncGenerator
from mcp.server.transport import BaseTransport
from mcp.client.transport import ClientTransport
from mcp.errors import TransportError

# 服务器传输层实现
class RedisTransport(BaseTransport):
    """基于Redis的MCP服务器传输层"""
    
    def __init__(self, redis_url="redis://localhost:6379/0", channel_prefix="mcp"):
        """初始化Redis传输层
        
        参数:
            redis_url: Redis连接URL
            channel_prefix: 通道前缀
        """
        self.redis_url = redis_url
        self.channel_prefix = channel_prefix
        self.redis_client = None
        self.pubsub = None
        self.request_channel = f"{channel_prefix}:requests"
        self.running = False
    
    async def startup(self):
        """启动传输层"""
        # 连接到Redis
        self.redis_client = redis.from_url(self.redis_url)
        # 创建发布/订阅对象
        self.pubsub = self.redis_client.pubsub()
        # 订阅请求通道
        await self.pubsub.subscribe(self.request_channel)
        self.running = True
        print(f"Redis传输层已启动,监听通道 {self.request_channel}")
    
    async def shutdown(self):
        """关闭传输层"""
        self.running = False
        if self.pubsub:
            await self.pubsub.unsubscribe()
        if self.redis_client:
            await self.redis_client.close()
        print("Redis传输层已关闭")
    
    async def run(self, handler):
        """运行传输层,处理消息"""
        if not self.running or not self.redis_client:
            raise TransportError("传输层未启动")
        
        try:
            # 监听请求通道
            while self.running:
                # 接收消息
                message = await self.pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
                
                if not message:
                    # 没有消息,继续等待
                    await asyncio.sleep(0.01)
                    continue
                
                # 解析消息
                try:
                    data = message.get("data")
                    if isinstance(data, bytes):
                        data = data.decode("utf-8")
                    
                    request = json.loads(data)
                    
                    # 确保请求包含客户端ID和响应通道
                    client_id = request.get("client_id")
                    response_channel = request.get("response_channel")
                    
                    if not client_id or not response_channel:
                        print("警告: 收到无效请求,缺少client_id或response_channel")
                        continue
                    
                    # 移除传输层特定字段,传递给处理器
                    transport_request = request.copy()
                    transport_request.pop("client_id", None)
                    transport_request.pop("response_channel", None)
                    
                    # 处理请求
                    response_generator = handler(transport_request)
                    
                    # 发送响应
                    async for response in response_generator:
                        # 发布响应到客户端特定通道
                        await self.redis_client.publish(
                            response_channel,
                            json.dumps(response)
                        )
                
                except json.JSONDecodeError as e:
                    print(f"错误: 无法解析JSON消息: {e}")
                except Exception as e:
                    print(f"错误: 处理请求时发生异常: {e}")
        
        except asyncio.CancelledError:
            # 传输层被取消,正常退出
            print("Redis传输层运行被取消")
        except Exception as e:
            # 其他错误
            print(f"Redis传输层运行错误: {e}")
            raise

# 客户端传输层实现
class RedisClientTransport(ClientTransport):
    """基于Redis的MCP客户端传输层"""
    
    def __init__(self, redis_url="redis://localhost:6379/0", channel_prefix="mcp"):
        """初始化Redis客户端传输层
        
        参数:
            redis_url: Redis连接URL
            channel_prefix: 通道前缀
        """
        self.redis_url = redis_url
        self.channel_prefix = channel_prefix
        self.redis_client = None
        self.pubsub = None
        self.request_channel = f"{channel_prefix}:requests"
        self.client_id = str(uuid.uuid4())
        self.connected = False
    
    async def connect(self):
        """连接到服务器"""
        # 连接到Redis
        self.redis_client = redis.from_url(self.redis_url)
        # 创建发布/订阅对象
        self.pubsub = self.redis_client.pubsub()
        self.connected = True
        print(f"已连接到Redis: {self.redis_url}")
    
    async def disconnect(self):
        """断开与服务器的连接"""
        self.connected = False
        if self.pubsub:
            await self.pubsub.unsubscribe()
        if self.redis_client:
            await self.redis_client.close()
        print("已断开与Redis的连接")
    
    async def send_request(self, request: Dict[str, Any]) -> AsyncGenerator[Dict[str, Any], None]:
        """发送请求并接收响应"""
        if not self.connected or not self.redis_client:
            raise TransportError("未连接到服务器")
        
        # 获取请求ID
        request_id = request["id"]
        
        # 创建响应通道
        response_channel = f"{self.channel_prefix}:responses:{self.client_id}:{request_id}"
        
        # 订阅响应通道
        await self.pubsub.subscribe(response_channel)
        
        try:
            # 构建传输请求
            transport_request = request.copy()
            transport_request["client_id"] = self.client_id
            transport_request["response_channel"] = response_channel
            
            # 发布请求
            await self.redis_client.publish(
                self.request_channel,
                json.dumps(transport_request)
            )
            
            # 等待并处理响应
            while True:
                # 接收响应
                message = await self.pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
                
                if not message:
                    # 没有消息,继续等待
                    await asyncio.sleep(0.01)
                    continue
                
                try:
                    data = message.get("data")
                    if isinstance(data, bytes):
                        data = data.decode("utf-8")
                    
                    response = json.loads(data)
                    
                    # 确认响应ID匹配
                    if response.get("id") != request_id:
                        print(f"警告: 收到ID不匹配的响应,预期 {request_id},实际 {response.get('id')}")
                        continue
                    
                    # 产生响应
                    yield response
                    
                    # 检查是否为最终响应
                    if response.get("type") in ("response", "error") and not response.get("partial", False):
                        break
                
                except json.JSONDecodeError as e:
                    # 解析错误
                    error_response = {
                        "id": request_id,
                        "type": "error",
                        "error": {
                            "code": "parse_error",
                            "message": f"无法解析响应JSON: {str(e)}"
                        }
                    }
                    yield error_response
                    break
                except Exception as e:
                    # 其他错误
                    error_response = {
                        "id": request_id,
                        "type": "error",
                        "error": {
                            "code": "transport_error",
                            "message": f"传输错误: {str(e)}"
                        }
                    }
                    yield error_response
                    break
        
        finally:
            # 取消订阅响应通道
            await self.pubsub.unsubscribe(response_channel)

传输层选择指南

在为MCP应用选择或设计传输层时,考虑以下因素:

  1. 性能需求

    • 延迟敏感度
    • 吞吐量需求
    • 并发连接数
  2. 网络环境

    • 内部网络或互联网
    • 防火墙或代理限制
    • 带宽限制
  3. 部署架构

    • 单机部署或分布式
    • 容器化或无服务器
    • 高可用性需求
  4. 安全需求

    • 传输加密
    • 身份验证
    • 访问控制

下表提供了不同传输类型的特性比较:

传输类型 优点 缺点 最佳使用场景
stdio 简单、零依赖、低延迟 仅支持单进程通信 本地开发、子进程通信
SSE 简单实现、易于调试、通过HTTP防火墙 单向通信、连接数限制 服务器到客户端的事件通知
WebSocket 双向通信、低延迟、支持标准库 可能被防火墙阻止、需要心跳机制 实时交互应用、聊天应用
HTTP/REST 广泛支持、易于理解、无状态 请求开销高、不适合频繁小消息 简单集成、RESTful服务
ZeroMQ 高性能、灵活模式、跨语言 额外依赖、复杂配置 高性能系统、微服务架构
Redis 可靠性高、支持集群、内置消息持久化 额外依赖、增加部署复杂性 分布式系统、需要消息可靠性的场景

小结

在本章中,我们探讨了MCP传输协议和自定义传输层实现:

  • MCP传输协议的基础知识,包括消息格式和类型
  • 如何实现自定义传输层的服务器和客户端部分
  • 基于ZeroMQ和Redis的传输层实现示例
  • 使用自定义传输层的服务器和客户端代码
  • 传输层选择的考虑因素

掌握传输层的设计和实现,可以使您根据特定需求优化MCP应用的通信效率和可靠性。在下一章中,我们将探讨高性能MCP服务的构建,包括并发和异步处理模型。

使用 Hugo 构建
主题 StackJimmy 设计