4.2_内容生成与编辑系统

4.2 内容生成与编辑系统

目录

简介

在本节中,我们将构建一个完整的内容生成与编辑系统,利用MCP协议来连接内容创建、编辑和管理的各个环节。这个系统将演示如何使用MCP来构建一个实用的内容管理工作流,适用于博客平台、文档系统或其他需要结构化内容处理的应用场景。

系统将包含以下核心功能:

  • 基于模板的内容生成
  • 内容编辑与格式化工具
  • 审核工作流程
  • 版本控制与历史记录
  • 内容发布与管理

通过这个项目,你将学习如何将MCP应用于实际业务流程,同时掌握复杂系统中各组件协同工作的最佳实践。

系统设计概述

系统架构

我们的内容生成与编辑系统将采用模块化架构,包含以下核心组件:

  1. 内容模板服务: 管理各类内容模板,提供模板选择和填充功能
  2. 内容生成引擎: 基于模板和用户输入生成初始内容
  3. 编辑工作台: 提供内容编辑、格式化和增强功能
  4. 审核系统: 处理内容审核和反馈流程
  5. 版本管理器: 追踪内容更改历史并支持版本回滚
  6. 存储服务: 持久化存储内容和相关元数据

这些组件将通过MCP协议进行通信,形成一个完整的工作流系统。

技术栈

在本项目中,我们将使用以下技术:

  • Python: 作为主要的开发语言
  • MCP: 用于组件间通信和工具集成
  • SQLite: 作为内容和元数据的轻量级存储
  • Markdown: 作为内容的默认格式
  • Flask: 提供简单的Web界面(可选)

数据模型

系统的核心数据模型包括:

Content {
    id: string,
    title: string,
    body: string,
    template_id: string,
    author: string,
    status: enum(draft, review, published, archived),
    created_at: timestamp,
    updated_at: timestamp,
    metadata: object
}

Template {
    id: string,
    name: string,
    description: string,
    schema: object,  // 定义模板需要的字段和规则
    content: string  // 包含占位符的模板内容
}

Version {
    id: string,
    content_id: string,
    revision: number,
    body: string,
    comment: string,
    author: string,
    timestamp: timestamp
}

Review {
    id: string,
    content_id: string,
    reviewer: string,
    status: enum(pending, approved, rejected),
    comments: string,
    timestamp: timestamp
}

这个数据模型将支持我们系统的所有核心功能,同时保持足够的灵活性以适应不同的内容类型和工作流程。

MCP资源与工具

我们将为系统定义以下MCP资源和工具:

资源 (Resources):

  • /templates: 内容模板集合
  • /templates/{id}: 特定模板
  • /contents: 内容集合
  • /contents/{id}: 特定内容
  • /contents/{id}/versions: 内容的版本历史
  • /contents/{id}/reviews: 内容的审核记录

工具 (Tools):

  • generateContent: 基于模板生成内容
  • formatContent: 格式化和规范化内容
  • validateContent: 验证内容是否符合规则
  • createVersion: 创建内容的新版本
  • compareVersions: 比较两个版本的差异
  • submitForReview: 提交内容进行审核
  • publishContent: 发布已审核的内容

在接下来的章节中,我们将逐步实现这些组件和功能,构建一个完整的内容生成与编辑系统。

步骤1:构建内容模板和生成工具

首先,我们需要创建内容模板系统和基于这些模板的内容生成工具。这些组件将成为我们系统的基础。

设置项目结构

让我们创建一个清晰的项目结构:

content_system/
│
├── mcp_server.py        # MCP服务器入口
├── database/
│   ├── __init__.py
│   ├── db_manager.py    # 数据库管理
│   └── models.py        # 数据模型
│
├── services/
│   ├── __init__.py
│   ├── template_service.py   # 模板服务
│   └── content_service.py    # 内容服务
│
├── utils/
│   ├── __init__.py
│   └── formatter.py     # 格式化工具
│
└── data/
    └── content.db       # SQLite数据库

实现数据库模型

首先,我们需要创建数据库模型来存储模板和内容。在database/models.py中:

import sqlite3
import json
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Union


@dataclass
class Template:
    id: str
    name: str
    description: str
    schema: Dict[str, Any]
    content: str
    created_at: int = None

    def to_dict(self) -> Dict[str, Any]:
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "schema": self.schema,
            "content": self.content,
            "created_at": self.created_at or int(time.time())
        }
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'Template':
        return cls(
            id=data["id"],
            name=data["name"],
            description=data["description"],
            schema=data["schema"],
            content=data["content"],
            created_at=data.get("created_at")
        )


@dataclass
class Content:
    id: str
    title: str
    body: str
    template_id: str
    author: str
    status: str  # 'draft', 'review', 'published', 'archived'
    metadata: Dict[str, Any]
    created_at: int = None
    updated_at: int = None
    
    def to_dict(self) -> Dict[str, Any]:
        now = int(time.time())
        return {
            "id": self.id,
            "title": self.title,
            "body": self.body,
            "template_id": self.template_id,
            "author": self.author,
            "status": self.status,
            "metadata": self.metadata,
            "created_at": self.created_at or now,
            "updated_at": self.updated_at or now
        }
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'Content':
        return cls(
            id=data["id"],
            title=data["title"],
            body=data["body"],
            template_id=data["template_id"],
            author=data["author"],
            status=data["status"],
            metadata=data["metadata"],
            created_at=data.get("created_at"),
            updated_at=data.get("updated_at")
        )

创建数据库管理器

接下来,我们实现数据库管理器来处理数据存储。在database/db_manager.py中:

import sqlite3
import json
import uuid
import time
from typing import List, Dict, Any, Optional
from pathlib import Path

from .models import Template, Content


