3.3_MCP与现有系统集成

3.3 MCP与现有系统集成

在实际应用中,MCP通常需要与现有系统集成,如Web框架、数据库、存储系统等。本章将探讨如何将MCP服务无缝集成到现有技术栈中。

与Web框架集成

MCP服务可以与各种流行的Python Web框架集成,使其成为更大应用的一部分。

与FastAPI集成

FastAPI是一个现代、高性能的Web框架,与MCP的异步设计理念相符。以下是将MCP集成到FastAPI应用的示例:

import asyncio
from fastapi import FastAPI, Request, Depends, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
from mcp.server.fastmcp import FastMCP

# 创建FastAPI应用
app = FastAPI(title="MCP与FastAPI集成示例")

# 创建MCP服务器
mcp = FastMCP("MCP服务")

# 注册MCP资源和工具
@mcp.resource("example://hello")
async def get_hello():
    return {"message": "Hello from MCP!"}

@mcp.tool()
async def echo(message: str):
    """回显消息"""
    return {"echo": message}

# 请求模型
class ResourceRequest(BaseModel):
    uri: str
    params: dict = {}

class ToolRequest(BaseModel):
    tool_name: str
    arguments: dict = {}

# FastAPI端点:获取资源
@app.post("/api/mcp/resource")
async def get_resource(request: ResourceRequest):
    try:
        result = await mcp.handle_resource_request(request.uri, request.params)
        return JSONResponse(result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# FastAPI端点:执行工具(支持流式响应)
@app.post("/api/mcp/tool")
async def execute_tool(request: ToolRequest):
    async def stream_response():
        try:
            async for result in mcp.handle_tool_request(
                request.tool_name, request.arguments
            ):
                # 将每个结果转换为JSON行
                yield f"{result}\n"
        except Exception as e:
            yield f"{{'error': '{str(e)}'}}\n"

    return StreamingResponse(
        stream_response(),
        media_type="application/x-ndjson"
    )

# 健康检查端点
@app.get("/health")
async def health_check():
    return {"status": "healthy", "mcp_version": mcp.version}

# MCP工具列表端点
@app.get("/api/mcp/tools")
async def list_tools():
    tools = mcp.list_tools()
    return {"tools": tools}

# 主应用启动事件
@app.on_event("startup")
async def startup():
    # 启动MCP服务器
    await mcp.startup()

# 主应用关闭事件
@app.on_event("shutdown")
async def shutdown():
    # 关闭MCP服务器
    await mcp.shutdown()

# 启动应用
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

与Flask集成

对于偏好Flask的开发者,可以使用以下模式集成MCP:

import asyncio
import json
from queue import Queue
from threading import Thread
from flask import Flask, request, jsonify, Response
from mcp.server.fastmcp import FastMCP

# 创建Flask应用
app = Flask(__name__)

# 创建MCP服务器
mcp = FastMCP("MCP服务")

# 注册MCP资源和工具
@mcp.resource("example://hello")
async def get_hello():
    return {"message": "Hello from MCP!"}

@mcp.tool()
async def echo(message: str):
    """回显消息"""
    return {"echo": message}

# 运行异步函数的辅助函数
def run_async(coro):
    """在单独的线程中运行协程并返回结果"""
    result_queue = Queue()
    
    def run_in_thread():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            result = loop.run_until_complete(coro)
            result_queue.put({"success": True, "result": result})
        except Exception as e:
            result_queue.put({"success": False, "error": str(e)})
        finally:
            loop.close()
    
    thread = Thread(target=run_in_thread)
    thread.start()
    thread.join()
    
    result = result_queue.get()
    if result["success"]:
        return result["result"]
    else:
        raise Exception(result["error"])

# 启动MCP服务器
@app.before_first_request
def before_first_request():
    run_async(mcp.startup())

# 关闭MCP服务器
@app.teardown_appcontext
def teardown_appcontext(exception):
    run_async(mcp.shutdown())

# Flask端点:获取资源
@app.route("/api/mcp/resource", methods=["POST"])
def get_resource():
    data = request.json
    uri = data.get("uri")
    params = data.get("params", {})
    
    try:
        result = run_async(mcp.handle_resource_request(uri, params))
        return jsonify(result)
    except Exception as e:
        return jsonify({"error": str(e)}), 500

# Flask端点:执行工具
@app.route("/api/mcp/tool", methods=["POST"])
def execute_tool():
    data = request.json
    tool_name = data.get("tool_name")
    arguments = data.get("arguments", {})
    
    def generate():
        async def process():
            try:
                async for result in mcp.handle_tool_request(tool_name, arguments):
                    yield f"{json.dumps(result)}\n"
            except Exception as e:
                yield f"{json.dumps({'error': str(e)})}\n"
        
        # 将异步生成器转换为同步生成器
        coro = process()
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        
        try:
            while True:
                try:
                    chunk = loop.run_until_complete(coro.__anext__())
                    yield chunk
                except StopAsyncIteration:
                    break
        finally:
            loop.close()
    
    return Response(
        generate(),
        mimetype="application/x-ndjson"
    )

# 健康检查端点
@app.route("/health", methods=["GET"])
def health_check():
    return jsonify({"status": "healthy", "mcp_version": mcp.version})

# 启动应用
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000, threaded=True)

