3.5_测试与部署

3.5 测试与部署

测试和部署是MCP服务开发周期中的关键环节。本章将介绍如何对MCP服务进行全面测试,以及如何安全高效地部署到生产环境中。

MCP服务测试策略

单元测试

单元测试是验证MCP服务各个组件正确性的基础。对于MCP服务,我们应该测试资源处理函数、工具函数和中间件等组件。

测试工具函数

使用Python标准库的unittest或第三方库如pytest来测试MCP工具函数:

import pytest
from mcp.testing import create_test_context
from myapp.tools import calculate_sum, format_text

# 使用pytest进行工具函数单元测试
async def test_calculate_sum():
    """测试求和工具"""
    # 构造测试数据
    numbers = [1, 2, 3, 4, 5]
    
    # 直接调用函数
    result = await calculate_sum(numbers)
    
    # 断言结果
    assert result == 15
    assert isinstance(result, int)

async def test_format_text():
    """测试文本格式化工具"""
    # 构造测试数据
    text = "hello world"
    format_type = "uppercase"
    
    # 直接调用函数
    result = await format_text(text, format_type)
    
    # 断言结果
    assert result == "HELLO WORLD"
    
    # 测试其他格式类型
    result = await format_text(text, "capitalize")
    assert result == "Hello world"

# 使用上下文测试
async def test_tool_with_context():
    """测试需要上下文的工具"""
    # 创建测试上下文
    context = create_test_context()
    context.state.user_id = "test_user"
    
    # 调用依赖上下文的函数
    result = await user_specific_tool(context, "test_data")
    
    # 断言结果
    assert result["user_id"] == "test_user"
    assert "processed_data" in result

测试资源处理函数

对于资源处理函数,我们需要模拟请求并验证响应:

import pytest
from mcp.testing import create_test_context
from myapp.resources import get_user_profile, list_items

# 测试资源处理函数
async def test_get_user_profile():
    """测试获取用户资料资源"""
    # 创建测试上下文
    context = create_test_context()
    context.state.user_id = "test_user"
    
    # 模拟数据库依赖
    mock_db = MockDatabase()
    mock_db.add_user({
        "id": "test_user",
        "name": "Test User",
        "email": "test@example.com"
    })
    
    # 注入模拟依赖
    context.state.db = mock_db
    
    # 调用资源处理函数
    response = await get_user_profile(context, "test_user")
    
    # 断言响应
    assert response["id"] == "test_user"
    assert response["name"] == "Test User"
    assert response["email"] == "test@example.com"

# 测试分页资源
async def test_list_items_pagination():
    """测试项目列表分页"""
    # 创建测试上下文
    context = create_test_context()
    
    # 模拟数据
    mock_items = [{"id": i, "name": f"Item {i}"} for i in range(100)]
    
    # 模拟数据库
    mock_db = MockDatabase()
    mock_db.add_items(mock_items)
    context.state.db = mock_db
    
    # 测试第一页
    response = await list_items(context, page=1, page_size=10)
    assert len(response["items"]) == 10
    assert response["total"] == 100
    assert response["page"] == 1
    assert response["total_pages"] == 10
    
    # 测试最后一页
    response = await list_items(context, page=10, page_size=10)
    assert len(response["items"]) == 10
    assert response["items"][0]["id"] == 90

测试中间件

中间件测试需要模拟请求上下文并验证中间件逻辑:

import pytest
from mcp.testing import create_test_context
from myapp.middleware import AuthMiddleware, LoggingMiddleware

# 测试认证中间件
async def test_auth_middleware():
    """测试认证中间件"""
    # 创建中间件实例
    auth_middleware = AuthMiddleware()
    
    # 创建测试上下文(带有认证令牌)
    context = create_test_context()
    context.scope["headers"] = {
        "authorization": "Bearer valid_token"
    }
    
    # 模拟令牌验证
    async def mock_token_verify(token):
        if token == "valid_token":
            return {"sub": "test_user"}
        return None
    
    auth_middleware.verify_token = mock_token_verify
    
    # 模拟下一个处理器
    async def next_handler(ctx):
        return {"result": "success"}
    
    # 调用中间件
    response = await auth_middleware(context, next_handler)
    
    # 断言结果
    assert response["result"] == "success"
    assert context.state.user_id == "test_user"
    
    # 测试无效令牌
    context.scope["headers"] = {
        "authorization": "Bearer invalid_token"
    }
    response = await auth_middleware(context, next_handler)
    
    # 断言结果(应返回错误)
    assert response[1] == 401  # 状态码

