3.3 MCP与现有系统集成
在实际应用中,MCP通常需要与现有系统集成,如Web框架、数据库、存储系统等。本章将探讨如何将MCP服务无缝集成到现有技术栈中。
与Web框架集成
MCP服务可以与各种流行的Python Web框架集成,使其成为更大应用的一部分。
与FastAPI集成
FastAPI是一个现代、高性能的Web框架,与MCP的异步设计理念相符。以下是将MCP集成到FastAPI应用的示例:
import asyncio
from fastapi import FastAPI, Request, Depends, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
from mcp.server.fastmcp import FastMCP
# 创建FastAPI应用
app = FastAPI(title="MCP与FastAPI集成示例")
# 创建MCP服务器
mcp = FastMCP("MCP服务")
# 注册MCP资源和工具
@mcp.resource("example://hello")
async def get_hello():
return {"message": "Hello from MCP!"}
@mcp.tool()
async def echo(message: str):
"""回显消息"""
return {"echo": message}
# 请求模型
class ResourceRequest(BaseModel):
uri: str
params: dict = {}
class ToolRequest(BaseModel):
tool_name: str
arguments: dict = {}
# FastAPI端点:获取资源
@app.post("/api/mcp/resource")
async def get_resource(request: ResourceRequest):
try:
result = await mcp.handle_resource_request(request.uri, request.params)
return JSONResponse(result)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# FastAPI端点:执行工具(支持流式响应)
@app.post("/api/mcp/tool")
async def execute_tool(request: ToolRequest):
async def stream_response():
try:
async for result in mcp.handle_tool_request(
request.tool_name, request.arguments
):
# 将每个结果转换为JSON行
yield f"{result}\n"
except Exception as e:
yield f"{{'error': '{str(e)}'}}\n"
return StreamingResponse(
stream_response(),
media_type="application/x-ndjson"
)
# 健康检查端点
@app.get("/health")
async def health_check():
return {"status": "healthy", "mcp_version": mcp.version}
# MCP工具列表端点
@app.get("/api/mcp/tools")
async def list_tools():
tools = mcp.list_tools()
return {"tools": tools}
# 主应用启动事件
@app.on_event("startup")
async def startup():
# 启动MCP服务器
await mcp.startup()
# 主应用关闭事件
@app.on_event("shutdown")
async def shutdown():
# 关闭MCP服务器
await mcp.shutdown()
# 启动应用
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
与Flask集成
对于偏好Flask的开发者,可以使用以下模式集成MCP:
import asyncio
import json
from queue import Queue
from threading import Thread
from flask import Flask, request, jsonify, Response
from mcp.server.fastmcp import FastMCP
# 创建Flask应用
app = Flask(__name__)
# 创建MCP服务器
mcp = FastMCP("MCP服务")
# 注册MCP资源和工具
@mcp.resource("example://hello")
async def get_hello():
return {"message": "Hello from MCP!"}
@mcp.tool()
async def echo(message: str):
"""回显消息"""
return {"echo": message}
# 运行异步函数的辅助函数
def run_async(coro):
"""在单独的线程中运行协程并返回结果"""
result_queue = Queue()
def run_in_thread():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(coro)
result_queue.put({"success": True, "result": result})
except Exception as e:
result_queue.put({"success": False, "error": str(e)})
finally:
loop.close()
thread = Thread(target=run_in_thread)
thread.start()
thread.join()
result = result_queue.get()
if result["success"]:
return result["result"]
else:
raise Exception(result["error"])
# 启动MCP服务器
@app.before_first_request
def before_first_request():
run_async(mcp.startup())
# 关闭MCP服务器
@app.teardown_appcontext
def teardown_appcontext(exception):
run_async(mcp.shutdown())
# Flask端点:获取资源
@app.route("/api/mcp/resource", methods=["POST"])
def get_resource():
data = request.json
uri = data.get("uri")
params = data.get("params", {})
try:
result = run_async(mcp.handle_resource_request(uri, params))
return jsonify(result)
except Exception as e:
return jsonify({"error": str(e)}), 500
# Flask端点:执行工具
@app.route("/api/mcp/tool", methods=["POST"])
def execute_tool():
data = request.json
tool_name = data.get("tool_name")
arguments = data.get("arguments", {})
def generate():
async def process():
try:
async for result in mcp.handle_tool_request(tool_name, arguments):
yield f"{json.dumps(result)}\n"
except Exception as e:
yield f"{json.dumps({'error': str(e)})}\n"
# 将异步生成器转换为同步生成器
coro = process()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
while True:
try:
chunk = loop.run_until_complete(coro.__anext__())
yield chunk
except StopAsyncIteration:
break
finally:
loop.close()
return Response(
generate(),
mimetype="application/x-ndjson"
)
# 健康检查端点
@app.route("/health", methods=["GET"])
def health_check():
return jsonify({"status": "healthy", "mcp_version": mcp.version})
# 启动应用
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8000, threaded=True)
与Django集成
对于Django项目,可以通过以下方式集成MCP:
# myapp/mcp_service.py
from mcp.server.fastmcp import FastMCP
# 创建MCP服务器单例
mcp = FastMCP("Django MCP服务")
# 注册资源和工具
@mcp.resource("example://hello")
async def get_hello():
return {"message": "Hello from Django MCP!"}
@mcp.tool()
async def echo(message: str):
"""回显消息"""
return {"echo": message}
# 初始化和关闭函数
async def startup():
await mcp.startup()
async def shutdown():
await mcp.shutdown()
# myapp/apps.py
from django.apps import AppConfig
import asyncio
import threading
class MyAppConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'myapp'
def ready(self):
# 避免在开发环境中多次启动
import os
if os.environ.get('RUN_MAIN', None) != 'true':
return
# 启动MCP服务器
from .mcp_service import startup
def start_mcp():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(startup())
threading.Thread(target=start_mcp, daemon=True).start()
# myapp/views.py
import asyncio
import json
from django.http import JsonResponse, StreamingHttpResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from asgiref.sync import async_to_sync
from .mcp_service import mcp
# Django视图:获取资源
@csrf_exempt
@require_http_methods(["POST"])
def get_resource(request):
data = json.loads(request.body)
uri = data.get("uri")
params = data.get("params", {})
try:
result = async_to_sync(mcp.handle_resource_request)(uri, params)
return JsonResponse(result)
except Exception as e:
return JsonResponse({"error": str(e)}, status=500)
# Django视图:执行工具
@csrf_exempt
@require_http_methods(["POST"])
def execute_tool(request):
data = json.loads(request.body)
tool_name = data.get("tool_name")
arguments = data.get("arguments", {})
async def process():
try:
async for result in mcp.handle_tool_request(tool_name, arguments):
yield f"{json.dumps(result)}\n".encode('utf-8')
except Exception as e:
yield f"{json.dumps({'error': str(e)})}\n".encode('utf-8')
# 使用ASGI流式响应
return StreamingHttpResponse(
async_to_sync(lambda: process())(),
content_type="application/x-ndjson"
)
# myapp/urls.py
from django.urls import path
from . import views
urlpatterns = [
path('api/mcp/resource', views.get_resource, name='mcp_resource'),
path('api/mcp/tool', views.execute_tool, name='mcp_tool'),
]
使用ASGI服务器直接集成
对于ASGI服务器(如Uvicorn、Hypercorn等),可以直接将MCP服务作为ASGI应用提供:
from mcp.server.fastmcp import FastMCP
# 创建MCP服务器
mcp = FastMCP("ASGI MCP服务")
# 注册资源和工具
@mcp.resource("example://hello")
async def get_hello():
return {"message": "Hello from ASGI MCP!"}
@mcp.tool()
async def echo(message: str):
"""回显消息"""
return {"echo": message}
# 获取ASGI应用
app = mcp.get_asgi_app()
# 使用ASGI服务器运行
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
MCP服务与微服务架构
在微服务架构中,MCP服务可以作为独立的服务组件:
┌─────────────┐
│ API网关 │
└─────┬───────┘
│
┌───────────────┼───────────────┐
│ │ │
┌───────▼─────┐ ┌──────▼──────┐ ┌─────▼───────┐
│ 用户服务 │ │ MCP服务 │ │ 其他微服务 │
└─────────────┘ └─────────────┘ └─────────────┘
│
┌──────┴──────┐
│ 数据库/存储 │
└─────────────┘
使用API网关路由到MCP服务
示例:使用NGINX作为API网关配置:
# nginx.conf
http {
upstream mcp_service {
server mcp-service:8000;
}
upstream user_service {
server user-service:8001;
}
server {
listen 80;
# MCP服务路由
location /api/mcp/ {
proxy_pass http://mcp_service/;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
}
# 用户服务路由
location /api/users/ {
proxy_pass http://user_service/;
}
}
}
为微服务架构设计MCP服务发现
在微服务环境中,可以实现MCP服务发现机制:
import json
import aiohttp
from mcp.client.base import MCPClient
class ServiceRegistry:
"""MCP服务注册表"""
def __init__(self, registry_url):
self.registry_url = registry_url
self.services = {}
async def register_service(self, service_id, service_info):
"""注册MCP服务"""
async with aiohttp.ClientSession() as session:
await session.post(
f"{self.registry_url}/services",
json={
"id": service_id,
"info": service_info
}
)
async def discover_service(self, service_id):
"""发现MCP服务"""
if service_id in self.services:
return self.services[service_id]
async with aiohttp.ClientSession() as session:
response = await session.get(
f"{self.registry_url}/services/{service_id}"
)
service_info = await response.json()
self.services[service_id] = service_info
return service_info
async def get_client(self, service_id):
"""获取MCP客户端"""
service_info = await self.discover_service(service_id)
# 创建客户端
client = MCPClient(
transport_type=service_info["transport_type"],
host=service_info["host"],
port=service_info["port"]
)
# 连接到服务
await client.connect()
return client
与数据库系统集成
MCP服务经常需要与数据库交互。下面我们将探讨如何将MCP服务与各种数据库系统集成。
与关系型数据库集成
使用SQLAlchemy集成PostgreSQL
SQLAlchemy是Python中流行的ORM库,可以与MCP服务无缝集成:
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, DateTime, func
from mcp.server.fastmcp import FastMCP, Context
# 创建SQLAlchemy模型
Base = declarative_base()
class Document(Base):
__tablename__ = "documents"
id = Column(Integer, primary_key=True)
title = Column(String(255), nullable=False)
content = Column(Text, nullable=False)
created_at = Column(DateTime, server_default=func.now())
updated_at = Column(DateTime, onupdate=func.now())
# 创建MCP服务器
mcp = FastMCP("数据库集成示例")
# 启动事件 - 数据库连接初始化
@mcp.on_startup
async def initialize_db():
# 创建异步数据库引擎
engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost/dbname",
echo=True
)
# 创建会话工厂
async_session = async_sessionmaker(
engine, expire_on_commit=False, class_=AsyncSession
)
# 存储在服务器状态中
mcp.state.db_engine = engine
mcp.state.async_session = async_session
# 创建表(如果不存在)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# 关闭事件 - 清理数据库连接
@mcp.on_shutdown
async def cleanup_db():
if hasattr(mcp.state, "db_engine"):
await mcp.state.db_engine.dispose()
# 数据库会话上下文管理器
async def get_db_session():
"""获取数据库会话"""
if not hasattr(mcp.state, "async_session"):
raise RuntimeError("数据库未初始化")
async_session = mcp.state.async_session
async with async_session() as session:
yield session
# MCP工具 - 创建文档
@mcp.tool()
async def create_document(title: str, content: str):
"""创建新文档
参数:
title: 文档标题
content: 文档内容
返回:
创建的文档ID
"""
# 获取数据库会话
session_gen = get_db_session()
session = await session_gen.__anext__()
try:
# 创建新文档
document = Document(title=title, content=content)
session.add(document)
await session.commit()
await session.refresh(document)
return {"id": document.id, "title": document.title}
finally:
# 确保关闭会话
try:
await session_gen.aclose()
except:
pass
# MCP资源 - 获取文档
@mcp.resource("documents://{document_id}")
async def get_document(context: Context, document_id: int):
"""获取文档详情
参数:
document_id: 文档ID
返回:
文档详情
"""
# 获取数据库会话
session_gen = get_db_session()
session = await session_gen.__anext__()
try:
# 查询文档
from sqlalchemy import select
stmt = select(Document).where(Document.id == document_id)
result = await session.execute(stmt)
document = result.scalar_one_or_none()
if document is None:
return {"error": "文档不存在"}, 404
# 返回文档数据
return {
"id": document.id,
"title": document.title,
"content": document.content,
"created_at": document.created_at.isoformat(),
"updated_at": document.updated_at.isoformat() if document.updated_at else None
}
finally:
# 确保关闭会话
try:
await session_gen.aclose()
except:
pass
与NoSQL数据库集成
MongoDB集成示例
import motor.motor_asyncio
from bson.objectid import ObjectId
from mcp.server.fastmcp import FastMCP
# 创建MCP服务器
mcp = FastMCP("MongoDB集成示例")
# 启动事件 - 数据库连接初始化
@mcp.on_startup
async def initialize_mongo():
# 创建MongoDB客户端
client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://localhost:27017")
# 获取数据库和集合
db = client.mcp_database
mcp.state.mongo_client = client
mcp.state.db = db
# 关闭事件 - 清理连接
@mcp.on_shutdown
async def cleanup_mongo():
if hasattr(mcp.state, "mongo_client"):
mcp.state.mongo_client.close()
# MCP工具 - 创建项目
@mcp.tool()
async def create_item(collection: str, data: dict):
"""创建新项目
参数:
collection: 集合名称
data: 项目数据
返回:
创建的项目ID
"""
# 获取指定集合
if not hasattr(mcp.state, "db"):
raise RuntimeError("数据库未初始化")
collection = mcp.state.db[collection]
# 插入数据
result = await collection.insert_one(data)
return {"id": str(result.inserted_id)}
# MCP资源 - 获取项目
@mcp.resource("items://{collection}/{item_id}")
async def get_item(collection: str, item_id: str):
"""获取项目详情
参数:
collection: 集合名称
item_id: 项目ID
返回:
项目详情
"""
# 获取指定集合
if not hasattr(mcp.state, "db"):
raise RuntimeError("数据库未初始化")
collection = mcp.state.db[collection]
# 查询项目
try:
item = await collection.find_one({"_id": ObjectId(item_id)})
except:
return {"error": "无效的项目ID"}, 400
if not item:
return {"error": "项目不存在"}, 404
# 转换ObjectId为字符串
item["_id"] = str(item["_id"])
return item
与存储系统集成
文件存储集成
使用AWS S3集成
通过将MCP与Amazon S3集成,可以提供云存储功能:
import asyncio
import aioboto3
from mcp.server.fastmcp import FastMCP
# 创建MCP服务器
mcp = FastMCP("S3存储集成示例")
# S3客户端依赖
async def get_s3_client():
"""获取S3客户端"""
if not hasattr(mcp.state, "s3_session"):
raise RuntimeError("S3未初始化")
async with mcp.state.s3_session.client("s3") as client:
yield client
# 启动事件 - 初始化S3客户端
@mcp.on_startup
async def initialize_s3():
# 创建S3会话
session = aioboto3.Session(
aws_access_key_id="YOUR_ACCESS_KEY",
aws_secret_access_key="YOUR_SECRET_KEY",
region_name="us-west-1"
)
mcp.state.s3_session = session
mcp.state.s3_bucket = "your-bucket-name"
# MCP工具 - 上传文件
@mcp.tool()
async def upload_file(key: str, content: str, content_type: str = "text/plain"):
"""上传文件到S3
参数:
key: 文件键名
content: 文件内容
content_type: 内容类型
返回:
上传状态
"""
# 获取S3客户端
client_gen = get_s3_client()
client = await client_gen.__anext__()
try:
# 上传文件
await client.put_object(
Bucket=mcp.state.s3_bucket,
Key=key,
Body=content,
ContentType=content_type
)
return {
"success": True,
"message": "文件已上传",
"url": f"https://{mcp.state.s3_bucket}.s3.amazonaws.com/{key}"
}
finally:
# 确保关闭客户端
try:
await client_gen.aclose()
except:
pass
# MCP资源 - 获取文件
@mcp.resource("files://{key}")
async def get_file(key: str):
"""从S3获取文件
参数:
key: 文件键名
返回:
文件内容
"""
# 获取S3客户端
client_gen = get_s3_client()
client = await client_gen.__anext__()
try:
# 下载文件
try:
response = await client.get_object(
Bucket=mcp.state.s3_bucket,
Key=key
)
except Exception as e:
return {"error": f"文件不存在: {str(e)}"}, 404
# 读取文件内容
body = await response['Body'].read()
content_type = response.get('ContentType', 'application/octet-stream')
# 返回文件内容
return body.decode('utf-8'), content_type
finally:
# 确保关闭客户端
try:
await client_gen.aclose()
except:
pass
小结
在本章中,我们探讨了将MCP服务与各种现有系统集成的方法:
- 与Web框架(FastAPI、Flask、Django)集成
- 在微服务架构中部署MCP服务
- 与关系型数据库和NoSQL数据库集成
- 与云存储系统集成
通过这些集成,MCP服务可以成为更大系统的一部分,提供强大的模型上下文和工具功能,同时充分利用现有技术栈的优势。在下一章中,我们将探讨MCP服务的安全最佳实践。