2.5 服务器生命周期和上下文
MCP服务器的生命周期和上下文管理是构建高效、可靠服务的关键因素。本章将探讨MCP服务器的启动和关闭流程、上下文对象使用、依赖注入以及状态管理。
服务器启动和关闭流程
MCP服务器的生命周期包括初始化、运行和关闭三个主要阶段。
初始化阶段
from mcp.server.fastmcp import FastMCP
# 创建服务器实例
mcp = FastMCP("示例服务器")
# 配置服务器
mcp.configure(
debug=True,
log_level="INFO",
max_request_size=10 * 1024 * 1024 # 10MB
)
# 注册资源、工具和提示
@mcp.resource("example://resource")
def get_example():
return "示例资源"
@mcp.tool()
def example_tool():
return "示例工具结果"
# 服务器启动前的准备
@mcp.on_startup
async def startup_handler():
print("服务器即将启动...")
# 执行初始化操作,如:
# - 连接数据库
# - 加载配置
# - 初始化外部服务连接
运行阶段
if __name__ == "__main__":
# 运行服务器
mcp.run(
transport="sse", # 传输方式:stdio, sse, websocket
host="localhost", # 主机名(适用于网络传输)
port=8000, # 端口(适用于网络传输)
cors_origins=["*"], # CORS配置(适用于网络传输)
shutdown_timeout=60 # 关闭超时(秒)
)
关闭阶段
# 服务器关闭时的清理
@mcp.on_shutdown
async def shutdown_handler():
print("服务器即将关闭...")
# 执行清理操作,如:
# - 关闭数据库连接
# - 释放资源
# - 保存状态
上下文对象深入理解
上下文对象(Context)在MCP服务器中传递信息和状态,是资源和工具函数的核心参数。
使用上下文对象
from mcp.server.fastmcp import Context, FastMCP
mcp = FastMCP("上下文示例")
@mcp.resource("user://{user_id}")
def get_user_info(context: Context, user_id: str):
"""获取用户信息,使用上下文"""
# 访问请求信息
request_id = context.request_id
client_id = context.client_id
# 记录日志
context.logger.info(f"请求用户信息:{user_id},请求ID:{request_id}")
# 访问服务器状态
db = context.app.state.get("database")
# 使用数据库获取用户信息
user = db.query_user(user_id)
return user
@mcp.tool()
async def update_user(context: Context, user_id: str, data: dict):
"""更新用户信息,使用上下文"""
# 获取当前客户端信息
client = context.client_id
# 检查权限
if not context.app.has_permission(client, f"user:{user_id}:write"):
context.logger.warning(f"客户端 {client} 尝试更新用户 {user_id} 但权限不足")
raise PermissionError("无权更新此用户")
# 执行更新
db = context.app.state.get("database")
result = await db.update_user(user_id, data)
# 记录审计日志
context.app.state.get("audit_log").record(
action="update_user",
user_id=user_id,
client=client,
data=data
)
return result
上下文属性
上下文对象包含多种有用的属性:
| 属性 | 描述 |
|---|---|
context.app |
MCP服务器应用实例 |
context.request_id |
当前请求的唯一标识符 |
context.client_id |
客户端标识符 |
context.logger |
上下文相关的日志记录器 |
context.scope |
请求作用域数据 |
context.session |
会话数据(如果启用会话) |
依赖注入和资源共享
MCP支持依赖注入模式,使服务组件可以方便地共享和访问资源。
注册依赖项
from mcp.server.fastmcp import FastMCP, Depends
mcp = FastMCP("依赖注入示例")
# 定义数据库连接
async def get_database():
"""提供数据库连接"""
# 在实际应用中,可能会创建/获取真实的数据库连接
db = Database("connection_string")
try:
await db.connect()
yield db
finally:
await db.close()
# 定义用户服务
async def get_user_service(db = Depends(get_database)):
"""提供用户服务,依赖于数据库连接"""
return UserService(db)
# 在工具中使用依赖
@mcp.tool()
async def get_user_profile(
user_id: str,
user_service = Depends(get_user_service)
):
"""获取用户资料,自动注入用户服务"""
return await user_service.get_profile(user_id)
# 在资源中使用依赖
@mcp.resource("stats://{metric}")
async def get_statistics(
metric: str,
db = Depends(get_database)
):
"""获取统计数据,自动注入数据库连接"""
return await db.query_stats(metric)
作用域和生命周期
依赖项可以有不同的作用域,控制它们的生命周期:
from enum import Enum
from mcp.server.fastmcp import FastMCP, Depends
class Scope(Enum):
REQUEST = "request" # 每个请求创建一次
SESSION = "session" # 每个会话创建一次
APP = "app" # 应用启动时创建一次
mcp = FastMCP("作用域示例")
# 应用级依赖
@mcp.dependency(scope=Scope.APP)
async def get_config():
"""加载配置,应用启动时执行一次"""
config = await load_config_from_file()
return config
# 会话级依赖
@mcp.dependency(scope=Scope.SESSION)
async def get_session_cache(config = Depends(get_config)):
"""创建会话缓存,每个会话创建一次"""
return SessionCache(config.cache_size)
# 请求级依赖
@mcp.dependency(scope=Scope.REQUEST)
async def get_request_context(
context: Context,
cache = Depends(get_session_cache)
):
"""创建请求上下文,每个请求创建一次"""
return RequestContext(context.request_id, cache)
处理状态和会话数据
MCP服务器可以管理不同级别的状态数据。
应用级状态
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("状态示例")
# 初始化应用状态
@mcp.on_startup
async def initialize_state():
# 创建应用级共享状态
mcp.state.counter = 0
mcp.state.users = {}
mcp.state.active_sessions = set()
@mcp.tool()
async def increment_counter(context: Context):
"""增加计数器"""
# 访问和修改应用级状态
context.app.state.counter += 1
return context.app.state.counter
会话级状态
from mcp.server.fastmcp import FastMCP, Context
mcp = FastMCP("会话状态示例")
mcp.enable_sessions() # 启用会话支持
@mcp.tool()
async def set_preference(context: Context, key: str, value: str):
"""设置用户偏好"""
# 确保会话有一个preferences字典
if "preferences" not in context.session:
context.session["preferences"] = {}
# 更新偏好
context.session["preferences"][key] = value
return {"success": True, "message": f"已设置偏好 {key}={value}"}
@mcp.resource("preferences://{key}")
async def get_preference(context: Context, key: str):
"""获取用户偏好"""
# 从会话中读取偏好
preferences = context.session.get("preferences", {})
if key in preferences:
return preferences[key]
else:
return f"未找到偏好: {key}"
请求级状态
from mcp.server.fastmcp import FastMCP, Context
mcp = FastMCP("请求状态示例")
@mcp.middleware("request")
async def track_request_time(context: Context, call_next):
"""跟踪请求执行时间的中间件"""
import time
# 记录开始时间
start_time = time.time()
# 在请求作用域中存储数据
context.scope["request_start"] = start_time
# 执行请求
response = await call_next(context)
# 计算执行时间
execution_time = time.time() - start_time
context.logger.info(f"请求 {context.request_id} 执行时间: {execution_time:.4f}秒")
return response
@mcp.tool()
async def get_execution_info(context: Context):
"""获取当前请求的执行信息"""
start_time = context.scope.get("request_start")
if start_time:
import time
current_time = time.time()
elapsed = current_time - start_time
return {
"request_id": context.request_id,
"elapsed_seconds": elapsed,
"started_at": start_time
}
else:
return {"error": "找不到请求开始时间"}
进阶示例:带数据库连接的应用服务器
下面是一个综合示例,展示如何创建一个使用数据库连接的MCP服务器:
import asyncio
import aiosqlite
from datetime import datetime
from typing import Dict, List, Optional
from mcp.server.fastmcp import FastMCP, Context, Depends
# 创建服务器
mcp = FastMCP("数据库示例服务器")
# 启用会话支持
mcp.enable_sessions()
# 数据库依赖
async def get_db():
"""提供数据库连接"""
# 连接到SQLite数据库
db = await aiosqlite.connect("./data.db")
# 启用行工厂,返回字典而非元组
db.row_factory = aiosqlite.Row
try:
yield db
finally:
await db.close()
# 初始化数据库
@mcp.on_startup
async def initialize_database():
async with aiosqlite.connect("./data.db") as db:
# 创建表
await db.execute("""
CREATE TABLE IF NOT EXISTS notes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
title TEXT NOT NULL,
content TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
""")
await db.commit()
# 工具:创建笔记
@mcp.tool()
async def create_note(
context: Context,
title: str,
content: str,
db = Depends(get_db)
):
"""创建新笔记
参数:
title: 笔记标题
content: 笔记内容
返回:
创建的笔记信息
"""
# 从会话中获取用户ID,或使用默认值
user_id = context.session.get("user_id", "anonymous")
# 获取当前时间
now = datetime.now().isoformat()
# 插入笔记
cursor = await db.execute(
"""
INSERT INTO notes (user_id, title, content, created_at, updated_at)
VALUES (?, ?, ?, ?, ?)
""",
(user_id, title, content, now, now)
)
# 获取新笔记ID
note_id = cursor.lastrowid
await db.commit()
return {
"id": note_id,
"user_id": user_id,
"title": title,
"content": content,
"created_at": now,
"updated_at": now
}
# 工具:更新笔记
@mcp.tool()
async def update_note(
context: Context,
note_id: int,
title: Optional[str] = None,
content: Optional[str] = None,
db = Depends(get_db)
):
"""更新笔记
参数:
note_id: 笔记ID
title: 新标题(可选)
content: 新内容(可选)
返回:
更新结果
"""
# 从会话中获取用户ID
user_id = context.session.get("user_id", "anonymous")
# 获取当前笔记
async with db.execute(
"SELECT * FROM notes WHERE id = ? AND user_id = ?",
(note_id, user_id)
) as cursor:
note = await cursor.fetchone()
if not note:
return {"error": "笔记不存在或无权访问"}
# 准备更新内容
updates = []
params = []
if title is not None:
updates.append("title = ?")
params.append(title)
if content is not None:
updates.append("content = ?")
params.append(content)
if not updates:
return {"error": "没有提供更新内容"}
# 添加更新时间
updates.append("updated_at = ?")
now = datetime.now().isoformat()
params.append(now)
# 添加查询条件参数
params.extend([note_id, user_id])
# 执行更新
await db.execute(
f"""
UPDATE notes
SET {', '.join(updates)}
WHERE id = ? AND user_id = ?
""",
params
)
await db.commit()
return {"success": True, "note_id": note_id, "updated_at": now}
# 资源:获取笔记列表
@mcp.resource("notes://list")
async def get_notes(context: Context, db = Depends(get_db)):
"""获取当前用户的所有笔记"""
user_id = context.session.get("user_id", "anonymous")
async with db.execute(
"""
SELECT id, title, created_at, updated_at
FROM notes
WHERE user_id = ?
ORDER BY updated_at DESC
""",
(user_id,)
) as cursor:
# 获取所有笔记
notes = await cursor.fetchall()
# 转换为列表
result = [dict(note) for note in notes]
return result
# 资源:获取单个笔记
@mcp.resource("notes://{note_id}")
async def get_note(context: Context, note_id: int, db = Depends(get_db)):
"""获取指定笔记的详细信息"""
user_id = context.session.get("user_id", "anonymous")
async with db.execute(
"""
SELECT *
FROM notes
WHERE id = ? AND user_id = ?
""",
(note_id, user_id)
) as cursor:
note = await cursor.fetchone()
if not note:
return {"error": "笔记不存在或无权访问"}
return dict(note)
# 工具:设置用户ID(简化的"登录")
@mcp.tool()
async def set_user_id(context: Context, user_id: str):
"""设置当前会话的用户ID
参数:
user_id: 要设置的用户ID
返回:
设置结果
"""
# 存储用户ID到会话
context.session["user_id"] = user_id
context.logger.info(f"设置会话用户ID: {user_id}")
return {"success": True, "user_id": user_id}
# 工具:删除笔记
@mcp.tool()
async def delete_note(context: Context, note_id: int, db = Depends(get_db)):
"""删除笔记
参数:
note_id: 要删除的笔记ID
返回:
删除结果
"""
user_id = context.session.get("user_id", "anonymous")
# 检查笔记是否存在
async with db.execute(
"SELECT id FROM notes WHERE id = ? AND user_id = ?",
(note_id, user_id)
) as cursor:
note = await cursor.fetchone()
if not note:
return {"error": "笔记不存在或无权访问"}
# 执行删除
await db.execute(
"DELETE FROM notes WHERE id = ? AND user_id = ?",
(note_id, user_id)
)
await db.commit()
return {"success": True, "deleted_note_id": note_id}
# 中间件:请求日志记录
@mcp.middleware("request")
async def log_requests(context: Context, call_next):
"""记录所有请求"""
start_time = asyncio.get_event_loop().time()
# 记录请求开始
context.logger.info(f"开始处理请求 {context.request_id}")
# 处理请求
response = await call_next(context)
# 计算处理时间
duration = asyncio.get_event_loop().time() - start_time
context.logger.info(f"请求 {context.request_id} 处理完成,耗时 {duration:.4f}秒")
return response
# 运行服务器
if __name__ == "__main__":
mcp.run(transport="sse", host="localhost", port=8000)
小结
在本章中,我们探讨了MCP服务器的生命周期和上下文管理:
- 服务器的初始化、运行和关闭流程
- 上下文对象的使用和属性
- 依赖注入和资源共享
- 应用级、会话级和请求级状态管理
- 通过带数据库连接的应用服务器示例,展示了如何构建一个完整的服务
掌握这些概念将帮助您创建结构良好、高效运行的MCP服务器。在进阶篇的后续章节中,我们将探讨更复杂的主题,包括定制传输层、高性能服务等。