集成测试

集成测试验证MCP服务不同组件之间的协作,包括资源、工具和中间件的交互。

创建测试客户端

import pytest
from mcp.client.base import MCPClient
from mcp.server.fastmcp import FastMCP
from mcp.testing import TestTransport

@pytest.fixture
async def test_client():
    """创建测试客户端"""
    # 创建MCP服务器
    mcp_server = FastMCP("Test Server")
    
    # 注册资源和工具
    @mcp_server.resource("test://resource")
    async def test_resource():
        return {"message": "Test Resource"}
    
    @mcp.tool()
    async def test_tool(param: str):
        return {"result": f"Processed: {param}"}
    
    # 启动服务器
    await mcp_server.startup()
    
    # 创建测试传输层
    transport = TestTransport(mcp_server)
    
    # 创建客户端
    client = MCPClient(transport=transport)
    await client.connect()
    
    # 返回客户端
    yield client
    
    # 清理
    await client.disconnect()
    await mcp_server.shutdown()

# 使用测试客户端进行集成测试
async def test_get_resource(test_client):
    """测试获取资源"""
    # 调用资源
    result = await test_client.get_resource("test://resource")
    
    # 断言结果
    assert result["message"] == "Test Resource"

async def test_execute_tool(test_client):
    """测试执行工具"""
    # 调用工具
    result = await test_client.execute_tool("test_tool", {"param": "test"})
    
    # 断言结果
    assert result["result"] == "Processed: test"

端到端测试

端到端测试在真实环境中运行MCP服务,确保整个服务能够正确工作:

import pytest
import asyncio
import aiohttp
from mcp.client.base import MCPClient
from myapp.server import create_app

@pytest.fixture(scope="module")
async def running_server():
    """启动真实服务器用于测试"""
    # 创建应用
    app = create_app()
    
    # 在单独的线程中启动服务器
    server_task = asyncio.create_task(
        app.run_server(host="localhost", port=9999)
    )
    
    # 等待服务器启动
    await asyncio.sleep(1)
    
    yield {"host": "localhost", "port": 9999}
    
    # 关闭服务器
    server_task.cancel()
    await asyncio.sleep(0.5)

@pytest.fixture
async def e2e_client(running_server):
    """创建真实客户端"""
    # 创建客户端
    client = MCPClient(
        transport_type="websocket",
        host=running_server["host"],
        port=running_server["port"]
    )
    
    # 连接到服务器
    await client.connect()
    
    yield client
    
    # 断开连接
    await client.disconnect()

# 端到端测试
async def test_e2e_workflow(e2e_client):
    """测试完整工作流程"""
    # 1. 用户认证
    auth_result = await e2e_client.execute_tool(
        "login",
        {"username": "testuser", "password": "password"}
    )
    assert "access_token" in auth_result
    
    # 设置认证令牌
    e2e_client.set_auth_token(auth_result["access_token"])
    
    # 2. 创建资源
    create_result = await e2e_client.execute_tool(
        "create_document",
        {
            "title": "Test Document",
            "content": "This is a test document."
        }
    )
    assert "id" in create_result
    doc_id = create_result["id"]
    
    # 3. 获取资源
    doc = await e2e_client.get_resource(f"documents://{doc_id}")
    assert doc["title"] == "Test Document"
    
    # 4. 更新资源
    update_result = await e2e_client.execute_tool(
        "update_document",
        {
            "id": doc_id,
            "title": "Updated Document",
            "content": "This document has been updated."
        }
    )
    assert update_result["success"] is True
    
    # 5. 再次获取并验证
    updated_doc = await e2e_client.get_resource(f"documents://{doc_id}")
    assert updated_doc["title"] == "Updated Document"
    assert updated_doc["content"] == "This document has been updated."

性能测试