与Django集成

对于Django项目,可以通过以下方式集成MCP:

# myapp/mcp_service.py
from mcp.server.fastmcp import FastMCP

# 创建MCP服务器单例
mcp = FastMCP("Django MCP服务")

# 注册资源和工具
@mcp.resource("example://hello")
async def get_hello():
    return {"message": "Hello from Django MCP!"}

@mcp.tool()
async def echo(message: str):
    """回显消息"""
    return {"echo": message}

# 初始化和关闭函数
async def startup():
    await mcp.startup()

async def shutdown():
    await mcp.shutdown()
# myapp/apps.py
from django.apps import AppConfig
import asyncio
import threading

class MyAppConfig(AppConfig):
    default_auto_field = 'django.db.models.BigAutoField'
    name = 'myapp'
    
    def ready(self):
        # 避免在开发环境中多次启动
        import os
        if os.environ.get('RUN_MAIN', None) != 'true':
            return
            
        # 启动MCP服务器
        from .mcp_service import startup
        
        def start_mcp():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            loop.run_until_complete(startup())
        
        threading.Thread(target=start_mcp, daemon=True).start()
# myapp/views.py
import asyncio
import json
from django.http import JsonResponse, StreamingHttpResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from asgiref.sync import async_to_sync
from .mcp_service import mcp

# Django视图:获取资源
@csrf_exempt
@require_http_methods(["POST"])
def get_resource(request):
    data = json.loads(request.body)
    uri = data.get("uri")
    params = data.get("params", {})
    
    try:
        result = async_to_sync(mcp.handle_resource_request)(uri, params)
        return JsonResponse(result)
    except Exception as e:
        return JsonResponse({"error": str(e)}, status=500)

# Django视图:执行工具
@csrf_exempt
@require_http_methods(["POST"])
def execute_tool(request):
    data = json.loads(request.body)
    tool_name = data.get("tool_name")
    arguments = data.get("arguments", {})
    
    async def process():
        try:
            async for result in mcp.handle_tool_request(tool_name, arguments):
                yield f"{json.dumps(result)}\n".encode('utf-8')
        except Exception as e:
            yield f"{json.dumps({'error': str(e)})}\n".encode('utf-8')
    
    # 使用ASGI流式响应
    return StreamingHttpResponse(
        async_to_sync(lambda: process())(),
        content_type="application/x-ndjson"
    )
# myapp/urls.py
from django.urls import path
from . import views

