2.5_服务器生命周期和上下文

2.5 服务器生命周期和上下文

MCP服务器的生命周期和上下文管理是构建高效、可靠服务的关键因素。本章将探讨MCP服务器的启动和关闭流程、上下文对象使用、依赖注入以及状态管理。

服务器启动和关闭流程

MCP服务器的生命周期包括初始化、运行和关闭三个主要阶段。

初始化阶段

from mcp.server.fastmcp import FastMCP

# 创建服务器实例
mcp = FastMCP("示例服务器")

# 配置服务器
mcp.configure(
    debug=True,
    log_level="INFO",
    max_request_size=10 * 1024 * 1024  # 10MB
)

# 注册资源、工具和提示
@mcp.resource("example://resource")
def get_example():
    return "示例资源"

@mcp.tool()
def example_tool():
    return "示例工具结果"

# 服务器启动前的准备
@mcp.on_startup
async def startup_handler():
    print("服务器即将启动...")
    # 执行初始化操作,如:
    # - 连接数据库
    # - 加载配置
    # - 初始化外部服务连接

运行阶段

if __name__ == "__main__":
    # 运行服务器
    mcp.run(
        transport="sse",          # 传输方式:stdio, sse, websocket
        host="localhost",         # 主机名(适用于网络传输)
        port=8000,                # 端口(适用于网络传输)
        cors_origins=["*"],       # CORS配置(适用于网络传输)
        shutdown_timeout=60       # 关闭超时(秒)
    )

关闭阶段

# 服务器关闭时的清理
@mcp.on_shutdown
async def shutdown_handler():
    print("服务器即将关闭...")
    # 执行清理操作,如:
    # - 关闭数据库连接
    # - 释放资源
    # - 保存状态

上下文对象深入理解

上下文对象(Context)在MCP服务器中传递信息和状态,是资源和工具函数的核心参数。

使用上下文对象

from mcp.server.fastmcp import Context, FastMCP

mcp = FastMCP("上下文示例")

@mcp.resource("user://{user_id}")
def get_user_info(context: Context, user_id: str):
    """获取用户信息,使用上下文"""
    # 访问请求信息
    request_id = context.request_id
    client_id = context.client_id
    
    # 记录日志
    context.logger.info(f"请求用户信息:{user_id},请求ID:{request_id}")
    
    # 访问服务器状态
    db = context.app.state.get("database")
    
    # 使用数据库获取用户信息
    user = db.query_user(user_id)
    return user

@mcp.tool()
async def update_user(context: Context, user_id: str, data: dict):
    """更新用户信息,使用上下文"""
    # 获取当前客户端信息
    client = context.client_id
    
    # 检查权限
    if not context.app.has_permission(client, f"user:{user_id}:write"):
        context.logger.warning(f"客户端 {client} 尝试更新用户 {user_id} 但权限不足")
        raise PermissionError("无权更新此用户")
    
    # 执行更新
    db = context.app.state.get("database")
    result = await db.update_user(user_id, data)
    
    # 记录审计日志
    context.app.state.get("audit_log").record(
        action="update_user",
        user_id=user_id,
        client=client,
        data=data
    )
    
    return result

上下文属性

上下文对象包含多种有用的属性:

属性 描述
context.app MCP服务器应用实例
context.request_id 当前请求的唯一标识符
context.client_id 客户端标识符
context.logger 上下文相关的日志记录器
context.scope 请求作用域数据
context.session 会话数据(如果启用会话)

依赖注入和资源共享

MCP支持依赖注入模式,使服务组件可以方便地共享和访问资源。

注册依赖项

from mcp.server.fastmcp import FastMCP, Depends

mcp = FastMCP("依赖注入示例")

# 定义数据库连接
async def get_database():
    """提供数据库连接"""
    # 在实际应用中,可能会创建/获取真实的数据库连接
    db = Database("connection_string")
    try:
        await db.connect()
        yield db
    finally:
        await db.close()

# 定义用户服务
async def get_user_service(db = Depends(get_database)):
    """提供用户服务,依赖于数据库连接"""
    return UserService(db)

# 在工具中使用依赖
@mcp.tool()
async def get_user_profile(
    user_id: str,
    user_service = Depends(get_user_service)
):
    """获取用户资料,自动注入用户服务"""
    return await user_service.get_profile(user_id)

# 在资源中使用依赖
@mcp.resource("stats://{metric}")
async def get_statistics(
    metric: str,
    db = Depends(get_database)
):
    """获取统计数据,自动注入数据库连接"""
    return await db.query_stats(metric)

作用域和生命周期

依赖项可以有不同的作用域,控制它们的生命周期:

from enum import Enum
from mcp.server.fastmcp import FastMCP, Depends

class Scope(Enum):
    REQUEST = "request"    # 每个请求创建一次
    SESSION = "session"    # 每个会话创建一次
    APP = "app"            # 应用启动时创建一次

mcp = FastMCP("作用域示例")

# 应用级依赖
@mcp.dependency(scope=Scope.APP)
async def get_config():
    """加载配置,应用启动时执行一次"""
    config = await load_config_from_file()
    return config