性能测试评估MCP服务在高负载下的表现,包括响应时间、吞吐量和资源使用情况。

负载测试

import asyncio
import time
import statistics
from mcp.client.base import MCPClient

async def load_test(client, tool_name, arguments, num_requests=100, concurrency=10):
    """执行负载测试
    
    参数:
        client: MCP客户端
        tool_name: 要测试的工具名称
        arguments: 工具参数
        num_requests: 总请求数
        concurrency: 并发数
    
    返回:
        测试结果
    """
    # 创建信号量控制并发
    semaphore = asyncio.Semaphore(concurrency)
    
    # 测试单个请求
    async def test_request():
        start_time = time.time()
        try:
            async with semaphore:
                await client.execute_tool(tool_name, arguments)
            elapsed = time.time() - start_time
            return {"success": True, "elapsed": elapsed}
        except Exception as e:
            elapsed = time.time() - start_time
            return {"success": False, "elapsed": elapsed, "error": str(e)}
    
    # 创建所有任务
    tasks = [test_request() for _ in range(num_requests)]
    
    # 执行所有任务
    print(f"开始负载测试: {num_requests}个请求, 并发数{concurrency}")
    start_time = time.time()
    results = await asyncio.gather(*tasks)
    total_time = time.time() - start_time
    
    # 分析结果
    successes = [r for r in results if r["success"]]
    failures = [r for r in results if not r["success"]]
    
    success_times = [r["elapsed"] for r in successes]
    
    # 计算统计数据
    stats = {
        "total_requests": num_requests,
        "successful_requests": len(successes),
        "failed_requests": len(failures),
        "total_time": total_time,
        "requests_per_second": num_requests / total_time,
        "min_response_time": min(success_times) if success_times else None,
        "max_response_time": max(success_times) if success_times else None,
        "avg_response_time": statistics.mean(success_times) if success_times else None,
        "median_response_time": statistics.median(success_times) if success_times else None,
        "p95_response_time": statistics.quantiles(success_times, n=20)[18] if len(success_times) >= 20 else None
    }
    
    return stats

# 执行负载测试
async def run_load_tests():
    """运行一系列负载测试"""
    # 创建客户端
    client = MCPClient(
        transport_type="websocket",
        host="localhost",
        port=8000
    )
    
    # 连接到服务器
    await client.connect()
    
    try:
        # 测试1: 低负载
        results_low = await load_test(
            client,
            "echo",
            {"message": "test"},
            num_requests=100,
            concurrency=5
        )
        print("低负载测试结果:", results_low)
        
        # 测试2: 中负载
        results_medium = await load_test(
            client,
            "echo",
            {"message": "test"},
            num_requests=500,
            concurrency=20
        )
        print("中负载测试结果:", results_medium)
        
        # 测试3: 高负载
        results_high = await load_test(
            client,
            "echo",
            {"message": "test"},
            num_requests=1000,
            concurrency=50
        )
        print("高负载测试结果:", results_high)
    
    finally:
        # 断开连接
        await client.disconnect()

长时间运行测试

import asyncio
import time
import psutil
import os
from mcp.client.base import MCPClient

async def monitor_resources(pid, interval=1.0, duration=60.0):
    """监控进程资源使用情况
    
    参数:
        pid: 进程ID
        interval: 监控间隔(秒)
        duration: 监控持续时间(秒)
    
    返回:
        监控结果
    """
    process = psutil.Process(pid)
    start_time = time.time()
    end_time = start_time + duration
    
    # 存储监控数据
    cpu_usage = []
    memory_usage = []
    
    while time.time() < end_time:
        # 获取CPU和内存使用情况
        try:
            cpu_percent = process.cpu_percent()
            memory_info = process.memory_info()
            
            cpu_usage.append(cpu_percent)
            memory_usage.append(memory_info.rss / (1024 * 1024))  # MB
            
            # 等待下一个间隔
            await asyncio.sleep(interval)
        except psutil.NoSuchProcess:
            break
    
    # 计算统计数据
    stats = {
        "duration": time.time() - start_time,
        "cpu_usage": {
            "avg": sum(cpu_usage) / len(cpu_usage) if cpu_usage else 0,
            "max": max(cpu_usage) if cpu_usage else 0,
            "min": min(cpu_usage) if cpu_usage else 0
        },
        "memory_usage": {
            "avg": sum(memory_usage) / len(memory_usage) if memory_usage else 0,
            "max": max(memory_usage) if memory_usage else 0,
            "min": min(memory_usage) if memory_usage else 0
        }
    }
    
    return stats