class DatabaseManager:
    def __init__(self, db_path="data/content.db"):
        # 确保数据目录存在
        Path(db_path).parent.mkdir(exist_ok=True)
        
        self.conn = sqlite3.connect(db_path)
        self.conn.row_factory = sqlite3.Row
        self._create_tables()
    
    def _create_tables(self):
        cursor = self.conn.cursor()
        
        # 创建模板表
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS templates (
            id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            description TEXT,
            schema TEXT NOT NULL,
            content TEXT NOT NULL,
            created_at INTEGER NOT NULL
        )
        ''')
        
        # 创建内容表
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS contents (
            id TEXT PRIMARY KEY,
            title TEXT NOT NULL,
            body TEXT NOT NULL,
            template_id TEXT NOT NULL,
            author TEXT NOT NULL,
            status TEXT NOT NULL,
            metadata TEXT NOT NULL,
            created_at INTEGER NOT NULL,
            updated_at INTEGER NOT NULL,
            FOREIGN KEY (template_id) REFERENCES templates (id)
        )
        ''')
        
        # 创建版本表
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS content_versions (
            id TEXT PRIMARY KEY,
            content_id TEXT NOT NULL,
            revision INTEGER NOT NULL,
            body TEXT NOT NULL,
            comment TEXT,
            author TEXT NOT NULL,
            timestamp INTEGER NOT NULL,
            FOREIGN KEY (content_id) REFERENCES contents (id),
            UNIQUE (content_id, revision)
        )
        ''')
        
        # 创建审核表
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS content_reviews (
            id TEXT PRIMARY KEY,
            content_id TEXT NOT NULL,
            reviewer TEXT NOT NULL,
            status TEXT NOT NULL,
            comments TEXT,
            timestamp INTEGER NOT NULL,
            FOREIGN KEY (content_id) REFERENCES contents (id)
        )
        ''')
        
        self.conn.commit()
    
    def close(self):
        self.conn.close()
    
    # 模板操作
    def create_template(self, template: Template) -> Template:
        if not template.id:
            template.id = str(uuid.uuid4())
        
        template.created_at = int(time.time())
        data = template.to_dict()
        
        cursor = self.conn.cursor()
        cursor.execute(
            'INSERT INTO templates VALUES (?, ?, ?, ?, ?, ?)',
            (
                data["id"],
                data["name"],
                data["description"],
                json.dumps(data["schema"]),
                data["content"],
                data["created_at"]
            )
        )
        self.conn.commit()
        return template
    
    def get_template(self, template_id: str) -> Optional[Template]:
        cursor = self.conn.cursor()
        cursor.execute('SELECT * FROM templates WHERE id = ?', (template_id,))
        row = cursor.fetchone()
        
        if row:
            return Template.from_dict({
                "id": row["id"],
                "name": row["name"],
                "description": row["description"],
                "schema": json.loads(row["schema"]),
                "content": row["content"],
                "created_at": row["created_at"]
            })
        return None
    
    def list_templates(self) -> List[Template]:
        cursor = self.conn.cursor()
        cursor.execute('SELECT * FROM templates')
        rows = cursor.fetchall()
        
        return [Template.from_dict({
            "id": row["id"],
            "name": row["name"],
            "description": row["description"],
            "schema": json.loads(row["schema"]),
            "content": row["content"],
            "created_at": row["created_at"]
        }) for row in rows]
    
    # 内容操作
    def create_content(self, content: Content) -> Content:
        if not content.id:
            content.id = str(uuid.uuid4())
        
        now = int(time.time())
        content.created_at = now
        content.updated_at = now
        data = content.to_dict()
        
        cursor = self.conn.cursor()
        cursor.execute(
            'INSERT INTO contents VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
            (
                data["id"],
                data["title"],
                data["body"],
                data["template_id"],
                data["author"],
                data["status"],
                json.dumps(data["metadata"]),
                data["created_at"],
                data["updated_at"]
            )
        )
        self.conn.commit()
        return content
    
    def update_content(self, content: Content) -> Content:
        content.updated_at = int(time.time())
        data = content.to_dict()
        
        cursor = self.conn.cursor()
        cursor.execute(
            '''UPDATE contents 
               SET title = ?, body = ?, status = ?, metadata = ?, updated_at = ?
               WHERE id = ?''',
            (
                data["title"],
                data["body"],
                data["status"],
                json.dumps(data["metadata"]),
                data["updated_at"],
                data["id"]
            )
        )
        self.conn.commit()
        return content
    
    def get_content(self, content_id: str) -> Optional[Content]:
        cursor = self.conn.cursor()
        cursor.execute('SELECT * FROM contents WHERE id = ?', (content_id,))
        row = cursor.fetchone()
        
        if row:
            return Content.from_dict({
                "id": row["id"],
                "title": row["title"],
                "body": row["body"],
                "template_id": row["template_id"],
                "author": row["author"],
                "status": row["status"],
                "metadata": json.loads(row["metadata"]),
                "created_at": row["created_at"],
                "updated_at": row["updated_at"]
            })
        return None
    
    def list_contents(self, status=None) -> List[Content]:
        cursor = self.conn.cursor()
        if status:
            cursor.execute('SELECT * FROM contents WHERE status = ?', (status,))
        else:
            cursor.execute('SELECT * FROM contents')
        rows = cursor.fetchall()
        
        return [Content.from_dict({
            "id": row["id"],
            "title": row["title"],
            "body": row["body"],
            "template_id": row["template_id"],
            "author": row["author"],
            "status": row["status"],
            "metadata": json.loads(row["metadata"]),
            "created_at": row["created_at"],
            "updated_at": row["updated_at"]
        }) for row in rows]

    # 版本控制操作
    def create_version(self, version: ContentVersion) -> ContentVersion:
        if not version.id:
            version.id = str(uuid.uuid4())
        
        version.timestamp = int(time.time())
        data = version.to_dict()
        
        cursor = self.conn.cursor()
        cursor.execute(
            'INSERT INTO content_versions VALUES (?, ?, ?, ?, ?, ?, ?)',
            (
                data["id"],
                data["content_id"],
                data["revision"],
                data["body"],
                data["comment"],
                data["author"],
                data["timestamp"]
            )
        )
        self.conn.commit()
        return version

    def get_content_versions(self, content_id: str) -> List[ContentVersion]:
        cursor = self.conn.cursor()
        cursor.execute(
            'SELECT * FROM content_versions WHERE content_id = ? ORDER BY revision DESC',
            (content_id,)
        )
        rows = cursor.fetchall()
        
        return [ContentVersion.from_dict({
            "id": row["id"],
            "content_id": row["content_id"],
            "revision": row["revision"],
            "body": row["body"],
            "comment": row["comment"],
            "author": row["author"],
            "timestamp": row["timestamp"]
        }) for row in rows]

    def get_content_version(self, content_id: str, revision: int) -> Optional[ContentVersion]:
        cursor = self.conn.cursor()
        cursor.execute(
            'SELECT * FROM content_versions WHERE content_id = ? AND revision = ?',
            (content_id, revision)
        )
        row = cursor.fetchone()
        
        if row:
            return ContentVersion.from_dict({
                "id": row["id"],
                "content_id": row["content_id"],
                "revision": row["revision"],
                "body": row["body"],
                "comment": row["comment"],
                "author": row["author"],
                "timestamp": row["timestamp"]
            })
        return None

    def get_next_revision(self, content_id: str) -> int:
        cursor = self.conn.cursor()
        cursor.execute(
            'SELECT MAX(revision) as max_rev FROM content_versions WHERE content_id = ?',
            (content_id,)
        )
        row = cursor.fetchone()
        
        if row and row["max_rev"] is not None:
            return row["max_rev"] + 1
        return 1  # 第一个版本

    # 审核操作
    def create_review(self, review: ContentReview) -> ContentReview:
        if not review.id:
            review.id = str(uuid.uuid4())
        
        review.timestamp = int(time.time())
        data = review.to_dict()
        
        cursor = self.conn.cursor()
        cursor.execute(
            'INSERT INTO content_reviews VALUES (?, ?, ?, ?, ?, ?)',
            (
                data["id"],
                data["content_id"],
                data["reviewer"],
                data["status"],
                data["comments"],
                data["timestamp"]
            )
        )
        self.conn.commit()
        return review

    def update_review(self, review: ContentReview) -> ContentReview:
        review.timestamp = int(time.time())
        data = review.to_dict()
        
        cursor = self.conn.cursor()
        cursor.execute(
            '''UPDATE content_reviews
               SET status = ?, comments = ?, timestamp = ?
               WHERE id = ?''',
            (
                data["status"],
                data["comments"],
                data["timestamp"],
                data["id"]
            )
        )
        self.conn.commit()
        return review

    def get_content_reviews(self, content_id: str) -> List[ContentReview]:
        cursor = self.conn.cursor()
        cursor.execute(
            'SELECT * FROM content_reviews WHERE content_id = ? ORDER BY timestamp DESC',
            (content_id,)
        )
        rows = cursor.fetchall()
        
        return [ContentReview.from_dict({
            "id": row["id"],
            "content_id": row["content_id"],
            "reviewer": row["reviewer"],
            "status": row["status"],
            "comments": row["comments"],
            "timestamp": row["timestamp"]
        }) for row in rows]

    def get_pending_reviews(self) -> List[ContentReview]:
        cursor = self.conn.cursor()
        cursor.execute(
            'SELECT * FROM content_reviews WHERE status = ? ORDER BY timestamp ASC',
            ('pending',)
        )
        rows = cursor.fetchall()
        
        return [ContentReview.from_dict({
            "id": row["id"],
            "content_id": row["content_id"],
            "reviewer": row["reviewer"],
            "status": row["status"],
            "comments": row["comments"],
            "timestamp": row["timestamp"]
        }) for row in rows]

