3.5 测试与部署
测试和部署是MCP服务开发周期中的关键环节。本章将介绍如何对MCP服务进行全面测试,以及如何安全高效地部署到生产环境中。
MCP服务测试策略
单元测试
单元测试是验证MCP服务各个组件正确性的基础。对于MCP服务,我们应该测试资源处理函数、工具函数和中间件等组件。
测试工具函数
使用Python标准库的unittest或第三方库如pytest来测试MCP工具函数:
import pytest
from mcp.testing import create_test_context
from myapp.tools import calculate_sum, format_text
# 使用pytest进行工具函数单元测试
async def test_calculate_sum():
"""测试求和工具"""
# 构造测试数据
numbers = [1, 2, 3, 4, 5]
# 直接调用函数
result = await calculate_sum(numbers)
# 断言结果
assert result == 15
assert isinstance(result, int)
async def test_format_text():
"""测试文本格式化工具"""
# 构造测试数据
text = "hello world"
format_type = "uppercase"
# 直接调用函数
result = await format_text(text, format_type)
# 断言结果
assert result == "HELLO WORLD"
# 测试其他格式类型
result = await format_text(text, "capitalize")
assert result == "Hello world"
# 使用上下文测试
async def test_tool_with_context():
"""测试需要上下文的工具"""
# 创建测试上下文
context = create_test_context()
context.state.user_id = "test_user"
# 调用依赖上下文的函数
result = await user_specific_tool(context, "test_data")
# 断言结果
assert result["user_id"] == "test_user"
assert "processed_data" in result
测试资源处理函数
对于资源处理函数,我们需要模拟请求并验证响应:
import pytest
from mcp.testing import create_test_context
from myapp.resources import get_user_profile, list_items
# 测试资源处理函数
async def test_get_user_profile():
"""测试获取用户资料资源"""
# 创建测试上下文
context = create_test_context()
context.state.user_id = "test_user"
# 模拟数据库依赖
mock_db = MockDatabase()
mock_db.add_user({
"id": "test_user",
"name": "Test User",
"email": "test@example.com"
})
# 注入模拟依赖
context.state.db = mock_db
# 调用资源处理函数
response = await get_user_profile(context, "test_user")
# 断言响应
assert response["id"] == "test_user"
assert response["name"] == "Test User"
assert response["email"] == "test@example.com"
# 测试分页资源
async def test_list_items_pagination():
"""测试项目列表分页"""
# 创建测试上下文
context = create_test_context()
# 模拟数据
mock_items = [{"id": i, "name": f"Item {i}"} for i in range(100)]
# 模拟数据库
mock_db = MockDatabase()
mock_db.add_items(mock_items)
context.state.db = mock_db
# 测试第一页
response = await list_items(context, page=1, page_size=10)
assert len(response["items"]) == 10
assert response["total"] == 100
assert response["page"] == 1
assert response["total_pages"] == 10
# 测试最后一页
response = await list_items(context, page=10, page_size=10)
assert len(response["items"]) == 10
assert response["items"][0]["id"] == 90
测试中间件
中间件测试需要模拟请求上下文并验证中间件逻辑:
import pytest
from mcp.testing import create_test_context
from myapp.middleware import AuthMiddleware, LoggingMiddleware
# 测试认证中间件
async def test_auth_middleware():
"""测试认证中间件"""
# 创建中间件实例
auth_middleware = AuthMiddleware()
# 创建测试上下文(带有认证令牌)
context = create_test_context()
context.scope["headers"] = {
"authorization": "Bearer valid_token"
}
# 模拟令牌验证
async def mock_token_verify(token):
if token == "valid_token":
return {"sub": "test_user"}
return None
auth_middleware.verify_token = mock_token_verify
# 模拟下一个处理器
async def next_handler(ctx):
return {"result": "success"}
# 调用中间件
response = await auth_middleware(context, next_handler)
# 断言结果
assert response["result"] == "success"
assert context.state.user_id == "test_user"
# 测试无效令牌
context.scope["headers"] = {
"authorization": "Bearer invalid_token"
}
response = await auth_middleware(context, next_handler)
# 断言结果(应返回错误)
assert response[1] == 401 # 状态码
集成测试
集成测试验证MCP服务不同组件之间的协作,包括资源、工具和中间件的交互。
创建测试客户端
import pytest
from mcp.client.base import MCPClient
from mcp.server.fastmcp import FastMCP
from mcp.testing import TestTransport
@pytest.fixture
async def test_client():
"""创建测试客户端"""
# 创建MCP服务器
mcp_server = FastMCP("Test Server")
# 注册资源和工具
@mcp_server.resource("test://resource")
async def test_resource():
return {"message": "Test Resource"}
@mcp.tool()
async def test_tool(param: str):
return {"result": f"Processed: {param}"}
# 启动服务器
await mcp_server.startup()
# 创建测试传输层
transport = TestTransport(mcp_server)
# 创建客户端
client = MCPClient(transport=transport)
await client.connect()
# 返回客户端
yield client
# 清理
await client.disconnect()
await mcp_server.shutdown()
# 使用测试客户端进行集成测试
async def test_get_resource(test_client):
"""测试获取资源"""
# 调用资源
result = await test_client.get_resource("test://resource")
# 断言结果
assert result["message"] == "Test Resource"
async def test_execute_tool(test_client):
"""测试执行工具"""
# 调用工具
result = await test_client.execute_tool("test_tool", {"param": "test"})
# 断言结果
assert result["result"] == "Processed: test"
端到端测试
端到端测试在真实环境中运行MCP服务,确保整个服务能够正确工作:
import pytest
import asyncio
import aiohttp
from mcp.client.base import MCPClient
from myapp.server import create_app
@pytest.fixture(scope="module")
async def running_server():
"""启动真实服务器用于测试"""
# 创建应用
app = create_app()
# 在单独的线程中启动服务器
server_task = asyncio.create_task(
app.run_server(host="localhost", port=9999)
)
# 等待服务器启动
await asyncio.sleep(1)
yield {"host": "localhost", "port": 9999}
# 关闭服务器
server_task.cancel()
await asyncio.sleep(0.5)
@pytest.fixture
async def e2e_client(running_server):
"""创建真实客户端"""
# 创建客户端
client = MCPClient(
transport_type="websocket",
host=running_server["host"],
port=running_server["port"]
)
# 连接到服务器
await client.connect()
yield client
# 断开连接
await client.disconnect()
# 端到端测试
async def test_e2e_workflow(e2e_client):
"""测试完整工作流程"""
# 1. 用户认证
auth_result = await e2e_client.execute_tool(
"login",
{"username": "testuser", "password": "password"}
)
assert "access_token" in auth_result
# 设置认证令牌
e2e_client.set_auth_token(auth_result["access_token"])
# 2. 创建资源
create_result = await e2e_client.execute_tool(
"create_document",
{
"title": "Test Document",
"content": "This is a test document."
}
)
assert "id" in create_result
doc_id = create_result["id"]
# 3. 获取资源
doc = await e2e_client.get_resource(f"documents://{doc_id}")
assert doc["title"] == "Test Document"
# 4. 更新资源
update_result = await e2e_client.execute_tool(
"update_document",
{
"id": doc_id,
"title": "Updated Document",
"content": "This document has been updated."
}
)
assert update_result["success"] is True
# 5. 再次获取并验证
updated_doc = await e2e_client.get_resource(f"documents://{doc_id}")
assert updated_doc["title"] == "Updated Document"
assert updated_doc["content"] == "This document has been updated."
性能测试
性能测试评估MCP服务在高负载下的表现,包括响应时间、吞吐量和资源使用情况。
负载测试
import asyncio
import time
import statistics
from mcp.client.base import MCPClient
async def load_test(client, tool_name, arguments, num_requests=100, concurrency=10):
"""执行负载测试
参数:
client: MCP客户端
tool_name: 要测试的工具名称
arguments: 工具参数
num_requests: 总请求数
concurrency: 并发数
返回:
测试结果
"""
# 创建信号量控制并发
semaphore = asyncio.Semaphore(concurrency)
# 测试单个请求
async def test_request():
start_time = time.time()
try:
async with semaphore:
await client.execute_tool(tool_name, arguments)
elapsed = time.time() - start_time
return {"success": True, "elapsed": elapsed}
except Exception as e:
elapsed = time.time() - start_time
return {"success": False, "elapsed": elapsed, "error": str(e)}
# 创建所有任务
tasks = [test_request() for _ in range(num_requests)]
# 执行所有任务
print(f"开始负载测试: {num_requests}个请求, 并发数{concurrency}")
start_time = time.time()
results = await asyncio.gather(*tasks)
total_time = time.time() - start_time
# 分析结果
successes = [r for r in results if r["success"]]
failures = [r for r in results if not r["success"]]
success_times = [r["elapsed"] for r in successes]
# 计算统计数据
stats = {
"total_requests": num_requests,
"successful_requests": len(successes),
"failed_requests": len(failures),
"total_time": total_time,
"requests_per_second": num_requests / total_time,
"min_response_time": min(success_times) if success_times else None,
"max_response_time": max(success_times) if success_times else None,
"avg_response_time": statistics.mean(success_times) if success_times else None,
"median_response_time": statistics.median(success_times) if success_times else None,
"p95_response_time": statistics.quantiles(success_times, n=20)[18] if len(success_times) >= 20 else None
}
return stats
# 执行负载测试
async def run_load_tests():
"""运行一系列负载测试"""
# 创建客户端
client = MCPClient(
transport_type="websocket",
host="localhost",
port=8000
)
# 连接到服务器
await client.connect()
try:
# 测试1: 低负载
results_low = await load_test(
client,
"echo",
{"message": "test"},
num_requests=100,
concurrency=5
)
print("低负载测试结果:", results_low)
# 测试2: 中负载
results_medium = await load_test(
client,
"echo",
{"message": "test"},
num_requests=500,
concurrency=20
)
print("中负载测试结果:", results_medium)
# 测试3: 高负载
results_high = await load_test(
client,
"echo",
{"message": "test"},
num_requests=1000,
concurrency=50
)
print("高负载测试结果:", results_high)
finally:
# 断开连接
await client.disconnect()
长时间运行测试
import asyncio
import time
import psutil
import os
from mcp.client.base import MCPClient
async def monitor_resources(pid, interval=1.0, duration=60.0):
"""监控进程资源使用情况
参数:
pid: 进程ID
interval: 监控间隔(秒)
duration: 监控持续时间(秒)
返回:
监控结果
"""
process = psutil.Process(pid)
start_time = time.time()
end_time = start_time + duration
# 存储监控数据
cpu_usage = []
memory_usage = []
while time.time() < end_time:
# 获取CPU和内存使用情况
try:
cpu_percent = process.cpu_percent()
memory_info = process.memory_info()
cpu_usage.append(cpu_percent)
memory_usage.append(memory_info.rss / (1024 * 1024)) # MB
# 等待下一个间隔
await asyncio.sleep(interval)
except psutil.NoSuchProcess:
break
# 计算统计数据
stats = {
"duration": time.time() - start_time,
"cpu_usage": {
"avg": sum(cpu_usage) / len(cpu_usage) if cpu_usage else 0,
"max": max(cpu_usage) if cpu_usage else 0,
"min": min(cpu_usage) if cpu_usage else 0
},
"memory_usage": {
"avg": sum(memory_usage) / len(memory_usage) if memory_usage else 0,
"max": max(memory_usage) if memory_usage else 0,
"min": min(memory_usage) if memory_usage else 0
}
}
return stats
async def long_running_test(server_process, client, duration=300):
"""执行长时间运行测试
参数:
server_process: 服务器进程
client: MCP客户端
duration: 测试持续时间(秒)
返回:
测试结果
"""
# 启动资源监控
monitor_task = asyncio.create_task(
monitor_resources(server_process.pid, interval=1.0, duration=duration)
)
# 持续发送请求
start_time = time.time()
end_time = start_time + duration
request_results = []
# 每秒发送一个请求
while time.time() < end_time:
try:
start_request = time.time()
await client.execute_tool("echo", {"message": "test"})
request_time = time.time() - start_request
request_results.append({
"timestamp": time.time(),
"elapsed": request_time,
"success": True
})
except Exception as e:
request_results.append({
"timestamp": time.time(),
"success": False,
"error": str(e)
})
# 等待下一秒
await asyncio.sleep(1.0)
# 等待监控任务完成
resource_stats = await monitor_task
# 分析请求结果
successes = [r for r in request_results if r.get("success")]
failures = [r for r in request_results if not r.get("success")]
success_times = [r["elapsed"] for r in successes if "elapsed" in r]
request_stats = {
"total_requests": len(request_results),
"successful_requests": len(successes),
"failed_requests": len(failures),
"avg_response_time": sum(success_times) / len(success_times) if success_times else None
}
return {
"duration": duration,
"request_stats": request_stats,
"resource_stats": resource_stats
}
测试最佳实践
- 测试数据隔离:确保测试使用隔离的数据集,不影响生产数据。
- 测试环境一致性:测试环境应尽可能与生产环境相似。
- 自动化测试:将测试集成到CI/CD流程中,自动运行。
- 测试覆盖率:定期检查测试覆盖率,确保关键代码路径得到测试。
- 故障注入测试:模拟各种故障情况,测试服务的稳定性。
下面是一个使用pytest配置自动化测试的示例:
# conftest.py
import pytest
import os
import asyncio
from mcp.testing import create_test_server, create_test_client
# 测试前设置环境变量
def pytest_configure(config):
os.environ["TESTING"] = "1"
os.environ["MCP_ENV"] = "test"
os.environ["DB_URL"] = "sqlite:///:memory:"
# 创建测试服务器
@pytest.fixture(scope="session")
def event_loop():
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest.fixture(scope="session")
async def test_server():
"""创建共享测试服务器"""
server = create_test_server()
await server.startup()
yield server
await server.shutdown()
@pytest.fixture
async def test_client(test_server):
"""创建测试客户端"""
client = create_test_client(test_server)
await client.connect()
yield client
await client.disconnect()
MCP服务部署策略
部署MCP服务需要考虑可用性、可扩展性和安全性。本节介绍几种常见的部署策略。
容器化部署
使用Docker容器部署MCP服务是一种流行的方法,它提供了一致的运行环境和简单的扩展能力。
创建Dockerfile
# 使用官方Python镜像作为基础镜像
FROM python:3.10-slim
# 设置工作目录
WORKDIR /app
# 复制依赖文件
COPY requirements.txt .
# 安装依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 暴露MCP服务端口
EXPOSE 8000
# 设置健康检查
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# 启动MCP服务
CMD ["python", "-m", "myapp.main"]
部署配置示例
使用Docker Compose简化多容器部署:
# docker-compose.yml
version: '3.8'
services:
mcp-service:
build: .
ports:
- "8000:8000"
environment:
- MCP_ENV=production
- DB_URL=postgres://user:password@db:5432/mcpdb
- REDIS_URL=redis://redis:6379/0
- LOG_LEVEL=info
depends_on:
- db
- redis
restart: always
deploy:
replicas: 2
resources:
limits:
cpus: '0.5'
memory: 512M
update_config:
parallelism: 1
delay: 10s
restart_policy:
condition: on-failure
db:
image: postgres:14
volumes:
- db-data:/var/lib/postgresql/data
environment:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=password
- POSTGRES_DB=mcpdb
restart: always
redis:
image: redis:7
volumes:
- redis-data:/data
restart: always
volumes:
db-data:
redis-data:
云原生部署
在Kubernetes上部署MCP服务可以提供高可用性和自动扩展能力。
Kubernetes部署配置
# mcp-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: mcp-service
labels:
app: mcp-service
spec:
replicas: 3
selector:
matchLabels:
app: mcp-service
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 1
maxUnavailable: 0
template:
metadata:
labels:
app: mcp-service
spec:
containers:
- name: mcp-service
image: myregistry/mcp-service:latest
ports:
- containerPort: 8000
env:
- name: MCP_ENV
value: "production"
- name: DB_URL
valueFrom:
secretKeyRef:
name: mcp-secrets
key: db-url
- name: REDIS_URL
valueFrom:
secretKeyRef:
name: mcp-secrets
key: redis-url
resources:
limits:
cpu: "500m"
memory: "512Mi"
requests:
cpu: "200m"
memory: "256Mi"
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 5
periodSeconds: 10
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 15
periodSeconds: 20
---
apiVersion: v1
kind: Service
metadata:
name: mcp-service
spec:
selector:
app: mcp-service
ports:
- port: 80
targetPort: 8000
type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: mcp-service
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: mcp-service
minReplicas: 2
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
配置Ingress
# mcp-ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: mcp-ingress
annotations:
nginx.ingress.kubernetes.io/ssl-redirect: "true"
cert-manager.io/cluster-issuer: letsencrypt-prod
spec:
tls:
- hosts:
- mcp.example.com
secretName: mcp-tls
rules:
- host: mcp.example.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: mcp-service
port:
number: 80
无服务器部署
对于轻量级MCP服务,无服务器部署提供了按需扩展和降低成本的好处。以下是使用AWS Lambda和API Gateway部署MCP服务的示例:
使用Serverless框架部署
# serverless.yml
service: mcp-serverless
provider:
name: aws
runtime: python3.10
stage: ${opt:stage, 'dev'}
region: ${opt:region, 'us-east-1'}
memorySize: 512
timeout: 10
environment:
MCP_ENV: ${self:provider.stage}
DB_URL: ${ssm:/mcp/${self:provider.stage}/db_url}
REDIS_URL: ${ssm:/mcp/${self:provider.stage}/redis_url}
iamRoleStatements:
- Effect: Allow
Action:
- dynamodb:Query
- dynamodb:Scan
- dynamodb:GetItem
- dynamodb:PutItem
- dynamodb:UpdateItem
- dynamodb:DeleteItem
Resource: !GetAtt MCPTable.Arn
functions:
mcp_handler:
handler: handler.lambda_handler
events:
- http:
path: /
method: ANY
cors: true
- http:
path: /{proxy+}
method: ANY
cors: true
- websocket:
route: $connect
- websocket:
route: $disconnect
- websocket:
route: $default
resources:
Resources:
MCPTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: mcp-${self:provider.stage}
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: id
AttributeType: S
KeySchema:
- AttributeName: id
KeyType: HASH
Lambda处理器代码
# handler.py
import json
import asyncio
import os
from mangum import Mangum
from mcp.server.fastmcp import FastMCP
from myapp.server import create_app
# 创建MCP应用程序
app = create_app()
# 创建Lambda处理器
handler = Mangum(app.get_asgi_app())
def lambda_handler(event, context):
"""AWS Lambda处理器"""
return handler(event, context)
持续集成与持续部署(CI/CD)
使用CI/CD管道自动化MCP服务的测试和部署过程。
GitHub Actions工作流示例
# .github/workflows/main.yml
name: MCP Service CI/CD
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pytest pytest-asyncio pytest-cov
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
if [ -f requirements-dev.txt ]; then pip install -r requirements-dev.txt; fi
- name: Lint with flake8
run: |
pip install flake8
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
- name: Test with pytest
run: |
pytest --cov=myapp --cov-report=xml
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
build-and-push:
needs: test
if: github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Build and push
uses: docker/build-push-action@v4
with:
context: .
push: true
tags: myregistry/mcp-service:latest,myregistry/mcp-service:${{ github.sha }}
deploy:
needs: build-and-push
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up kubectl
uses: azure/setup-kubectl@v3
with:
version: 'latest'
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: us-east-1
- name: Update kube config
run: aws eks update-kubeconfig --name mcp-cluster --region us-east-1
- name: Deploy to Kubernetes
run: |
# 更新部署镜像
kubectl set image deployment/mcp-service mcp-service=myregistry/mcp-service:${{ github.sha }} --record
# 等待部署完成
kubectl rollout status deployment/mcp-service
部署最佳实践
- 环境配置分离:使用环境变量或配置文件分离不同环境(开发、测试、生产)的配置。
- 零停机部署:使用滚动更新或蓝绿部署确保服务不中断。
- 自动扩展:根据负载自动扩展服务实例。
- 监控和告警:建立全面的监控和告警系统,及时发现问题。
- 回滚策略:准备快速回滚策略,以应对部署失败。
配置管理示例
# config.py
import os
from pathlib import Path
from typing import Dict, Any, Optional
import yaml
class Config:
"""配置管理类"""
def __init__(self):
self._config = {}
self._load_config()
def _load_config(self):
"""加载配置"""
# 确定环境
env = os.environ.get("MCP_ENV", "development")
# 基础配置
base_config = self._load_yaml("config/base.yaml")
# 环境特定配置
env_config = self._load_yaml(f"config/{env}.yaml")
# 合并配置
self._config = {**base_config, **(env_config or {})}
# 环境变量覆盖
for key in self._config:
env_value = os.environ.get(key.upper())
if env_value is not None:
self._config[key] = env_value
def _load_yaml(self, path: str) -> Dict[str, Any]:
"""加载YAML配置文件"""
config_path = Path(path)
if not config_path.exists():
return {}
with open(config_path, "r") as f:
return yaml.safe_load(f)
def get(self, key: str, default: Optional[Any] = None) -> Any:
"""获取配置值"""
return self._config.get(key, default)
def __getitem__(self, key: str) -> Any:
"""通过下标访问配置"""
return self._config[key]
# 单例配置实例
config = Config()
总结
测试和部署是MCP服务开发生命周期中不可或缺的环节。本章介绍了全面的测试策略,包括单元测试、集成测试和性能测试,以及多种部署选项,从容器化到无服务器部署。通过采用这些最佳实践,可以确保MCP服务的质量和可靠性。
在实际项目中,应根据具体需求选择合适的测试和部署策略,并随着服务的发展不断优化这些策略。记住,好的测试和部署流程不仅能提高服务质量,还能使开发团队更加高效。