async def long_running_test(server_process, client, duration=300):
    """执行长时间运行测试
    
    参数:
        server_process: 服务器进程
        client: MCP客户端
        duration: 测试持续时间(秒)
    
    返回:
        测试结果
    """
    # 启动资源监控
    monitor_task = asyncio.create_task(
        monitor_resources(server_process.pid, interval=1.0, duration=duration)
    )
    
    # 持续发送请求
    start_time = time.time()
    end_time = start_time + duration
    
    request_results = []
    
    # 每秒发送一个请求
    while time.time() < end_time:
        try:
            start_request = time.time()
            await client.execute_tool("echo", {"message": "test"})
            request_time = time.time() - start_request
            
            request_results.append({
                "timestamp": time.time(),
                "elapsed": request_time,
                "success": True
            })
        except Exception as e:
            request_results.append({
                "timestamp": time.time(),
                "success": False,
                "error": str(e)
            })
        
        # 等待下一秒
        await asyncio.sleep(1.0)
    
    # 等待监控任务完成
    resource_stats = await monitor_task
    
    # 分析请求结果
    successes = [r for r in request_results if r.get("success")]
    failures = [r for r in request_results if not r.get("success")]
    
    success_times = [r["elapsed"] for r in successes if "elapsed" in r]
    
    request_stats = {
        "total_requests": len(request_results),
        "successful_requests": len(successes),
        "failed_requests": len(failures),
        "avg_response_time": sum(success_times) / len(success_times) if success_times else None
    }
    
    return {
        "duration": duration,
        "request_stats": request_stats,
        "resource_stats": resource_stats
    }

测试最佳实践

  1. 测试数据隔离:确保测试使用隔离的数据集,不影响生产数据。
  2. 测试环境一致性:测试环境应尽可能与生产环境相似。
  3. 自动化测试:将测试集成到CI/CD流程中,自动运行。
  4. 测试覆盖率:定期检查测试覆盖率,确保关键代码路径得到测试。
  5. 故障注入测试:模拟各种故障情况,测试服务的稳定性。

下面是一个使用pytest配置自动化测试的示例:

# conftest.py
import pytest
import os
import asyncio
from mcp.testing import create_test_server, create_test_client

# 测试前设置环境变量
def pytest_configure(config):
    os.environ["TESTING"] = "1"
    os.environ["MCP_ENV"] = "test"
    os.environ["DB_URL"] = "sqlite:///:memory:"

# 创建测试服务器
@pytest.fixture(scope="session")
def event_loop():
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()

@pytest.fixture(scope="session")
async def test_server():
    """创建共享测试服务器"""
    server = create_test_server()
    await server.startup()
    yield server
    await server.shutdown()

@pytest.fixture
async def test_client(test_server):
    """创建测试客户端"""
    client = create_test_client(test_server)
    await client.connect()
    yield client
    await client.disconnect()

MCP服务部署策略

部署MCP服务需要考虑可用性、可扩展性和安全性。本节介绍几种常见的部署策略。

容器化部署

使用Docker容器部署MCP服务是一种流行的方法,它提供了一致的运行环境和简单的扩展能力。

创建Dockerfile

# 使用官方Python镜像作为基础镜像
FROM python:3.10-slim

# 设置工作目录
WORKDIR /app

# 复制依赖文件
COPY requirements.txt .

# 安装依赖
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 暴露MCP服务端口
EXPOSE 8000

# 设置健康检查
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# 启动MCP服务
CMD ["python", "-m", "myapp.main"]

部署配置示例

使用Docker Compose简化多容器部署:

# docker-compose.yml
version: '3.8'