urlpatterns = [
    path('api/mcp/resource', views.get_resource, name='mcp_resource'),
    path('api/mcp/tool', views.execute_tool, name='mcp_tool'),
]

使用ASGI服务器直接集成

对于ASGI服务器(如Uvicorn、Hypercorn等),可以直接将MCP服务作为ASGI应用提供:

from mcp.server.fastmcp import FastMCP

# 创建MCP服务器
mcp = FastMCP("ASGI MCP服务")

# 注册资源和工具
@mcp.resource("example://hello")
async def get_hello():
    return {"message": "Hello from ASGI MCP!"}

@mcp.tool()
async def echo(message: str):
    """回显消息"""
    return {"echo": message}

# 获取ASGI应用
app = mcp.get_asgi_app()

# 使用ASGI服务器运行
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

MCP服务与微服务架构

在微服务架构中,MCP服务可以作为独立的服务组件:

                  ┌─────────────┐
                  │  API网关    │
                  └─────┬───────┘
                        │
        ┌───────────────┼───────────────┐
        │               │               │
┌───────▼─────┐  ┌──────▼──────┐  ┌─────▼───────┐
│ 用户服务    │  │  MCP服务    │  │ 其他微服务  │
└─────────────┘  └─────────────┘  └─────────────┘
                        │
                 ┌──────┴──────┐
                 │ 数据库/存储 │
                 └─────────────┘

使用API网关路由到MCP服务

示例:使用NGINX作为API网关配置:

# nginx.conf
http {
    upstream mcp_service {
        server mcp-service:8000;
    }
    
    upstream user_service {
        server user-service:8001;
    }
    
    server {
        listen 80;
        
        # MCP服务路由
        location /api/mcp/ {
            proxy_pass http://mcp_service/;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
            proxy_set_header Host $host;
        }
        
        # 用户服务路由
        location /api/users/ {
            proxy_pass http://user_service/;
        }
    }
}

为微服务架构设计MCP服务发现

在微服务环境中,可以实现MCP服务发现机制:

import json
import aiohttp
from mcp.client.base import MCPClient

class ServiceRegistry:
    """MCP服务注册表"""
    
    def __init__(self, registry_url):
        self.registry_url = registry_url
        self.services = {}
    
    async def register_service(self, service_id, service_info):
        """注册MCP服务"""
        async with aiohttp.ClientSession() as session:
            await session.post(
                f"{self.registry_url}/services",
                json={
                    "id": service_id,
                    "info": service_info
                }
            )
    
    async def discover_service(self, service_id):
        """发现MCP服务"""
        if service_id in self.services:
            return self.services[service_id]
            
        async with aiohttp.ClientSession() as session:
            response = await session.get(
                f"{self.registry_url}/services/{service_id}"
            )
            service_info = await response.json()
            self.services[service_id] = service_info
            return service_info
    
    async def get_client(self, service_id):
        """获取MCP客户端"""
        service_info = await self.discover_service(service_id)
        
        # 创建客户端
        client = MCPClient(
            transport_type=service_info["transport_type"],
            host=service_info["host"],
            port=service_info["port"]
        )
        
        # 连接到服务
        await client.connect()
        
        return client

与数据库系统集成

MCP服务经常需要与数据库交互。下面我们将探讨如何将MCP服务与各种数据库系统集成。

与关系型数据库集成

使用SQLAlchemy集成PostgreSQL

SQLAlchemy是Python中流行的ORM库,可以与MCP服务无缝集成:

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, DateTime, func
from mcp.server.fastmcp import FastMCP, Context

# 创建SQLAlchemy模型
Base = declarative_base()

class Document(Base):
    __tablename__ = "documents"
    
    id = Column(Integer, primary_key=True)
    title = Column(String(255), nullable=False)
    content = Column(Text, nullable=False)
    created_at = Column(DateTime, server_default=func.now())
    updated_at = Column(DateTime, onupdate=func.now())

# 创建MCP服务器
mcp = FastMCP("数据库集成示例")