实现模板服务

现在我们创建模板服务来管理内容模板。在services/template_service.py中:

import string
from typing import Dict, Any, List, Optional

from database.db_manager import DatabaseManager
from database.models import Template


class TemplateService:
    def __init__(self, db_manager: DatabaseManager):
        self.db_manager = db_manager
    
    def create_template(self, name: str, description: str, schema: Dict[str, Any], 
                        content: str) -> Template:
        """创建新的内容模板"""
        template = Template(
            id=None,  # 由数据库管理器生成
            name=name,
            description=description,
            schema=schema,
            content=content
        )
        return self.db_manager.create_template(template)
    
    def get_template(self, template_id: str) -> Optional[Template]:
        """获取特定模板"""
        return self.db_manager.get_template(template_id)
    
    def list_templates(self) -> List[Template]:
        """列出所有可用的模板"""
        return self.db_manager.list_templates()
    
    def render_template(self, template_id: str, variables: Dict[str, Any]) -> str:
        """基于模板和变量渲染内容"""
        template = self.get_template(template_id)
        if not template:
            raise ValueError(f"Template not found: {template_id}")
        
        # 验证提供的变量是否符合模板的schema
        self._validate_variables(template.schema, variables)
        
        # 使用字符串模板填充内容
        template_str = string.Template(template.content)
        return template_str.substitute(variables)
    
    def _validate_variables(self, schema: Dict[str, Any], variables: Dict[str, Any]):
        """验证变量是否符合schema定义"""
        for field, field_schema in schema.items():
            # 检查必填字段
            if field_schema.get("required", False) and field not in variables:
                raise ValueError(f"Missing required field: {field}")
            
            # 如果字段存在,检查类型
            if field in variables:
                value = variables[field]
                field_type = field_schema.get("type", "string")
                
                # 简单类型检查
                if field_type == "string" and not isinstance(value, str):
                    raise ValueError(f"Field {field} should be a string")
                elif field_type == "number" and not (isinstance(value, int) or isinstance(value, float)):
                    raise ValueError(f"Field {field} should be a number")
                elif field_type == "boolean" and not isinstance(value, bool):
                    raise ValueError(f"Field {field} should be a boolean")
                elif field_type == "array" and not isinstance(value, list):
                    raise ValueError(f"Field {field} should be an array")
                elif field_type == "object" and not isinstance(value, dict):
                    raise ValueError(f"Field {field} should be an object")
                
                # 检查长度限制
                if isinstance(value, str) and "maxLength" in field_schema:
                    if len(value) > field_schema["maxLength"]:
                        raise ValueError(
                            f"Field {field} exceeds maximum length of {field_schema['maxLength']}")
                
                # 检查枚举值
                if "enum" in field_schema and value not in field_schema["enum"]:
                    valid_values = ", ".join(str(v) for v in field_schema["enum"])
                    raise ValueError(
                        f"Field {field} must be one of: {valid_values}")

实现内容服务

接下来,我们创建内容服务来管理生成的内容。在services/content_service.py中:

from typing import Dict, Any, List, Optional

from database.db_manager import DatabaseManager
from database.models import Content
from services.template_service import TemplateService


class ContentService:
    def __init__(self, db_manager: DatabaseManager, template_service: TemplateService):
        self.db_manager = db_manager
        self.template_service = template_service
    
    def generate_content(self, template_id: str, title: str, variables: Dict[str, Any], 
                         author: str) -> Content:
        """基于模板生成新内容"""
        # 使用模板服务渲染内容
        body = self.template_service.render_template(template_id, variables)
        
        # 创建新的内容记录
        content = Content(
            id=None,  # 由数据库管理器生成
            title=title,
            body=body,
            template_id=template_id,
            author=author,
            status="draft",  # 新生成的内容默认为草稿状态
            metadata={"template_variables": variables}  # 存储用于生成的变量
        )
        
        return self.db_manager.create_content(content)
    
    def get_content(self, content_id: str) -> Optional[Content]:
        """获取特定内容"""
        return self.db_manager.get_content(content_id)
    
    def list_contents(self, status=None) -> List[Content]:
        """列出内容,可选按状态筛选"""
        return self.db_manager.list_contents(status)
    
    def update_content(self, content_id: str, title: str = None, body: str = None,
                      status: str = None, metadata: Dict[str, Any] = None) -> Content:
        """更新内容"""
        content = self.get_content(content_id)
        if not content:
            raise ValueError(f"Content not found: {content_id}")
        
        # 更新提供的字段
        if title is not None:
            content.title = title
        if body is not None:
            content.body = body
        if status is not None:
            if status not in ["draft", "review", "published", "archived"]:
                raise ValueError(f"Invalid status: {status}")
            content.status = status
        if metadata is not None:
            # 合并而不是替换元数据
            content.metadata.update(metadata)
        
        return self.db_manager.update_content(content)

创建格式化工具

为了支持内容的格式化和规范化,我们实现一些工具函数。在utils/formatter.py中:

import re
import markdown
from typing import Dict, Any, List


def format_markdown(text: str) -> str:
    """规范化Markdown格式"""
    # 确保段落之间有一个空行
    text = re.sub(r'(\n)(?!\n)', r'\1\n', text)
    
    # 规范化标题格式(确保#后有空格)
    text = re.sub(r'(^|\n)(#+)([^ #])', r'\1\2 \3', text)
    
    # 规范化列表项(确保-后有空格)
    text = re.sub(r'(^|\n)-([^ ])', r'\1- \2', text)
    
    # 删除多余的空行(超过两个连续空行)
    text = re.sub(r'\n{3,}', r'\n\n', text)
    
    return text