# 会话级依赖
@mcp.dependency(scope=Scope.SESSION)
async def get_session_cache(config = Depends(get_config)):
    """创建会话缓存,每个会话创建一次"""
    return SessionCache(config.cache_size)

# 请求级依赖
@mcp.dependency(scope=Scope.REQUEST)
async def get_request_context(
    context: Context,
    cache = Depends(get_session_cache)
):
    """创建请求上下文,每个请求创建一次"""
    return RequestContext(context.request_id, cache)

处理状态和会话数据

MCP服务器可以管理不同级别的状态数据。

应用级状态

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("状态示例")

# 初始化应用状态
@mcp.on_startup
async def initialize_state():
    # 创建应用级共享状态
    mcp.state.counter = 0
    mcp.state.users = {}
    mcp.state.active_sessions = set()

@mcp.tool()
async def increment_counter(context: Context):
    """增加计数器"""
    # 访问和修改应用级状态
    context.app.state.counter += 1
    return context.app.state.counter

会话级状态

from mcp.server.fastmcp import FastMCP, Context

mcp = FastMCP("会话状态示例")
mcp.enable_sessions()  # 启用会话支持

@mcp.tool()
async def set_preference(context: Context, key: str, value: str):
    """设置用户偏好"""
    # 确保会话有一个preferences字典
    if "preferences" not in context.session:
        context.session["preferences"] = {}
    
    # 更新偏好
    context.session["preferences"][key] = value
    return {"success": True, "message": f"已设置偏好 {key}={value}"}

@mcp.resource("preferences://{key}")
async def get_preference(context: Context, key: str):
    """获取用户偏好"""
    # 从会话中读取偏好
    preferences = context.session.get("preferences", {})
    if key in preferences:
        return preferences[key]
    else:
        return f"未找到偏好: {key}"

请求级状态

from mcp.server.fastmcp import FastMCP, Context

mcp = FastMCP("请求状态示例")

@mcp.middleware("request")
async def track_request_time(context: Context, call_next):
    """跟踪请求执行时间的中间件"""
    import time
    
    # 记录开始时间
    start_time = time.time()
    
    # 在请求作用域中存储数据
    context.scope["request_start"] = start_time
    
    # 执行请求
    response = await call_next(context)
    
    # 计算执行时间
    execution_time = time.time() - start_time
    context.logger.info(f"请求 {context.request_id} 执行时间: {execution_time:.4f}秒")
    
    return response

@mcp.tool()
async def get_execution_info(context: Context):
    """获取当前请求的执行信息"""
    start_time = context.scope.get("request_start")
    if start_time:
        import time
        current_time = time.time()
        elapsed = current_time - start_time
        return {
            "request_id": context.request_id,
            "elapsed_seconds": elapsed,
            "started_at": start_time
        }
    else:
        return {"error": "找不到请求开始时间"}

进阶示例:带数据库连接的应用服务器

下面是一个综合示例,展示如何创建一个使用数据库连接的MCP服务器:

import asyncio
import aiosqlite
from datetime import datetime
from typing import Dict, List, Optional
from mcp.server.fastmcp import FastMCP, Context, Depends

# 创建服务器
mcp = FastMCP("数据库示例服务器")

# 启用会话支持
mcp.enable_sessions()

# 数据库依赖
async def get_db():
    """提供数据库连接"""
    # 连接到SQLite数据库
    db = await aiosqlite.connect("./data.db")
    # 启用行工厂,返回字典而非元组
    db.row_factory = aiosqlite.Row
    
    try:
        yield db
    finally:
        await db.close()

# 初始化数据库
@mcp.on_startup
async def initialize_database():
    async with aiosqlite.connect("./data.db") as db:
        # 创建表
        await db.execute("""
        CREATE TABLE IF NOT EXISTS notes (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id TEXT NOT NULL,
            title TEXT NOT NULL,
            content TEXT NOT NULL,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        )
        """)
        await db.commit()

# 工具:创建笔记
@mcp.tool()
async def create_note(
    context: Context,
    title: str,
    content: str,
    db = Depends(get_db)
):
    """创建新笔记
    
    参数:
        title: 笔记标题
        content: 笔记内容
    
    返回:
        创建的笔记信息
    """
    # 从会话中获取用户ID,或使用默认值
    user_id = context.session.get("user_id", "anonymous")
    
    # 获取当前时间
    now = datetime.now().isoformat()
    
    # 插入笔记
    cursor = await db.execute(
        """
        INSERT INTO notes (user_id, title, content, created_at, updated_at)
        VALUES (?, ?, ?, ?, ?)
        """,
        (user_id, title, content, now, now)
    )
    
    # 获取新笔记ID
    note_id = cursor.lastrowid
    await db.commit()
    
    return {
        "id": note_id,
        "user_id": user_id,
        "title": title,
        "content": content,
        "created_at": now,
        "updated_at": now
    }

