3.2 高性能MCP服务
构建高性能的MCP服务需要理解并发模型、异步处理和资源优化技术。本章将探讨如何设计和实现能够高效处理大量请求的MCP服务。
并发和异步处理模型
MCP服务器建立在Python的异步编程模型之上,使用asyncio库实现高效的并发处理。
异步编程基础
import asyncio
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("高性能MCP服务示例")
@mcp.resource("example://async")
async def async_resource():
"""异步资源示例"""
# 模拟耗时操作
await asyncio.sleep(0.1)
return "异步资源响应"
@mcp.tool()
async def async_tool(param: str):
"""异步工具示例"""
# 并行执行多个异步操作
task1 = asyncio.create_task(async_operation_1(param))
task2 = asyncio.create_task(async_operation_2(param))
# 等待所有任务完成
results = await asyncio.gather(task1, task2)
return {
"result1": results[0],
"result2": results[1]
}
async def async_operation_1(data):
await asyncio.sleep(0.2)
return f"操作1结果: {data}"
async def async_operation_2(data):
await asyncio.sleep(0.3)
return f"操作2结果: {data}"
并发请求处理
MCP服务器本身已设计为高效处理并发请求。当多个客户端同时发送请求时,服务器会为每个请求创建独立的任务,允许它们并行执行。
@mcp.middleware("request")
async def concurrency_limiter(context, call_next):
"""限制并发请求数量的中间件"""
# 使用信号量控制并发
async with mcp.state.request_semaphore:
return await call_next(context)
@mcp.on_startup
async def setup_concurrency_control():
"""设置并发控制"""
# 创建限制最大50个并发请求的信号量
mcp.state.request_semaphore = asyncio.Semaphore(50)
mcp.logger.info("已设置并发限制: 50")
批处理和任务队列
对于需要处理大量任务的场景,可以实现任务队列和批处理机制:
from collections import deque
import time
@mcp.on_startup
async def setup_task_queue():
"""设置任务队列"""
# 创建任务队列
mcp.state.task_queue = deque()
# 启动后台任务处理器
asyncio.create_task(process_task_queue())
async def process_task_queue():
"""处理任务队列的后台任务"""
while True:
# 每秒检查队列
await asyncio.sleep(1)
# 批量处理队列中的任务
batch = []
batch_size = 20 # 每批处理的任务数
# 从队列中提取任务
while len(batch) < batch_size and mcp.state.task_queue:
if mcp.state.task_queue:
batch.append(mcp.state.task_queue.popleft())
else:
break
if batch:
# 并行处理批次任务
await process_batch(batch)
async def process_batch(batch):
"""并行处理一批任务"""
tasks = [process_single_task(task) for task in batch]
await asyncio.gather(*tasks)
async def process_single_task(task):
"""处理单个任务"""
# 任务处理逻辑
task_type = task.get("type")
task_data = task.get("data")
# 根据任务类型执行不同操作
if task_type == "data_processing":
await process_data(task_data)
elif task_type == "notification":
await send_notification(task_data)
# 其他任务类型...
@mcp.tool()
async def submit_background_task(task_type: str, task_data: dict):
"""提交后台任务
参数:
task_type: 任务类型
task_data: 任务数据
返回:
提交状态
"""
# 将任务添加到队列
task = {
"type": task_type,
"data": task_data,
"submitted_at": time.time()
}
mcp.state.task_queue.append(task)
return {
"success": True,
"message": f"任务已提交到队列,当前队列长度: {len(mcp.state.task_queue)}"
}
资源缓存和优化
MCP服务性能的关键在于高效管理资源和数据。
资源缓存
对于频繁请求但变化不大的资源,缓存是提高性能的有效方法:
import time
from functools import wraps
# 简单的内存缓存
cache = {}
def cached(ttl_seconds=60):
"""资源缓存装饰器
参数:
ttl_seconds: 缓存过期时间(秒)
"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# 生成缓存键
cache_key = f"{func.__name__}:{str(args)}:{str(kwargs)}"
# 检查缓存
if cache_key in cache:
entry = cache[cache_key]
# 检查是否过期
if time.time() - entry["timestamp"] < ttl_seconds:
return entry["data"]
# 执行原始函数
result = await func(*args, **kwargs)
# 更新缓存
cache[cache_key] = {
"data": result,
"timestamp": time.time()
}
return result
return wrapper
return decorator
@mcp.resource("data://expensive/{item_id}")
@cached(ttl_seconds=300) # 缓存5分钟
async def get_expensive_data(item_id: str):
"""获取计算成本高的数据"""
# 模拟耗时计算或数据库查询
await asyncio.sleep(2)
return f"Item {item_id} expensive data result"
使用更高级的缓存
对于生产环境,可以使用更强大的缓存解决方案:
import aioredis
from mcp.server.fastmcp import FastMCP, Depends
mcp = FastMCP("Redis缓存示例")
# Redis连接依赖
async def get_redis():
"""提供Redis连接"""
if not hasattr(mcp.state, "redis"):
# 首次连接时初始化
mcp.state.redis = await aioredis.from_url("redis://localhost:6379")
return mcp.state.redis
async def redis_cached(key_prefix, ttl_seconds=60):
"""Redis缓存装饰器
参数:
key_prefix: 缓存键前缀
ttl_seconds: 缓存过期时间(秒)
"""
def decorator(func):
@wraps(func)
async def wrapper(*args, redis=Depends(get_redis), **kwargs):
# 前两个参数通常是context和资源路径参数
# 从第三个参数开始构建缓存键
cache_args = args[2:] if len(args) > 2 else ()
cache_key = f"{key_prefix}:{func.__name__}:{str(cache_args)}:{str(kwargs)}"
# 尝试从Redis获取缓存
cached_data = await redis.get(cache_key)
if cached_data:
return json.loads(cached_data)
# 执行原始函数
result = await func(*args, **kwargs)
# 设置缓存
await redis.set(
cache_key,
json.dumps(result),
ex=ttl_seconds
)
return result
return wrapper
return decorator
@mcp.resource("products://{product_id}")
@redis_cached("products", ttl_seconds=3600) # 缓存1小时
async def get_product(context, product_id: str, redis=Depends(get_redis)):
"""获取产品信息"""
# 模拟数据库查询
await asyncio.sleep(0.5)
return {
"id": product_id,
"name": f"Product {product_id}",
"price": float(product_id) * 10.99,
"description": f"This is product {product_id}"
}
资源优化策略
1. 分页和限流
处理大型数据集时,实现分页和限流是必要的:
@mcp.resource("data://list/{resource_type}")
async def list_resources(resource_type: str, page: int = 1, page_size: int = 20):
"""获取分页资源列表
参数:
resource_type: 资源类型
page: 页码(从1开始)
page_size: 每页项目数
"""
# 计算分页参数
offset = (page - 1) * page_size
limit = page_size
# 模拟从数据库获取分页数据
total_items = 1000 # 总项目数
# 获取分页数据
items = [
{
"id": i,
"name": f"{resource_type} item {i}",
"created_at": "2023-01-01T00:00:00Z"
}
for i in range(offset, min(offset + limit, total_items))
]
# 返回分页结果
return {
"items": items,
"total": total_items,
"page": page,
"page_size": page_size,
"total_pages": (total_items + page_size - 1) // page_size
}
2. 资源压缩
对于大型资源,可以使用压缩减少传输量:
import gzip
import json
@mcp.resource("data://large")
async def get_large_dataset():
"""获取大型数据集"""
# 生成大型数据
large_data = [
{"id": i, "data": f"Sample data {i}" * 100}
for i in range(1000)
]
# 将数据转换为JSON字符串
json_data = json.dumps(large_data)
# 压缩数据
compressed_data = gzip.compress(json_data.encode('utf-8'))
# 返回压缩数据和内容类型
return compressed_data, "application/gzip"
3. 延迟加载和惰性计算
对于复杂操作,可以使用延迟加载和惰性计算:
@mcp.resource("report://{report_id}")
async def get_report(report_id: str, sections: list = None):
"""获取报告,支持选择性加载章节
参数:
report_id: 报告ID
sections: 要加载的章节列表,如果为空则加载所有章节
"""
# 获取报告基本信息
report_base = await get_report_base(report_id)
# 如果没有指定章节,获取所有章节
if not sections:
sections = [section["id"] for section in report_base["available_sections"]]
# 只加载请求的章节
report_sections = {}
for section_id in sections:
report_sections[section_id] = await get_report_section(report_id, section_id)
# 组装最终报告
report = {
**report_base,
"sections": report_sections
}
return report
性能监控和分析
为了持续优化MCP服务性能,需要实现性能监控和分析。
请求性能监控
import time
from collections import defaultdict
@mcp.on_startup
async def setup_performance_monitoring():
"""设置性能监控"""
# 初始化性能指标存储
mcp.state.performance_metrics = {
"requests": defaultdict(list), # 按endpoint存储请求时间
"resource_hits": defaultdict(int), # 资源请求计数
"tool_calls": defaultdict(int), # 工具调用计数
}
@mcp.middleware("request")
async def performance_tracker(context, call_next):
"""跟踪请求性能的中间件"""
# 记录开始时间
start_time = time.time()
# 处理请求
response = await call_next(context)
# 计算处理时间
processing_time = time.time() - start_time
# 记录性能指标
request_type = context.scope.get("type", "unknown")
endpoint = context.scope.get("endpoint", "unknown")
# 存储请求时间
key = f"{request_type}:{endpoint}"
metrics = mcp.state.performance_metrics["requests"]
metrics[key].append(processing_time)
# 保持列表在合理大小
if len(metrics[key]) > 1000:
metrics[key] = metrics[key][-1000:]
# 记录资源请求或工具调用
if request_type == "resource":
mcp.state.performance_metrics["resource_hits"][endpoint] += 1
elif request_type == "tool":
mcp.state.performance_metrics["tool_calls"][endpoint] += 1
return response
@mcp.resource("admin://performance")
async def get_performance_metrics():
"""获取性能指标"""
metrics = mcp.state.performance_metrics
# 计算平均请求时间
avg_request_times = {}
for key, times in metrics["requests"].items():
if times:
avg_request_times[key] = sum(times) / len(times)
# 构建性能报告
return {
"average_request_times": avg_request_times,
"resource_hits": dict(metrics["resource_hits"]),
"tool_calls": dict(metrics["tool_calls"])
}
性能调优建议
- 使用异步I/O:所有可能阻塞的操作都应该使用异步API。
- 合理设置并发限制:根据服务器硬件和外部依赖设置合适的并发限制。
- 监控内存使用:使用
tracemalloc或其他工具监控内存使用。 - 分析关键路径:使用profiler识别并优化瓶颈。
- 使用连接池:对于数据库或外部服务,使用连接池减少连接建立开销。
- 批量处理:合并相关操作,减少网络往返。
- 优先使用uvloop:可能的情况下使用
uvloop代替标准asyncio事件循环。
进阶示例:高性能数据处理服务
以下是一个综合示例,展示如何构建高性能的数据处理MCP服务:
import asyncio
import time
import json
import aiohttp
import aioredis
from collections import deque
from functools import wraps
from mcp.server.fastmcp import FastMCP, Context, Depends
import uvloop
# 使用uvloop替代标准事件循环
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# 创建MCP服务器
mcp = FastMCP("高性能数据处理服务")
# 初始化状态
@mcp.on_startup
async def initialize():
"""初始化服务器状态"""
# 配置
mcp.state.config = {
"max_workers": 10,
"batch_size": 50,
"request_timeout": 30,
"cache_ttl": 300 # 5分钟
}
# 任务队列
mcp.state.task_queue = deque()
# 工作线程信号量
mcp.state.worker_semaphore = asyncio.Semaphore(
mcp.state.config["max_workers"]
)
# 连接池
mcp.state.http_session = aiohttp.ClientSession()
# Redis连接
mcp.state.redis = await aioredis.from_url("redis://localhost:6379")
# 启动后台任务处理器
asyncio.create_task(process_task_queue())
mcp.logger.info("服务器初始化完成")
@mcp.on_shutdown
async def cleanup():
"""清理资源"""
if hasattr(mcp.state, "http_session"):
await mcp.state.http_session.close()
if hasattr(mcp.state, "redis"):
await mcp.state.redis.close()
mcp.logger.info("资源清理完成")
# Redis依赖
async def get_redis():
"""获取Redis连接"""
return mcp.state.redis
# HTTP会话依赖
async def get_http_session():
"""获取HTTP会话"""
return mcp.state.http_session
# 缓存装饰器
def redis_cache(key_prefix, ttl=None):
"""Redis缓存装饰器"""
if ttl is None:
ttl = mcp.state.config["cache_ttl"]
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# 生成缓存键
cache_key = f"{key_prefix}:{func.__name__}"
for arg in args[1:]: # 跳过self/cls/context参数
if isinstance(arg, (str, int, float, bool)):
cache_key += f":{arg}"
for k, v in sorted(kwargs.items()):
if isinstance(v, (str, int, float, bool)):
cache_key += f":{k}={v}"
# 检查缓存
cached = await mcp.state.redis.get(cache_key)
if cached:
return json.loads(cached)
# 执行原始函数
result = await func(*args, **kwargs)
# 设置缓存
await mcp.state.redis.set(
cache_key,
json.dumps(result),
ex=ttl
)
return result
return wrapper
return decorator
# 后台任务处理
async def process_task_queue():
"""处理任务队列"""
while True:
# 批量处理任务
batch_size = mcp.state.config["batch_size"]
if len(mcp.state.task_queue) > 0:
# 提取批次
batch = []
while len(batch) < batch_size and mcp.state.task_queue:
batch.append(mcp.state.task_queue.popleft())
# 处理批次
if batch:
mcp.logger.info(f"处理批次任务,数量: {len(batch)}")
tasks = [process_task(task) for task in batch]
await asyncio.gather(*tasks)
# 等待下一个检查周期
await asyncio.sleep(0.1)
async def process_task(task):
"""处理单个任务"""
# 限制最大并发工作线程
async with mcp.state.worker_semaphore:
task_type = task.get("type")
task_data = task.get("data")
try:
if task_type == "data_fetch":
await fetch_external_data(task_data)
elif task_type == "data_process":
await process_data_batch(task_data)
elif task_type == "notification":
await send_notification(task_data)
else:
mcp.logger.warning(f"未知任务类型: {task_type}")
except Exception as e:
mcp.logger.error(f"处理任务异常: {e}")
# 数据获取工具
@mcp.tool()
@redis_cache("data_fetch")
async def fetch_data(
url: str,
params: dict = None,
http_session = Depends(get_http_session)
):
"""从外部API获取数据
参数:
url: API URL
params: 查询参数
"""
async with http_session.get(
url,
params=params,
timeout=mcp.state.config["request_timeout"]
) as response:
response.raise_for_status()
data = await response.json()
return data
# 数据处理资源
@mcp.resource("data://processed/{dataset_id}")
@redis_cache("processed_data")
async def get_processed_data(context: Context, dataset_id: str):
"""获取处理后的数据集
参数:
dataset_id: 数据集ID
"""
# 从原始数据源获取数据
raw_data = await fetch_raw_data(dataset_id)
# 处理数据
processed_data = await process_dataset(raw_data)
return processed_data
async def fetch_raw_data(dataset_id: str):
"""获取原始数据"""
# 模拟数据获取
await asyncio.sleep(0.2)
return {
"id": dataset_id,
"items": [{"value": i} for i in range(100)]
}
async def process_dataset(dataset):
"""处理数据集"""
# 创建处理任务
tasks = [process_item(item) for item in dataset["items"]]
# 并行处理所有项目
processed_items = await asyncio.gather(*tasks)
return {
"id": dataset["id"],
"processed_items": processed_items,
"stats": calculate_stats(processed_items)
}
async def process_item(item):
"""处理单个数据项目"""
# 模拟处理
await asyncio.sleep(0.01)
return {
"original_value": item["value"],
"processed_value": item["value"] * 2,
"metadata": {
"processed_at": time.time()
}
}
def calculate_stats(items):
"""计算数据统计"""
values = [item["processed_value"] for item in items]
return {
"count": len(values),
"min": min(values) if values else None,
"max": max(values) if values else None,
"mean": sum(values) / len(values) if values else None
}
# 批量数据处理工具
@mcp.tool()
async def submit_batch_processing(dataset_ids: list):
"""提交批量数据处理任务
参数:
dataset_ids: 要处理的数据集ID列表
"""
for dataset_id in dataset_ids:
# 将每个数据集添加到处理队列
task = {
"type": "data_process",
"data": {
"dataset_id": dataset_id,
"submitted_at": time.time()
}
}
mcp.state.task_queue.append(task)
return {
"success": True,
"message": f"已提交{len(dataset_ids)}个数据集处理任务",
"queue_length": len(mcp.state.task_queue)
}
# 性能监控资源
@mcp.resource("admin://monitoring")
async def get_monitoring_data():
"""获取服务性能监控数据"""
return {
"queue_length": len(mcp.state.task_queue),
"active_workers": mcp.state.config["max_workers"] - mcp.state.worker_semaphore._value,
"system_stats": await get_system_stats(),
"cache_stats": await get_cache_stats()
}
async def get_system_stats():
"""获取系统统计信息"""
# 在实际实现中,这里可能包含内存、CPU使用率等信息
return {
"timestamp": time.time()
}
async def get_cache_stats():
"""获取缓存统计信息"""
# 从Redis获取缓存统计
info = await mcp.state.redis.info("memory")
keys = await mcp.state.redis.dbsize()
return {
"memory_used": info.get("used_memory_human", "unknown"),
"total_keys": keys
}
if __name__ == "__main__":
# 运行高性能服务器
mcp.run(
transport="websocket", # 使用WebSocket为最佳性能
host="0.0.0.0",
port=8000
)
小结
在本章中,我们探讨了构建高性能MCP服务的关键技术:
- 并发和异步处理模型,包括任务队列和批处理
- 资源缓存和优化策略,从简单内存缓存到Redis分布式缓存
- 性能监控和分析技术
- 一个综合示例,展示了如何构建高性能数据处理服务
掌握这些技术将帮助您构建能够处理大量并发请求的高效MCP服务。在下一章中,我们将探讨如何将MCP与现有系统集成,包括Web框架和数据库。