def validate_markdown(text: str) -> List[Dict[str, Any]]:
    """验证Markdown内容,返回问题列表"""
    issues = []
    
    # 检查标题层级
    if not re.search(r'^# ', text, re.MULTILINE):
        issues.append({
            "type": "missing_h1",
            "message": "Missing top-level heading (H1)"
        })
    
    # 检查空链接
    empty_links = re.findall(r'\[([^\]]+)\]\(\s*\)', text)
    if empty_links:
        issues.append({
            "type": "empty_links",
            "message": f"Found empty links: {', '.join(empty_links)}"
        })
    
    # 检查潜在的拼写错误(简单示例)
    common_typos = {
        "teh": "the",
        "adn": "and",
        "tahn": "than"
    }
    
    for typo, correction in common_typos.items():
        if re.search(r'\b' + typo + r'\b', text, re.IGNORECASE):
            issues.append({
                "type": "potential_typo",
                "message": f"Potential typo: '{typo}' might be '{correction}'"
            })
    
    return issues


def markdown_to_html(text: str) -> str:
    """将Markdown转换为HTML"""
    return markdown.markdown(
        text,
        extensions=['extra', 'codehilite', 'tables', 'toc']
    )


def extract_metadata(text: str) -> Dict[str, Any]:
    """从Markdown中提取元数据(YAML前置内容)"""
    metadata = {}
    
    # 简单的YAML前置内容解析
    match = re.match(r'^---\s*\n(.*?)\n---\s*\n', text, re.DOTALL)
    if match:
        yaml_text = match.group(1)
        
        # 简单解析key-value对
        for line in yaml_text.split('\n'):
            if ':' in line:
                key, value = line.split(':', 1)
                metadata[key.strip()] = value.strip()
    
    return metadata

通过这些基础组件,我们已经构建了内容模板和生成工具的核心部分。在下一节中,我们将实现编辑和审核工作流部分。

步骤2:实现编辑和审核工作流

现在我们已经有了内容生成的基础,下面我们将实现编辑和审核工作流功能,这将使系统更加完整和实用。

创建版本控制模型

首先,让我们在database/models.py中添加版本和审核相关的数据模型:

@dataclass
class ContentVersion:
    id: str
    content_id: str
    revision: int
    body: str
    comment: str
    author: str
    timestamp: int = None
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            "id": self.id,
            "content_id": self.content_id,
            "revision": self.revision,
            "body": self.body,
            "comment": self.comment,
            "author": self.author,
            "timestamp": self.timestamp or int(time.time())
        }
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ContentVersion':
        return cls(
            id=data["id"],
            content_id=data["content_id"],
            revision=data["revision"],
            body=data["body"],
            comment=data["comment"],
            author=data["author"],
            timestamp=data.get("timestamp")
        )


@dataclass
class ContentReview:
    id: str
    content_id: str
    reviewer: str
    status: str  # 'pending', 'approved', 'rejected'
    comments: str
    timestamp: int = None
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            "id": self.id,
            "content_id": self.content_id,
            "reviewer": self.reviewer,
            "status": self.status,
            "comments": self.comments,
            "timestamp": self.timestamp or int(time.time())
        }
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ContentReview':
        return cls(
            id=data["id"],
            content_id=data["content_id"],
            reviewer=data["reviewer"],
            status=data["status"],
            comments=data["comments"],
            timestamp=data.get("timestamp")
        )

扩展数据库管理器

接下来,我们需要在database/db_manager.py中扩展数据库管理器,以支持版本和审核功能:

def _create_tables(self):
    # ... 现有代码 ...
    
    # 创建版本表
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS content_versions (
        id TEXT PRIMARY KEY,
        content_id TEXT NOT NULL,
        revision INTEGER NOT NULL,
        body TEXT NOT NULL,
        comment TEXT,
        author TEXT NOT NULL,
        timestamp INTEGER NOT NULL,
        FOREIGN KEY (content_id) REFERENCES contents (id),
        UNIQUE (content_id, revision)
    )
    ''')
    
    # 创建审核表
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS content_reviews (
        id TEXT PRIMARY KEY,
        content_id TEXT NOT NULL,
        reviewer TEXT NOT NULL,
        status TEXT NOT NULL,
        comments TEXT,
        timestamp INTEGER NOT NULL,
        FOREIGN KEY (content_id) REFERENCES contents (id)
    )
    ''')
    
    self.conn.commit()

# 版本控制操作
def create_version(self, version: ContentVersion) -> ContentVersion:
    if not version.id:
        version.id = str(uuid.uuid4())
    
    version.timestamp = int(time.time())
    data = version.to_dict()
    
    cursor = self.conn.cursor()
    cursor.execute(
        'INSERT INTO content_versions VALUES (?, ?, ?, ?, ?, ?, ?)',
        (
            data["id"],
            data["content_id"],
            data["revision"],
            data["body"],
            data["comment"],
            data["author"],
            data["timestamp"]
        )
    )
    self.conn.commit()
    return version

def get_content_versions(self, content_id: str) -> List[ContentVersion]:
    cursor = self.conn.cursor()
    cursor.execute(
        'SELECT * FROM content_versions WHERE content_id = ? ORDER BY revision DESC',
        (content_id,)
    )
    rows = cursor.fetchall()
    
    return [ContentVersion.from_dict({
        "id": row["id"],
        "content_id": row["content_id"],
        "revision": row["revision"],
        "body": row["body"],
        "comment": row["comment"],
        "author": row["author"],
        "timestamp": row["timestamp"]
    }) for row in rows]

def get_content_version(self, content_id: str, revision: int) -> Optional[ContentVersion]:
    cursor = self.conn.cursor()
    cursor.execute(
        'SELECT * FROM content_versions WHERE content_id = ? AND revision = ?',
        (content_id, revision)
    )
    row = cursor.fetchone()
    
    if row:
        return ContentVersion.from_dict({
            "id": row["id"],
            "content_id": row["content_id"],
            "revision": row["revision"],
            "body": row["body"],
            "comment": row["comment"],
            "author": row["author"],
            "timestamp": row["timestamp"]
        })
    return None

def get_next_revision(self, content_id: str) -> int:
    cursor = self.conn.cursor()
    cursor.execute(
        'SELECT MAX(revision) as max_rev FROM content_versions WHERE content_id = ?',
        (content_id,)
    )
    row = cursor.fetchone()
    
    if row and row["max_rev"] is not None:
        return row["max_rev"] + 1
    return 1  # 第一个版本

# 审核操作
def create_review(self, review: ContentReview) -> ContentReview:
    if not review.id:
        review.id = str(uuid.uuid4())
    
    review.timestamp = int(time.time())
    data = review.to_dict()
    
    cursor = self.conn.cursor()
    cursor.execute(
        'INSERT INTO content_reviews VALUES (?, ?, ?, ?, ?, ?)',
        (
            data["id"],
            data["content_id"],
            data["reviewer"],
            data["status"],
            data["comments"],
            data["timestamp"]
        )
    )
    self.conn.commit()
    return review

def update_review(self, review: ContentReview) -> ContentReview:
    review.timestamp = int(time.time())
    data = review.to_dict()
    
    cursor = self.conn.cursor()
    cursor.execute(
        '''UPDATE content_reviews
           SET status = ?, comments = ?, timestamp = ?
           WHERE id = ?''',
        (
            data["status"],
            data["comments"],
            data["timestamp"],
            data["id"]
        )
    )
    self.conn.commit()
    return review