services:
  mcp-service:
    build: .
    ports:
      - "8000:8000"
    environment:
      - MCP_ENV=production
      - DB_URL=postgres://user:password@db:5432/mcpdb
      - REDIS_URL=redis://redis:6379/0
      - LOG_LEVEL=info
    depends_on:
      - db
      - redis
    restart: always
    deploy:
      replicas: 2
      resources:
        limits:
          cpus: '0.5'
          memory: 512M
      update_config:
        parallelism: 1
        delay: 10s
      restart_policy:
        condition: on-failure
        
  db:
    image: postgres:14
    volumes:
      - db-data:/var/lib/postgresql/data
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=mcpdb
    restart: always
    
  redis:
    image: redis:7
    volumes:
      - redis-data:/data
    restart: always

volumes:
  db-data:
  redis-data:

云原生部署

在Kubernetes上部署MCP服务可以提供高可用性和自动扩展能力。

Kubernetes部署配置

# mcp-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-service
  labels:
    app: mcp-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mcp-service
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  template:
    metadata:
      labels:
        app: mcp-service
    spec:
      containers:
      - name: mcp-service
        image: myregistry/mcp-service:latest
        ports:
        - containerPort: 8000
        env:
        - name: MCP_ENV
          value: "production"
        - name: DB_URL
          valueFrom:
            secretKeyRef:
              name: mcp-secrets
              key: db-url
        - name: REDIS_URL
          valueFrom:
            secretKeyRef:
              name: mcp-secrets
              key: redis-url
        resources:
          limits:
            cpu: "500m"
            memory: "512Mi"
          requests:
            cpu: "200m"
            memory: "256Mi"
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 10
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 15
          periodSeconds: 20
---
apiVersion: v1
kind: Service
metadata:
  name: mcp-service
spec:
  selector:
    app: mcp-service
  ports:
  - port: 80
    targetPort: 8000
  type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: mcp-service
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mcp-service
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70

配置Ingress

# mcp-ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: mcp-ingress
  annotations:
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
    cert-manager.io/cluster-issuer: letsencrypt-prod
spec:
  tls:
  - hosts:
    - mcp.example.com
    secretName: mcp-tls
  rules:
  - host: mcp.example.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: mcp-service
            port:
              number: 80

无服务器部署

对于轻量级MCP服务,无服务器部署提供了按需扩展和降低成本的好处。以下是使用AWS Lambda和API Gateway部署MCP服务的示例:

使用Serverless框架部署

# serverless.yml
service: mcp-serverless

provider:
  name: aws
  runtime: python3.10
  stage: ${opt:stage, 'dev'}
  region: ${opt:region, 'us-east-1'}
  memorySize: 512
  timeout: 10
  environment:
    MCP_ENV: ${self:provider.stage}
    DB_URL: ${ssm:/mcp/${self:provider.stage}/db_url}
    REDIS_URL: ${ssm:/mcp/${self:provider.stage}/redis_url}
  iamRoleStatements:
    - Effect: Allow
      Action:
        - dynamodb:Query
        - dynamodb:Scan
        - dynamodb:GetItem
        - dynamodb:PutItem
        - dynamodb:UpdateItem
        - dynamodb:DeleteItem
      Resource: !GetAtt MCPTable.Arn

functions:
  mcp_handler:
    handler: handler.lambda_handler
    events:
      - http:
          path: /
          method: ANY
          cors: true
      - http:
          path: /{proxy+}
          method: ANY
          cors: true
      - websocket:
          route: $connect
      - websocket:
          route: $disconnect
      - websocket:
          route: $default

resources:
  Resources:
    MCPTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: mcp-${self:provider.stage}
        BillingMode: PAY_PER_REQUEST
        AttributeDefinitions:
          - AttributeName: id
            AttributeType: S
        KeySchema:
          - AttributeName: id
            KeyType: HASH

Lambda处理器代码

# handler.py
import json
import asyncio
import os
from mangum import Mangum
from mcp.server.fastmcp import FastMCP
from myapp.server import create_app

# 创建MCP应用程序
app = create_app()

# 创建Lambda处理器
handler = Mangum(app.get_asgi_app())

def lambda_handler(event, context):
    """AWS Lambda处理器"""
    return handler(event, context)

持续集成与持续部署(CI/CD)