# 启动事件 - 数据库连接初始化
@mcp.on_startup
async def initialize_db():
    # 创建异步数据库引擎
    engine = create_async_engine(
        "postgresql+asyncpg://user:password@localhost/dbname",
        echo=True
    )
    
    # 创建会话工厂
    async_session = async_sessionmaker(
        engine, expire_on_commit=False, class_=AsyncSession
    )
    
    # 存储在服务器状态中
    mcp.state.db_engine = engine
    mcp.state.async_session = async_session
    
    # 创建表(如果不存在)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

# 关闭事件 - 清理数据库连接
@mcp.on_shutdown
async def cleanup_db():
    if hasattr(mcp.state, "db_engine"):
        await mcp.state.db_engine.dispose()

# 数据库会话上下文管理器
async def get_db_session():
    """获取数据库会话"""
    if not hasattr(mcp.state, "async_session"):
        raise RuntimeError("数据库未初始化")
    
    async_session = mcp.state.async_session
    async with async_session() as session:
        yield session

# MCP工具 - 创建文档
@mcp.tool()
async def create_document(title: str, content: str):
    """创建新文档
    
    参数:
        title: 文档标题
        content: 文档内容
    
    返回:
        创建的文档ID
    """
    # 获取数据库会话
    session_gen = get_db_session()
    session = await session_gen.__anext__()
    
    try:
        # 创建新文档
        document = Document(title=title, content=content)
        session.add(document)
        await session.commit()
        await session.refresh(document)
        
        return {"id": document.id, "title": document.title}
    finally:
        # 确保关闭会话
        try:
            await session_gen.aclose()
        except:
            pass

# MCP资源 - 获取文档
@mcp.resource("documents://{document_id}")
async def get_document(context: Context, document_id: int):
    """获取文档详情
    
    参数:
        document_id: 文档ID
    
    返回:
        文档详情
    """
    # 获取数据库会话
    session_gen = get_db_session()
    session = await session_gen.__anext__()
    
    try:
        # 查询文档
        from sqlalchemy import select
        stmt = select(Document).where(Document.id == document_id)
        result = await session.execute(stmt)
        document = result.scalar_one_or_none()
        
        if document is None:
            return {"error": "文档不存在"}, 404
        
        # 返回文档数据
        return {
            "id": document.id,
            "title": document.title,
            "content": document.content,
            "created_at": document.created_at.isoformat(),
            "updated_at": document.updated_at.isoformat() if document.updated_at else None
        }
    finally:
        # 确保关闭会话
        try:
            await session_gen.aclose()
        except:
            pass

与NoSQL数据库集成

MongoDB集成示例

import motor.motor_asyncio
from bson.objectid import ObjectId
from mcp.server.fastmcp import FastMCP

# 创建MCP服务器
mcp = FastMCP("MongoDB集成示例")

# 启动事件 - 数据库连接初始化
@mcp.on_startup
async def initialize_mongo():
    # 创建MongoDB客户端
    client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://localhost:27017")
    
    # 获取数据库和集合
    db = client.mcp_database
    mcp.state.mongo_client = client
    mcp.state.db = db

# 关闭事件 - 清理连接
@mcp.on_shutdown
async def cleanup_mongo():
    if hasattr(mcp.state, "mongo_client"):
        mcp.state.mongo_client.close()

# MCP工具 - 创建项目
@mcp.tool()
async def create_item(collection: str, data: dict):
    """创建新项目
    
    参数:
        collection: 集合名称
        data: 项目数据
    
    返回:
        创建的项目ID
    """
    # 获取指定集合
    if not hasattr(mcp.state, "db"):
        raise RuntimeError("数据库未初始化")
    
    collection = mcp.state.db[collection]
    
    # 插入数据
    result = await collection.insert_one(data)
    
    return {"id": str(result.inserted_id)}