def get_content_reviews(self, content_id: str) -> List[ContentReview]:
    cursor = self.conn.cursor()
    cursor.execute(
        'SELECT * FROM content_reviews WHERE content_id = ? ORDER BY timestamp DESC',
        (content_id,)
    )
    rows = cursor.fetchall()
    
    return [ContentReview.from_dict({
        "id": row["id"],
        "content_id": row["content_id"],
        "reviewer": row["reviewer"],
        "status": row["status"],
        "comments": row["comments"],
        "timestamp": row["timestamp"]
    }) for row in rows]

def get_pending_reviews(self) -> List[ContentReview]:
    cursor = self.conn.cursor()
    cursor.execute(
        'SELECT * FROM content_reviews WHERE status = ? ORDER BY timestamp ASC',
        ('pending',)
    )
    rows = cursor.fetchall()
    
    return [ContentReview.from_dict({
        "id": row["id"],
        "content_id": row["content_id"],
        "reviewer": row["reviewer"],
        "status": row["status"],
        "comments": row["comments"],
        "timestamp": row["timestamp"]
    }) for row in rows]

实现版本管理服务

现在,我们创建一个版本管理服务来处理内容版本控制。在services/version_service.py中:

import difflib
from typing import Dict, Any, List, Optional, Tuple

from database.db_manager import DatabaseManager
from database.models import Content, ContentVersion
from services.content_service import ContentService


class VersionService:
    def __init__(self, db_manager: DatabaseManager, content_service: ContentService):
        self.db_manager = db_manager
        self.content_service = content_service
    
    def create_version(self, content_id: str, body: str, author: str, 
                       comment: str) -> ContentVersion:
        """创建内容的新版本"""
        # 确认内容存在
        content = self.content_service.get_content(content_id)
        if not content:
            raise ValueError(f"Content not found: {content_id}")
        
        # 获取下一个版本号
        next_revision = self.db_manager.get_next_revision(content_id)
        
        # 创建新的版本记录
        version = ContentVersion(
            id=None,  # 由数据库管理器生成
            content_id=content_id,
            revision=next_revision,
            body=body,
            comment=comment,
            author=author
        )
        
        return self.db_manager.create_version(version)
    
    def get_content_versions(self, content_id: str) -> List[ContentVersion]:
        """获取内容的所有版本"""
        return self.db_manager.get_content_versions(content_id)
    
    def get_content_version(self, content_id: str, revision: int) -> Optional[ContentVersion]:
        """获取内容的特定版本"""
        return self.db_manager.get_content_version(content_id, revision)
    
    def compare_versions(self, content_id: str, from_revision: int, 
                         to_revision: int) -> Dict[str, Any]:
        """比较两个版本之间的差异"""
        from_version = self.get_content_version(content_id, from_revision)
        to_version = self.get_content_version(content_id, to_revision)
        
        if not from_version or not to_version:
            raise ValueError(f"Version not found for content: {content_id}")
        
        # 使用difflib比较文本差异
        from_lines = from_version.body.splitlines()
        to_lines = to_version.body.splitlines()
        
        diff = list(difflib.unified_diff(
            from_lines,
            to_lines,
            fromfile=f"revision-{from_revision}",
            tofile=f"revision-{to_revision}",
            lineterm=""
        ))
        
        # 计算统计信息
        stats = self._calculate_diff_stats(from_lines, to_lines)
        
        return {
            "from_revision": from_revision,
            "to_revision": to_revision,
            "diff": diff,
            "stats": stats,
            "from_timestamp": from_version.timestamp,
            "to_timestamp": to_version.timestamp
        }
    
    def restore_version(self, content_id: str, revision: int, author: str) -> Content:
        """将内容还原到特定版本"""
        version = self.get_content_version(content_id, revision)
        if not version:
            raise ValueError(f"Version {revision} not found for content: {content_id}")
        
        # 更新内容
        content = self.content_service.update_content(
            content_id=content_id,
            body=version.body,
            metadata={"restored_from_revision": revision}
        )
        
        # 创建新版本记录
        self.create_version(
            content_id=content_id,
            body=version.body,
            author=author,
            comment=f"Restored from revision {revision}"
        )
        
        return content
    
    def _calculate_diff_stats(self, from_lines: List[str], to_lines: List[str]) -> Dict[str, int]:
        """计算差异统计信息"""
        s = difflib.SequenceMatcher(None, from_lines, to_lines)
        
        # 计算添加、删除和修改的行数
        added = 0
        deleted = 0
        
        for tag, i1, i2, j1, j2 in s.get_opcodes():
            if tag == 'insert':
                added += (j2 - j1)
            elif tag == 'delete':
                deleted += (i2 - i1)
            elif tag == 'replace':
                added += (j2 - j1)
                deleted += (i2 - i1)
        
        return {
            "added": added,
            "deleted": deleted,
            "changed": added + deleted
        }

实现审核服务

接下来,我们实现一个审核服务来管理内容审核流程。在services/review_service.py中:

from typing import Dict, Any, List, Optional

from database.db_manager import DatabaseManager
from database.models import Content, ContentReview
from services.content_service import ContentService


class ReviewService:
    def __init__(self, db_manager: DatabaseManager, content_service: ContentService):
        self.db_manager = db_manager
        self.content_service = content_service
    
    def submit_for_review(self, content_id: str, reviewer: str) -> ContentReview:
        """提交内容进行审核"""
        content = self.content_service.get_content(content_id)
        if not content:
            raise ValueError(f"Content not found: {content_id}")
        
        # 更新内容状态
        self.content_service.update_content(
            content_id=content_id,
            status="review"
        )
        
        # 创建审核记录
        review = ContentReview(
            id=None,  # 由数据库管理器生成
            content_id=content_id,
            reviewer=reviewer,
            status="pending",
            comments=""
        )
        
        return self.db_manager.create_review(review)
    
    def approve_content(self, review_id: str, comments: str = "") -> Tuple[ContentReview, Content]:
        """批准内容"""
        review = self._get_review(review_id)
        if not review:
            raise ValueError(f"Review not found: {review_id}")
        
        if review.status != "pending":
            raise ValueError(f"Review is not pending, current status: {review.status}")
        
        # 更新审核状态
        review.status = "approved"
        review.comments = comments
        updated_review = self.db_manager.update_review(review)
        
        # 更新内容状态
        content = self.content_service.update_content(
            content_id=review.content_id,
            status="published",
            metadata={"approved_by": review.reviewer}
        )
        
        return updated_review, content
    
    def reject_content(self, review_id: str, comments: str) -> Tuple[ContentReview, Content]:
        """拒绝内容"""
        review = self._get_review(review_id)
        if not review:
            raise ValueError(f"Review not found: {review_id}")
        
        if review.status != "pending":
            raise ValueError(f"Review is not pending, current status: {review.status}")
        
        # 更新审核状态
        review.status = "rejected"
        review.comments = comments
        updated_review = self.db_manager.update_review(review)
        
        # 更新内容状态(返回到草稿状态)
        content = self.content_service.update_content(
            content_id=review.content_id,
            status="draft",
            metadata={"rejected_by": review.reviewer, "rejection_reason": comments}
        )
        
        return updated_review, content
    
    def get_content_reviews(self, content_id: str) -> List[ContentReview]:
        """获取内容的所有审核记录"""
        return self.db_manager.get_content_reviews(content_id)
    
    def get_pending_reviews(self) -> List[ContentReview]:
        """获取所有待处理的审核"""
        return self.db_manager.get_pending_reviews()
    
    def _get_review(self, review_id: str) -> Optional[ContentReview]:
        """从数据库获取审核记录"""
        # 这个方法需要实现,但在当前的db_manager中缺少,我们可以添加它
        cursor = self.db_manager.conn.cursor()
        cursor.execute('SELECT * FROM content_reviews WHERE id = ?', (review_id,))
        row = cursor.fetchone()
        
        if row:
            return ContentReview.from_dict({
                "id": row["id"],
                "content_id": row["content_id"],
                "reviewer": row["reviewer"],
                "status": row["status"],
                "comments": row["comments"],
                "timestamp": row["timestamp"]
            })
        return None