# 工具:更新笔记
@mcp.tool()
async def update_note(
    context: Context,
    note_id: int,
    title: Optional[str] = None,
    content: Optional[str] = None,
    db = Depends(get_db)
):
    """更新笔记
    
    参数:
        note_id: 笔记ID
        title: 新标题(可选)
        content: 新内容(可选)
    
    返回:
        更新结果
    """
    # 从会话中获取用户ID
    user_id = context.session.get("user_id", "anonymous")
    
    # 获取当前笔记
    async with db.execute(
        "SELECT * FROM notes WHERE id = ? AND user_id = ?",
        (note_id, user_id)
    ) as cursor:
        note = await cursor.fetchone()
    
    if not note:
        return {"error": "笔记不存在或无权访问"}
    
    # 准备更新内容
    updates = []
    params = []
    
    if title is not None:
        updates.append("title = ?")
        params.append(title)
    
    if content is not None:
        updates.append("content = ?")
        params.append(content)
    
    if not updates:
        return {"error": "没有提供更新内容"}
    
    # 添加更新时间
    updates.append("updated_at = ?")
    now = datetime.now().isoformat()
    params.append(now)
    
    # 添加查询条件参数
    params.extend([note_id, user_id])
    
    # 执行更新
    await db.execute(
        f"""
        UPDATE notes
        SET {', '.join(updates)}
        WHERE id = ? AND user_id = ?
        """,
        params
    )
    await db.commit()
    
    return {"success": True, "note_id": note_id, "updated_at": now}

# 资源:获取笔记列表
@mcp.resource("notes://list")
async def get_notes(context: Context, db = Depends(get_db)):
    """获取当前用户的所有笔记"""
    user_id = context.session.get("user_id", "anonymous")
    
    async with db.execute(
        """
        SELECT id, title, created_at, updated_at
        FROM notes
        WHERE user_id = ?
        ORDER BY updated_at DESC
        """,
        (user_id,)
    ) as cursor:
        # 获取所有笔记
        notes = await cursor.fetchall()
    
    # 转换为列表
    result = [dict(note) for note in notes]
    
    return result

# 资源:获取单个笔记
@mcp.resource("notes://{note_id}")
async def get_note(context: Context, note_id: int, db = Depends(get_db)):
    """获取指定笔记的详细信息"""
    user_id = context.session.get("user_id", "anonymous")
    
    async with db.execute(
        """
        SELECT *
        FROM notes
        WHERE id = ? AND user_id = ?
        """,
        (note_id, user_id)
    ) as cursor:
        note = await cursor.fetchone()
    
    if not note:
        return {"error": "笔记不存在或无权访问"}
    
    return dict(note)

# 工具:设置用户ID(简化的"登录")
@mcp.tool()
async def set_user_id(context: Context, user_id: str):
    """设置当前会话的用户ID
    
    参数:
        user_id: 要设置的用户ID
    
    返回:
        设置结果
    """
    # 存储用户ID到会话
    context.session["user_id"] = user_id
    context.logger.info(f"设置会话用户ID: {user_id}")
    
    return {"success": True, "user_id": user_id}

# 工具:删除笔记
@mcp.tool()
async def delete_note(context: Context, note_id: int, db = Depends(get_db)):
    """删除笔记
    
    参数:
        note_id: 要删除的笔记ID
    
    返回:
        删除结果
    """
    user_id = context.session.get("user_id", "anonymous")
    
    # 检查笔记是否存在
    async with db.execute(
        "SELECT id FROM notes WHERE id = ? AND user_id = ?",
        (note_id, user_id)
    ) as cursor:
        note = await cursor.fetchone()
    
    if not note:
        return {"error": "笔记不存在或无权访问"}
    
    # 执行删除
    await db.execute(
        "DELETE FROM notes WHERE id = ? AND user_id = ?",
        (note_id, user_id)
    )
    await db.commit()
    
    return {"success": True, "deleted_note_id": note_id}

# 中间件:请求日志记录
@mcp.middleware("request")
async def log_requests(context: Context, call_next):
    """记录所有请求"""
    start_time = asyncio.get_event_loop().time()
    
    # 记录请求开始
    context.logger.info(f"开始处理请求 {context.request_id}")
    
    # 处理请求
    response = await call_next(context)
    
    # 计算处理时间
    duration = asyncio.get_event_loop().time() - start_time
    context.logger.info(f"请求 {context.request_id} 处理完成,耗时 {duration:.4f}秒")
    
    return response

# 运行服务器
if __name__ == "__main__":
    mcp.run(transport="sse", host="localhost", port=8000)

小结

在本章中,我们探讨了MCP服务器的生命周期和上下文管理:

  • 服务器的初始化、运行和关闭流程
  • 上下文对象的使用和属性
  • 依赖注入和资源共享
  • 应用级、会话级和请求级状态管理
  • 通过带数据库连接的应用服务器示例,展示了如何构建一个完整的服务

掌握这些概念将帮助您创建结构良好、高效运行的MCP服务器。在进阶篇的后续章节中,我们将探讨更复杂的主题,包括定制传输层、高性能服务等。

使用 Hugo 构建
主题 StackJimmy 设计