# MCP资源 - 获取项目
@mcp.resource("items://{collection}/{item_id}")
async def get_item(collection: str, item_id: str):
    """获取项目详情
    
    参数:
        collection: 集合名称
        item_id: 项目ID
    
    返回:
        项目详情
    """
    # 获取指定集合
    if not hasattr(mcp.state, "db"):
        raise RuntimeError("数据库未初始化")
    
    collection = mcp.state.db[collection]
    
    # 查询项目
    try:
        item = await collection.find_one({"_id": ObjectId(item_id)})
    except:
        return {"error": "无效的项目ID"}, 400
    
    if not item:
        return {"error": "项目不存在"}, 404
    
    # 转换ObjectId为字符串
    item["_id"] = str(item["_id"])
    
    return item

与存储系统集成

文件存储集成

使用AWS S3集成

通过将MCP与Amazon S3集成,可以提供云存储功能:

import asyncio
import aioboto3
from mcp.server.fastmcp import FastMCP

# 创建MCP服务器
mcp = FastMCP("S3存储集成示例")

# S3客户端依赖
async def get_s3_client():
    """获取S3客户端"""
    if not hasattr(mcp.state, "s3_session"):
        raise RuntimeError("S3未初始化")
    
    async with mcp.state.s3_session.client("s3") as client:
        yield client

# 启动事件 - 初始化S3客户端
@mcp.on_startup
async def initialize_s3():
    # 创建S3会话
    session = aioboto3.Session(
        aws_access_key_id="YOUR_ACCESS_KEY",
        aws_secret_access_key="YOUR_SECRET_KEY",
        region_name="us-west-1"
    )
    
    mcp.state.s3_session = session
    mcp.state.s3_bucket = "your-bucket-name"

# MCP工具 - 上传文件
@mcp.tool()
async def upload_file(key: str, content: str, content_type: str = "text/plain"):
    """上传文件到S3
    
    参数:
        key: 文件键名
        content: 文件内容
        content_type: 内容类型
    
    返回:
        上传状态
    """
    # 获取S3客户端
    client_gen = get_s3_client()
    client = await client_gen.__anext__()
    
    try:
        # 上传文件
        await client.put_object(
            Bucket=mcp.state.s3_bucket,
            Key=key,
            Body=content,
            ContentType=content_type
        )
        
        return {
            "success": True,
            "message": "文件已上传",
            "url": f"https://{mcp.state.s3_bucket}.s3.amazonaws.com/{key}"
        }
    finally:
        # 确保关闭客户端
        try:
            await client_gen.aclose()
        except:
            pass

# MCP资源 - 获取文件
@mcp.resource("files://{key}")
async def get_file(key: str):
    """从S3获取文件
    
    参数:
        key: 文件键名
    
    返回:
        文件内容
    """
    # 获取S3客户端
    client_gen = get_s3_client()
    client = await client_gen.__anext__()
    
    try:
        # 下载文件
        try:
            response = await client.get_object(
                Bucket=mcp.state.s3_bucket,
                Key=key
            )
        except Exception as e:
            return {"error": f"文件不存在: {str(e)}"}, 404
        
        # 读取文件内容
        body = await response['Body'].read()
        content_type = response.get('ContentType', 'application/octet-stream')
        
        # 返回文件内容
        return body.decode('utf-8'), content_type
    finally:
        # 确保关闭客户端
        try:
            await client_gen.aclose()
        except:
            pass

小结

在本章中,我们探讨了将MCP服务与各种现有系统集成的方法:

  • 与Web框架(FastAPI、Flask、Django)集成
  • 在微服务架构中部署MCP服务
  • 与关系型数据库和NoSQL数据库集成
  • 与云存储系统集成

通过这些集成,MCP服务可以成为更大系统的一部分,提供强大的模型上下文和工具功能,同时充分利用现有技术栈的优势。在下一章中,我们将探讨MCP服务的安全最佳实践。

使用 Hugo 构建
主题 StackJimmy 设计