创建差异比较工具

为了支持内容版本的比较和可视化,我们实现一个差异比较工具。在utils/diff_tool.py中:

import difflib
from typing import List, Dict, Any, Tuple


def create_html_diff(from_text: str, to_text: str) -> str:
    """创建HTML格式的差异比较视图"""
    from_lines = from_text.splitlines()
    to_lines = to_text.splitlines()
    
    # 创建HTML格式的差异
    diff = difflib.HtmlDiff()
    return diff.make_file(from_lines, to_lines, "旧版本", "新版本", True)


def create_text_diff(from_text: str, to_text: str) -> str:
    """创建文本格式的差异比较视图"""
    from_lines = from_text.splitlines()
    to_lines = to_text.splitlines()
    
    # 创建统一格式的差异
    diff = list(difflib.unified_diff(
        from_lines,
        to_lines,
        fromfile="旧版本",
        tofile="新版本",
        lineterm=""
    ))
    
    return "\n".join(diff)


def analyze_diff(from_text: str, to_text: str) -> Dict[str, Any]:
    """分析两个文本之间的差异并提供详细统计"""
    from_lines = from_text.splitlines()
    to_lines = to_text.splitlines()
    
    # 使用SequenceMatcher进行详细分析
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
    
    # 初始化统计信息
    stats = {
        "total_lines_before": len(from_lines),
        "total_lines_after": len(to_lines),
        "unchanged_lines": 0,
        "added_lines": 0,
        "deleted_lines": 0,
        "changed_lines": 0,
        "changes": []
    }
    
    # 分析每个变更块
    for tag, i1, i2, j1, j2 in s.get_opcodes():
        if tag == 'equal':
            stats["unchanged_lines"] += (i2 - i1)
        elif tag == 'insert':
            stats["added_lines"] += (j2 - j1)
            stats["changes"].append({
                "type": "insert",
                "from_line": i1 + 1,
                "to_line": i1 + 1,
                "content": to_lines[j1:j2]
            })
        elif tag == 'delete':
            stats["deleted_lines"] += (i2 - i1)
            stats["changes"].append({
                "type": "delete",
                "from_line": i1 + 1,
                "to_line": i2,
                "content": from_lines[i1:i2]
            })
        elif tag == 'replace':
            stats["changed_lines"] += max(i2 - i1, j2 - j1)
            stats["changes"].append({
                "type": "replace",
                "from_line": i1 + 1,
                "to_line": i2,
                "old_content": from_lines[i1:i2],
                "new_content": to_lines[j1:j2]
            })
    
    # 计算相似度
    stats["similarity"] = round(s.ratio() * 100, 2)
    
    return stats

通过这些组件,我们实现了内容的编辑和审核工作流程。在下一节中,我们将实现内容版本管理的功能。

步骤3:内容版本管理

在完成了基本的内容生成和审核流程后,我们需要实现一个健壮的版本管理系统,以便跟踪和管理内容的变更历史。

创建MCP服务器

现在,让我们将所有组件整合到一个MCP服务器中。在mcp_server.py文件中:

import json
import os
from typing import Dict, Any, List, Optional

from mcp.server import Server, Context
from mcp.transports import StdIOTransport, ServerSentEventsTransport

from database.db_manager import DatabaseManager
from services.template_service import TemplateService
from services.content_service import ContentService
from services.version_service import VersionService
from services.review_service import ReviewService
from utils.formatter import (
    format_markdown, validate_markdown, markdown_to_html, extract_metadata
)
from utils.diff_tool import create_html_diff, create_text_diff, analyze_diff