使用CI/CD管道自动化MCP服务的测试和部署过程。

GitHub Actions工作流示例

# .github/workflows/main.yml
name: MCP Service CI/CD

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v3
    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.10'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install pytest pytest-asyncio pytest-cov
        if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
        if [ -f requirements-dev.txt ]; then pip install -r requirements-dev.txt; fi
    - name: Lint with flake8
      run: |
        pip install flake8
        flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
    - name: Test with pytest
      run: |
        pytest --cov=myapp --cov-report=xml
    - name: Upload coverage to Codecov
      uses: codecov/codecov-action@v3
      with:
        file: ./coverage.xml
        
  build-and-push:
    needs: test
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v3
    - name: Set up Docker Buildx
      uses: docker/setup-buildx-action@v2
    - name: Login to DockerHub
      uses: docker/login-action@v2
      with:
        username: ${{ secrets.DOCKERHUB_USERNAME }}
        password: ${{ secrets.DOCKERHUB_TOKEN }}
    - name: Build and push
      uses: docker/build-push-action@v4
      with:
        context: .
        push: true
        tags: myregistry/mcp-service:latest,myregistry/mcp-service:${{ github.sha }}
        
  deploy:
    needs: build-and-push
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v3
    - name: Set up kubectl
      uses: azure/setup-kubectl@v3
      with:
        version: 'latest'
    - name: Configure AWS credentials
      uses: aws-actions/configure-aws-credentials@v1
      with:
        aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
        aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
        aws-region: us-east-1
    - name: Update kube config
      run: aws eks update-kubeconfig --name mcp-cluster --region us-east-1
    - name: Deploy to Kubernetes
      run: |
        # 更新部署镜像
        kubectl set image deployment/mcp-service mcp-service=myregistry/mcp-service:${{ github.sha }} --record
        # 等待部署完成
        kubectl rollout status deployment/mcp-service

部署最佳实践

  1. 环境配置分离:使用环境变量或配置文件分离不同环境(开发、测试、生产)的配置。
  2. 零停机部署:使用滚动更新或蓝绿部署确保服务不中断。
  3. 自动扩展:根据负载自动扩展服务实例。
  4. 监控和告警:建立全面的监控和告警系统,及时发现问题。
  5. 回滚策略:准备快速回滚策略,以应对部署失败。

配置管理示例

# config.py
import os
from pathlib import Path
from typing import Dict, Any, Optional
import yaml

class Config:
    """配置管理类"""
    def __init__(self):
        self._config = {}
        self._load_config()
    
    def _load_config(self):
        """加载配置"""
        # 确定环境
        env = os.environ.get("MCP_ENV", "development")
        
        # 基础配置
        base_config = self._load_yaml("config/base.yaml")
        
        # 环境特定配置
        env_config = self._load_yaml(f"config/{env}.yaml")
        
        # 合并配置
        self._config = {**base_config, **(env_config or {})}
        
        # 环境变量覆盖
        for key in self._config:
            env_value = os.environ.get(key.upper())
            if env_value is not None:
                self._config[key] = env_value
    
    def _load_yaml(self, path: str) -> Dict[str, Any]:
        """加载YAML配置文件"""
        config_path = Path(path)
        if not config_path.exists():
            return {}
        
        with open(config_path, "r") as f:
            return yaml.safe_load(f)
    
    def get(self, key: str, default: Optional[Any] = None) -> Any:
        """获取配置值"""
        return self._config.get(key, default)
    
    def __getitem__(self, key: str) -> Any:
        """通过下标访问配置"""
        return self._config[key]

# 单例配置实例
config = Config()

总结

测试和部署是MCP服务开发生命周期中不可或缺的环节。本章介绍了全面的测试策略,包括单元测试、集成测试和性能测试,以及多种部署选项,从容器化到无服务器部署。通过采用这些最佳实践,可以确保MCP服务的质量和可靠性。

在实际项目中,应根据具体需求选择合适的测试和部署策略,并随着服务的发展不断优化这些策略。记住,好的测试和部署流程不仅能提高服务质量,还能使开发团队更加高效。

使用 Hugo 构建
主题 StackJimmy 设计