class ContentManagementServer:
    def __init__(self):
        # 确保数据目录存在
        os.makedirs("data", exist_ok=True)
        
        # 初始化数据库和服务
        self.db_manager = DatabaseManager("data/content.db")
        self.template_service = TemplateService(self.db_manager)
        self.content_service = ContentService(self.db_manager, self.template_service)
        self.version_service = VersionService(self.db_manager, self.content_service)
        self.review_service = ReviewService(self.db_manager, self.content_service)
        
        # 创建MCP服务器
        self.server = Server()
        
        # 注册资源
        self._register_resources()
        
        # 注册工具
        self._register_tools()
    
    def _register_resources(self):
        """注册系统资源"""
        # 模板资源
        @self.server.resource("/templates")
        async def get_templates(ctx: Context):
            templates = self.template_service.list_templates()
            return [t.to_dict() for t in templates]
        
        @self.server.resource("/templates/{id}")
        async def get_template(ctx: Context, id: str):
            template = self.template_service.get_template(id)
            if not template:
                ctx.status(404)
                return {"error": f"Template not found: {id}"}
            return template.to_dict()
        
        # 内容资源
        @self.server.resource("/contents")
        async def get_contents(ctx: Context):
            status = ctx.query("status")
            contents = self.content_service.list_contents(status)
            return [c.to_dict() for c in contents]
        
        @self.server.resource("/contents/{id}")
        async def get_content(ctx: Context, id: str):
            content = self.content_service.get_content(id)
            if not content:
                ctx.status(404)
                return {"error": f"Content not found: {id}"}
            return content.to_dict()
        
        @self.server.resource("/contents/{id}/versions")
        async def get_content_versions(ctx: Context, id: str):
            versions = self.version_service.get_content_versions(id)
            return [v.to_dict() for v in versions]
        
        @self.server.resource("/contents/{id}/versions/{revision}")
        async def get_content_version(ctx: Context, id: str, revision: str):
            version = self.version_service.get_content_version(id, int(revision))
            if not version:
                ctx.status(404)
                return {"error": f"Version {revision} not found for content: {id}"}
            return version.to_dict()
        
        @self.server.resource("/contents/{id}/reviews")
        async def get_content_reviews(ctx: Context, id: str):
            reviews = self.review_service.get_content_reviews(id)
            return [r.to_dict() for r in reviews]
        
        @self.server.resource("/reviews/pending")
        async def get_pending_reviews(ctx: Context):
            reviews = self.review_service.get_pending_reviews()
            return [r.to_dict() for r in reviews]
    
    def _register_tools(self):
        """注册系统工具"""
        # 模板工具
        @self.server.tool("createTemplate")
        async def create_template(
            name: str,
            description: str,
            schema: Dict[str, Any],
            content: str
        ) -> Dict[str, Any]:
            """创建新的内容模板"""
            template = self.template_service.create_template(
                name=name,
                description=description,
                schema=schema,
                content=content
            )
            return template.to_dict()
        
        # 内容生成工具
        @self.server.tool("generateContent")
        async def generate_content(
            template_id: str,
            title: str,
            variables: Dict[str, Any],
            author: str
        ) -> Dict[str, Any]:
            """基于模板生成新内容"""
            content = self.content_service.generate_content(
                template_id=template_id,
                title=title,
                variables=variables,
                author=author
            )
            return content.to_dict()
        
        # 内容编辑工具
        @self.server.tool("updateContent")
        async def update_content(
            content_id: str,
            title: str = None,
            body: str = None,
            status: str = None,
            metadata: Dict[str, Any] = None
        ) -> Dict[str, Any]:
            """更新内容"""
            content = self.content_service.update_content(
                content_id=content_id,
                title=title,
                body=body,
                status=status,
                metadata=metadata
            )
            return content.to_dict()
        
        # 格式化工具
        @self.server.tool("formatContent")
        async def format_content(text: str) -> str:
            """格式化Markdown内容"""
            return format_markdown(text)
        
        @self.server.tool("validateContent")
        async def validate_content(text: str) -> List[Dict[str, Any]]:
            """验证Markdown内容"""
            return validate_markdown(text)
        
        @self.server.tool("convertToHtml")
        async def convert_to_html(text: str) -> str:
            """将Markdown转换为HTML"""
            return markdown_to_html(text)
        
        @self.server.tool("extractMetadata")
        async def extract_content_metadata(text: str) -> Dict[str, Any]:
            """从内容中提取元数据"""
            return extract_metadata(text)
        
        # 版本管理工具
        @self.server.tool("createVersion")
        async def create_version(
            content_id: str,
            body: str,
            author: str,
            comment: str
        ) -> Dict[str, Any]:
            """创建内容的新版本"""
            version = self.version_service.create_version(
                content_id=content_id,
                body=body,
                author=author,
                comment=comment
            )
            return version.to_dict()
        
        @self.server.tool("compareVersions")
        async def compare_versions(
            content_id: str,
            from_revision: int,
            to_revision: int
        ) -> Dict[str, Any]:
            """比较两个版本之间的差异"""
            return self.version_service.compare_versions(
                content_id=content_id,
                from_revision=from_revision,
                to_revision=to_revision
            )
        
        @self.server.tool("restoreVersion")
        async def restore_version(
            content_id: str,
            revision: int,
            author: str
        ) -> Dict[str, Any]:
            """将内容还原到特定版本"""
            content = self.version_service.restore_version(
                content_id=content_id,
                revision=revision,
                author=author
            )
            return content.to_dict()
        
        @self.server.tool("createHtmlDiff")
        async def create_html_version_diff(
            from_text: str,
            to_text: str
        ) -> str:
            """创建HTML格式的差异比较视图"""
            return create_html_diff(from_text, to_text)
        
        @self.server.tool("createTextDiff")
        async def create_text_version_diff(
            from_text: str,
            to_text: str
        ) -> str:
            """创建文本格式的差异比较视图"""
            return create_text_diff(from_text, to_text)
        
        @self.server.tool("analyzeDiff")
        async def analyze_version_diff(
            from_text: str,
            to_text: str
        ) -> Dict[str, Any]:
            """分析两个文本之间的差异"""
            return analyze_diff(from_text, to_text)
        
        # 审核工具
        @self.server.tool("submitForReview")
        async def submit_for_review(
            content_id: str,
            reviewer: str
        ) -> Dict[str, Any]:
            """提交内容进行审核"""
            review = self.review_service.submit_for_review(
                content_id=content_id,
                reviewer=reviewer
            )
            return review.to_dict()
        
        @self.server.tool("approveContent")
        async def approve_content(
            review_id: str,
            comments: str = ""
        ) -> Dict[str, Any]:
            """批准内容"""
            review, content = self.review_service.approve_content(
                review_id=review_id,
                comments=comments
            )
            return {
                "review": review.to_dict(),
                "content": content.to_dict()
            }
        
        @self.server.tool("rejectContent")
        async def reject_content(
            review_id: str,
            comments: str
        ) -> Dict[str, Any]:
            """拒绝内容"""
            review, content = self.review_service.reject_content(
                review_id=review_id,
                comments=comments
            )
            return {
                "review": review.to_dict(),
                "content": content.to_dict()
            }
    
    def run_stdio(self):
        """使用标准输入/输出运行服务器"""
        transport = StdIOTransport()
        self.server.serve(transport)
    
    def run_sse(self, host="localhost", port=8000):
        """使用SSE运行服务器"""
        transport = ServerSentEventsTransport(host=host, port=port)
        self.server.serve(transport)
    
    def close(self):
        """关闭服务器和数据库连接"""
        self.db_manager.close()


if __name__ == "__main__":
    import sys
    
    server = ContentManagementServer()
    
    try:
        # 根据命令行参数选择传输方式
        if len(sys.argv) > 1 and sys.argv[1] == "sse":
            print("Starting server with SSE transport on http://localhost:8000")
            server.run_sse()
        else:
            print("Starting server with StdIO transport")
            server.run_stdio()
    finally:
        server.close()

创建示例客户端

为了演示如何使用我们的MCP服务器,我们创建一个示例客户端。在example_client.py文件中:

import asyncio
import json
import uuid
from typing import Dict, Any, List

from mcp.client import Client
from mcp.transports import StdIOClientTransport, ServerSentEventsClientTransport


async def run_content_workflow():
    """运行完整的内容创建、编辑和发布工作流"""
    # 连接到MCP服务器
    print("Connecting to content management server...")
    client = Client()
    
    # 使用相同的传输方式
    # transport = StdIOClientTransport()  # 如果服务器使用StdIO
    transport = ServerSentEventsClientTransport("http://localhost:8000")  # 如果服务器使用SSE
    
    await client.connect(transport)
    print("Connected!")
    
    try:
        # 1. 创建一个博客文章模板
        template_id = str(uuid.uuid4())
        print(f"Creating blog post template (ID: {template_id})...")
        
        await client.tools.createTemplate(
            name="Blog Post",
            description="通用博客文章模板",
            schema={
                "title": {"type": "string", "required": True, "maxLength": 100},
                "summary": {"type": "string", "required": True, "maxLength": 200},
                "tags": {"type": "array", "required": False},
                "category": {
                    "type": "string", 
                    "required": True,
                    "enum": ["技术", "观点", "教程", "新闻"]
                }
            },
            content="""---
title: $title
category: $category
tags: $tags
---

# $title

## 摘要

$summary

## 正文

在这里添加文章的主要内容...

## 结论

在这里添加结论...
"""
        )
        
        # 2. 获取可用模板
        print("Fetching available templates...")
        templates = await client.resources.get("/templates")
        print(f"Found {len(templates)} templates")
        
        # 3. 生成新内容
        print("Generating new content based on template...")
        content = await client.tools.generateContent(
            template_id=template_id,
            title="使用MCP构建内容管理系统",
            variables={
                "title": "使用MCP构建内容管理系统",
                "summary": "本文介绍如何使用MCP协议构建一个灵活的内容管理系统,包括模板、版本控制和审核流程。",
                "tags": ["MCP", "内容管理", "Python"],
                "category": "教程"
            },
            author="MCP教程作者"
        )
        
        content_id = content["id"]
        print(f"Generated content with ID: {content_id}")
        
        # 4. 编辑内容
        print("Editing content...")
        edited_body = content["body"].replace(
            "在这里添加文章的主要内容...",
            """
本教程将带你了解如何使用MCP协议构建一个完整的内容管理系统。

### MCP的优势

使用MCP构建内容系统有以下优势:

1. 模块化架构
2. 灵活的资源定义
3. 强大的工具集成
4. 简单的通信模型

### 系统架构

我们的系统包含以下核心组件:
- 模板引擎
- 内容服务
- 版本控制
- 审核工作流
            """
        )
        
        # 格式化内容
        print("Formatting content...")
        formatted_body = await client.tools.formatContent(edited_body)
        
        # 验证内容
        print("Validating content...")
        validation_issues = await client.tools.validateContent(formatted_body)
        
        if validation_issues:
            print(f"Found {len(validation_issues)} issues:")
            for issue in validation_issues:
                print(f"- {issue['message']}")
        
        # 更新内容
        print("Updating content...")
        await client.tools.updateContent(
            content_id=content_id,
            body=formatted_body
        )
        
        # 5. 创建版本
        print("Creating version...")
        version = await client.tools.createVersion(
            content_id=content_id,
            body=formatted_body,
            author="MCP教程作者",
            comment="完成了文章主体内容"
        )
        
        print(f"Created version {version['revision']}")
        
        # 6. 进一步编辑内容
        print("Making additional edits...")
        further_edited_body = formatted_body.replace(
            "### 系统架构",
            "### 系统架构设计"
        ).replace(
            "- 模板引擎\n- 内容服务\n- 版本控制\n- 审核工作流",
            "- 模板引擎:管理和应用内容模板\n- 内容服务:处理内容的创建和存储\n- 版本控制:跟踪内容的变更历史\n- 审核工作流:管理内容的审核和发布流程"
        )
        
        # 更新内容
        print("Updating with additional edits...")
        await client.tools.updateContent(
            content_id=content_id,
            body=further_edited_body
        )
        
        # 创建新版本
        print("Creating another version...")
        version2 = await client.tools.createVersion(
            content_id=content_id,
            body=further_edited_body,
            author="MCP教程作者",
            comment="完善了系统架构部分"
        )
        
        print(f"Created version {version2['revision']}")
        
        # 7. 比较版本
        print("Comparing versions...")
        comparison = await client.tools.compareVersions(
            content_id=content_id,
            from_revision=1,
            to_revision=2
        )
        
        print(f"Diff stats: {comparison['stats']}")
        
        # 8. 提交审核
        print("Submitting for review...")
        review = await client.tools.submitForReview(
            content_id=content_id,
            reviewer="内容审核员"
        )
        
        print(f"Submitted for review (ID: {review['id']})")
        
        # 9. 批准内容
        print("Approving content...")
        approval_result = await client.tools.approveContent(
            review_id=review['id'],
            comments="内容符合规范,批准发布"
        )
        
        print("Content approved and published!")
        
        # 10. 获取最终内容
        print("Fetching final content...")
        final_content = await client.resources.get(f"/contents/{content_id}")
        
        print(f"Final content status: {final_content['status']}")
        
        # 11. 转换为HTML(可选)
        print("Converting to HTML...")
        html_content = await client.tools.convertToHtml(final_content["body"])
        
        print("Workflow completed successfully!")
        
    finally:
        # 断开连接
        await client.disconnect()
        print("Disconnected from server")


if __name__ == "__main__":
    asyncio.run(run_content_workflow())

启动和测试系统

现在我们可以启动服务器和客户端来测试我们的内容管理系统:

  1. 首先,启动服务器:
python mcp_server.py sse
  1. 然后,在另一个终端中运行客户端:
python example_client.py

你应该能够看到客户端执行整个内容创建、编辑和发布的工作流程,从创建模板到发布内容。

完整项目代码

我们已经构建了一个完整的内容生成与编辑系统,其中包含以下核心文件:

  • mcp_server.py:MCP服务器入口
  • database/db_manager.py:数据库管理器
  • database/models.py:数据模型定义
  • services/template_service.py:模板服务
  • services/content_service.py:内容服务
  • services/version_service.py:版本管理服务
  • services/review_service.py:审核服务
  • utils/formatter.py:内容格式化工具
  • utils/diff_tool.py:版本差异工具
  • example_client.py:示例客户端

这些组件共同构成了一个完整的内容管理系统,可以应用于博客平台、文档系统或其他需要内容生成和管理的场景。

项目解析

本项目演示了MCP在实际应用中的强大功能和灵活性:

  1. 模块化设计:我们将系统分解为多个独立的服务,每个服务负责特定的功能,同时通过MCP协议无缝协作。

  2. 资源抽象:使用MCP资源路径来抽象数据访问,提供了清晰的API界面。

  3. 工具集成:MCP工具为系统提供了强大的功能支持,包括内容生成、格式化、版本管理和审核等。

  4. 传输灵活性:系统支持多种传输方式(StdIO和SSE),可以在不同环境中灵活部署。

  5. 数据流设计:系统中的数据流非常清晰,从模板到内容,再到版本和审核,形成了一个完整的工作流。

后续扩展

这个系统还有很多可以扩展的方向:

  1. 用户认证与权限:添加用户系统和权限控制,限制不同用户对内容的访问和编辑权限。

  2. 搜索功能:实现全文搜索以便快速查找内容。

  3. 媒体资源管理:扩展系统以支持图片、视频等媒体资源的管理。

  4. 多语言支持:为内容添加多语言支持,管理不同语言版本的内容。

  5. 内容分析:添加分析工具,跟踪内容的使用情况和效果。

  6. Web界面:构建一个Web界面,使非技术用户也能使用系统。

小结

在本节中,我们构建了一个完整的内容生成与编辑系统,展示了如何使用MCP协议构建实际应用。通过这个项目,你学习了:

  1. 如何设计和实现基于模板的内容生成系统
  2. 如何构建内容编辑和审核工作流
  3. 如何实现版本控制和差异比较功能
  4. 如何将这些组件整合到一个完整的MCP服务中

这个系统虽然简单,但包含了内容管理系统的核心功能,可以作为更复杂系统的基础。你可以根据自己的需求扩展这个系统,添加更多功能或集成到现有应用中。

使用 Hugo 构建
主题 StackJimmy 设计