4.2 内容生成与编辑系统
目录
简介
在本节中,我们将构建一个完整的内容生成与编辑系统,利用MCP协议来连接内容创建、编辑和管理的各个环节。这个系统将演示如何使用MCP来构建一个实用的内容管理工作流,适用于博客平台、文档系统或其他需要结构化内容处理的应用场景。
系统将包含以下核心功能:
- 基于模板的内容生成
- 内容编辑与格式化工具
- 审核工作流程
- 版本控制与历史记录
- 内容发布与管理
通过这个项目,你将学习如何将MCP应用于实际业务流程,同时掌握复杂系统中各组件协同工作的最佳实践。
系统设计概述
系统架构
我们的内容生成与编辑系统将采用模块化架构,包含以下核心组件:
- 内容模板服务: 管理各类内容模板,提供模板选择和填充功能
- 内容生成引擎: 基于模板和用户输入生成初始内容
- 编辑工作台: 提供内容编辑、格式化和增强功能
- 审核系统: 处理内容审核和反馈流程
- 版本管理器: 追踪内容更改历史并支持版本回滚
- 存储服务: 持久化存储内容和相关元数据
这些组件将通过MCP协议进行通信,形成一个完整的工作流系统。
技术栈
在本项目中,我们将使用以下技术:
- Python: 作为主要的开发语言
- MCP: 用于组件间通信和工具集成
- SQLite: 作为内容和元数据的轻量级存储
- Markdown: 作为内容的默认格式
- Flask: 提供简单的Web界面(可选)
数据模型
系统的核心数据模型包括:
Content {
id: string,
title: string,
body: string,
template_id: string,
author: string,
status: enum(draft, review, published, archived),
created_at: timestamp,
updated_at: timestamp,
metadata: object
}
Template {
id: string,
name: string,
description: string,
schema: object, // 定义模板需要的字段和规则
content: string // 包含占位符的模板内容
}
Version {
id: string,
content_id: string,
revision: number,
body: string,
comment: string,
author: string,
timestamp: timestamp
}
Review {
id: string,
content_id: string,
reviewer: string,
status: enum(pending, approved, rejected),
comments: string,
timestamp: timestamp
}
这个数据模型将支持我们系统的所有核心功能,同时保持足够的灵活性以适应不同的内容类型和工作流程。
MCP资源与工具
我们将为系统定义以下MCP资源和工具:
资源 (Resources):
/templates: 内容模板集合/templates/{id}: 特定模板/contents: 内容集合/contents/{id}: 特定内容/contents/{id}/versions: 内容的版本历史/contents/{id}/reviews: 内容的审核记录
工具 (Tools):
generateContent: 基于模板生成内容formatContent: 格式化和规范化内容validateContent: 验证内容是否符合规则createVersion: 创建内容的新版本compareVersions: 比较两个版本的差异submitForReview: 提交内容进行审核publishContent: 发布已审核的内容
在接下来的章节中,我们将逐步实现这些组件和功能,构建一个完整的内容生成与编辑系统。
步骤1:构建内容模板和生成工具
首先,我们需要创建内容模板系统和基于这些模板的内容生成工具。这些组件将成为我们系统的基础。
设置项目结构
让我们创建一个清晰的项目结构:
content_system/
│
├── mcp_server.py # MCP服务器入口
├── database/
│ ├── __init__.py
│ ├── db_manager.py # 数据库管理
│ └── models.py # 数据模型
│
├── services/
│ ├── __init__.py
│ ├── template_service.py # 模板服务
│ └── content_service.py # 内容服务
│
├── utils/
│ ├── __init__.py
│ └── formatter.py # 格式化工具
│
└── data/
└── content.db # SQLite数据库
实现数据库模型
首先,我们需要创建数据库模型来存储模板和内容。在database/models.py中:
import sqlite3
import json
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Union
@dataclass
class Template:
id: str
name: str
description: str
schema: Dict[str, Any]
content: str
created_at: int = None
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"name": self.name,
"description": self.description,
"schema": self.schema,
"content": self.content,
"created_at": self.created_at or int(time.time())
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Template':
return cls(
id=data["id"],
name=data["name"],
description=data["description"],
schema=data["schema"],
content=data["content"],
created_at=data.get("created_at")
)
@dataclass
class Content:
id: str
title: str
body: str
template_id: str
author: str
status: str # 'draft', 'review', 'published', 'archived'
metadata: Dict[str, Any]
created_at: int = None
updated_at: int = None
def to_dict(self) -> Dict[str, Any]:
now = int(time.time())
return {
"id": self.id,
"title": self.title,
"body": self.body,
"template_id": self.template_id,
"author": self.author,
"status": self.status,
"metadata": self.metadata,
"created_at": self.created_at or now,
"updated_at": self.updated_at or now
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Content':
return cls(
id=data["id"],
title=data["title"],
body=data["body"],
template_id=data["template_id"],
author=data["author"],
status=data["status"],
metadata=data["metadata"],
created_at=data.get("created_at"),
updated_at=data.get("updated_at")
)
创建数据库管理器
接下来,我们实现数据库管理器来处理数据存储。在database/db_manager.py中:
import sqlite3
import json
import uuid
import time
from typing import List, Dict, Any, Optional
from pathlib import Path
from .models import Template, Content
class DatabaseManager:
def __init__(self, db_path="data/content.db"):
# 确保数据目录存在
Path(db_path).parent.mkdir(exist_ok=True)
self.conn = sqlite3.connect(db_path)
self.conn.row_factory = sqlite3.Row
self._create_tables()
def _create_tables(self):
cursor = self.conn.cursor()
# 创建模板表
cursor.execute('''
CREATE TABLE IF NOT EXISTS templates (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
schema TEXT NOT NULL,
content TEXT NOT NULL,
created_at INTEGER NOT NULL
)
''')
# 创建内容表
cursor.execute('''
CREATE TABLE IF NOT EXISTS contents (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
body TEXT NOT NULL,
template_id TEXT NOT NULL,
author TEXT NOT NULL,
status TEXT NOT NULL,
metadata TEXT NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
FOREIGN KEY (template_id) REFERENCES templates (id)
)
''')
# 创建版本表
cursor.execute('''
CREATE TABLE IF NOT EXISTS content_versions (
id TEXT PRIMARY KEY,
content_id TEXT NOT NULL,
revision INTEGER NOT NULL,
body TEXT NOT NULL,
comment TEXT,
author TEXT NOT NULL,
timestamp INTEGER NOT NULL,
FOREIGN KEY (content_id) REFERENCES contents (id),
UNIQUE (content_id, revision)
)
''')
# 创建审核表
cursor.execute('''
CREATE TABLE IF NOT EXISTS content_reviews (
id TEXT PRIMARY KEY,
content_id TEXT NOT NULL,
reviewer TEXT NOT NULL,
status TEXT NOT NULL,
comments TEXT,
timestamp INTEGER NOT NULL,
FOREIGN KEY (content_id) REFERENCES contents (id)
)
''')
self.conn.commit()
def close(self):
self.conn.close()
# 模板操作
def create_template(self, template: Template) -> Template:
if not template.id:
template.id = str(uuid.uuid4())
template.created_at = int(time.time())
data = template.to_dict()
cursor = self.conn.cursor()
cursor.execute(
'INSERT INTO templates VALUES (?, ?, ?, ?, ?, ?)',
(
data["id"],
data["name"],
data["description"],
json.dumps(data["schema"]),
data["content"],
data["created_at"]
)
)
self.conn.commit()
return template
def get_template(self, template_id: str) -> Optional[Template]:
cursor = self.conn.cursor()
cursor.execute('SELECT * FROM templates WHERE id = ?', (template_id,))
row = cursor.fetchone()
if row:
return Template.from_dict({
"id": row["id"],
"name": row["name"],
"description": row["description"],
"schema": json.loads(row["schema"]),
"content": row["content"],
"created_at": row["created_at"]
})
return None
def list_templates(self) -> List[Template]:
cursor = self.conn.cursor()
cursor.execute('SELECT * FROM templates')
rows = cursor.fetchall()
return [Template.from_dict({
"id": row["id"],
"name": row["name"],
"description": row["description"],
"schema": json.loads(row["schema"]),
"content": row["content"],
"created_at": row["created_at"]
}) for row in rows]
# 内容操作
def create_content(self, content: Content) -> Content:
if not content.id:
content.id = str(uuid.uuid4())
now = int(time.time())
content.created_at = now
content.updated_at = now
data = content.to_dict()
cursor = self.conn.cursor()
cursor.execute(
'INSERT INTO contents VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
(
data["id"],
data["title"],
data["body"],
data["template_id"],
data["author"],
data["status"],
json.dumps(data["metadata"]),
data["created_at"],
data["updated_at"]
)
)
self.conn.commit()
return content
def update_content(self, content: Content) -> Content:
content.updated_at = int(time.time())
data = content.to_dict()
cursor = self.conn.cursor()
cursor.execute(
'''UPDATE contents
SET title = ?, body = ?, status = ?, metadata = ?, updated_at = ?
WHERE id = ?''',
(
data["title"],
data["body"],
data["status"],
json.dumps(data["metadata"]),
data["updated_at"],
data["id"]
)
)
self.conn.commit()
return content
def get_content(self, content_id: str) -> Optional[Content]:
cursor = self.conn.cursor()
cursor.execute('SELECT * FROM contents WHERE id = ?', (content_id,))
row = cursor.fetchone()
if row:
return Content.from_dict({
"id": row["id"],
"title": row["title"],
"body": row["body"],
"template_id": row["template_id"],
"author": row["author"],
"status": row["status"],
"metadata": json.loads(row["metadata"]),
"created_at": row["created_at"],
"updated_at": row["updated_at"]
})
return None
def list_contents(self, status=None) -> List[Content]:
cursor = self.conn.cursor()
if status:
cursor.execute('SELECT * FROM contents WHERE status = ?', (status,))
else:
cursor.execute('SELECT * FROM contents')
rows = cursor.fetchall()
return [Content.from_dict({
"id": row["id"],
"title": row["title"],
"body": row["body"],
"template_id": row["template_id"],
"author": row["author"],
"status": row["status"],
"metadata": json.loads(row["metadata"]),
"created_at": row["created_at"],
"updated_at": row["updated_at"]
}) for row in rows]
# 版本控制操作
def create_version(self, version: ContentVersion) -> ContentVersion:
if not version.id:
version.id = str(uuid.uuid4())
version.timestamp = int(time.time())
data = version.to_dict()
cursor = self.conn.cursor()
cursor.execute(
'INSERT INTO content_versions VALUES (?, ?, ?, ?, ?, ?, ?)',
(
data["id"],
data["content_id"],
data["revision"],
data["body"],
data["comment"],
data["author"],
data["timestamp"]
)
)
self.conn.commit()
return version
def get_content_versions(self, content_id: str) -> List[ContentVersion]:
cursor = self.conn.cursor()
cursor.execute(
'SELECT * FROM content_versions WHERE content_id = ? ORDER BY revision DESC',
(content_id,)
)
rows = cursor.fetchall()
return [ContentVersion.from_dict({
"id": row["id"],
"content_id": row["content_id"],
"revision": row["revision"],
"body": row["body"],
"comment": row["comment"],
"author": row["author"],
"timestamp": row["timestamp"]
}) for row in rows]
def get_content_version(self, content_id: str, revision: int) -> Optional[ContentVersion]:
cursor = self.conn.cursor()
cursor.execute(
'SELECT * FROM content_versions WHERE content_id = ? AND revision = ?',
(content_id, revision)
)
row = cursor.fetchone()
if row:
return ContentVersion.from_dict({
"id": row["id"],
"content_id": row["content_id"],
"revision": row["revision"],
"body": row["body"],
"comment": row["comment"],
"author": row["author"],
"timestamp": row["timestamp"]
})
return None
def get_next_revision(self, content_id: str) -> int:
cursor = self.conn.cursor()
cursor.execute(
'SELECT MAX(revision) as max_rev FROM content_versions WHERE content_id = ?',
(content_id,)
)
row = cursor.fetchone()
if row and row["max_rev"] is not None:
return row["max_rev"] + 1
return 1 # 第一个版本
# 审核操作
def create_review(self, review: ContentReview) -> ContentReview:
if not review.id:
review.id = str(uuid.uuid4())
review.timestamp = int(time.time())
data = review.to_dict()
cursor = self.conn.cursor()
cursor.execute(
'INSERT INTO content_reviews VALUES (?, ?, ?, ?, ?, ?)',
(
data["id"],
data["content_id"],
data["reviewer"],
data["status"],
data["comments"],
data["timestamp"]
)
)
self.conn.commit()
return review
def update_review(self, review: ContentReview) -> ContentReview:
review.timestamp = int(time.time())
data = review.to_dict()
cursor = self.conn.cursor()
cursor.execute(
'''UPDATE content_reviews
SET status = ?, comments = ?, timestamp = ?
WHERE id = ?''',
(
data["status"],
data["comments"],
data["timestamp"],
data["id"]
)
)
self.conn.commit()
return review
def get_content_reviews(self, content_id: str) -> List[ContentReview]:
cursor = self.conn.cursor()
cursor.execute(
'SELECT * FROM content_reviews WHERE content_id = ? ORDER BY timestamp DESC',
(content_id,)
)
rows = cursor.fetchall()
return [ContentReview.from_dict({
"id": row["id"],
"content_id": row["content_id"],
"reviewer": row["reviewer"],
"status": row["status"],
"comments": row["comments"],
"timestamp": row["timestamp"]
}) for row in rows]
def get_pending_reviews(self) -> List[ContentReview]:
cursor = self.conn.cursor()
cursor.execute(
'SELECT * FROM content_reviews WHERE status = ? ORDER BY timestamp ASC',
('pending',)
)
rows = cursor.fetchall()
return [ContentReview.from_dict({
"id": row["id"],
"content_id": row["content_id"],
"reviewer": row["reviewer"],
"status": row["status"],
"comments": row["comments"],
"timestamp": row["timestamp"]
}) for row in rows]
实现模板服务
现在我们创建模板服务来管理内容模板。在services/template_service.py中:
import string
from typing import Dict, Any, List, Optional
from database.db_manager import DatabaseManager
from database.models import Template
class TemplateService:
def __init__(self, db_manager: DatabaseManager):
self.db_manager = db_manager
def create_template(self, name: str, description: str, schema: Dict[str, Any],
content: str) -> Template:
"""创建新的内容模板"""
template = Template(
id=None, # 由数据库管理器生成
name=name,
description=description,
schema=schema,
content=content
)
return self.db_manager.create_template(template)
def get_template(self, template_id: str) -> Optional[Template]:
"""获取特定模板"""
return self.db_manager.get_template(template_id)
def list_templates(self) -> List[Template]:
"""列出所有可用的模板"""
return self.db_manager.list_templates()
def render_template(self, template_id: str, variables: Dict[str, Any]) -> str:
"""基于模板和变量渲染内容"""
template = self.get_template(template_id)
if not template:
raise ValueError(f"Template not found: {template_id}")
# 验证提供的变量是否符合模板的schema
self._validate_variables(template.schema, variables)
# 使用字符串模板填充内容
template_str = string.Template(template.content)
return template_str.substitute(variables)
def _validate_variables(self, schema: Dict[str, Any], variables: Dict[str, Any]):
"""验证变量是否符合schema定义"""
for field, field_schema in schema.items():
# 检查必填字段
if field_schema.get("required", False) and field not in variables:
raise ValueError(f"Missing required field: {field}")
# 如果字段存在,检查类型
if field in variables:
value = variables[field]
field_type = field_schema.get("type", "string")
# 简单类型检查
if field_type == "string" and not isinstance(value, str):
raise ValueError(f"Field {field} should be a string")
elif field_type == "number" and not (isinstance(value, int) or isinstance(value, float)):
raise ValueError(f"Field {field} should be a number")
elif field_type == "boolean" and not isinstance(value, bool):
raise ValueError(f"Field {field} should be a boolean")
elif field_type == "array" and not isinstance(value, list):
raise ValueError(f"Field {field} should be an array")
elif field_type == "object" and not isinstance(value, dict):
raise ValueError(f"Field {field} should be an object")
# 检查长度限制
if isinstance(value, str) and "maxLength" in field_schema:
if len(value) > field_schema["maxLength"]:
raise ValueError(
f"Field {field} exceeds maximum length of {field_schema['maxLength']}")
# 检查枚举值
if "enum" in field_schema and value not in field_schema["enum"]:
valid_values = ", ".join(str(v) for v in field_schema["enum"])
raise ValueError(
f"Field {field} must be one of: {valid_values}")
实现内容服务
接下来,我们创建内容服务来管理生成的内容。在services/content_service.py中:
from typing import Dict, Any, List, Optional
from database.db_manager import DatabaseManager
from database.models import Content
from services.template_service import TemplateService
class ContentService:
def __init__(self, db_manager: DatabaseManager, template_service: TemplateService):
self.db_manager = db_manager
self.template_service = template_service
def generate_content(self, template_id: str, title: str, variables: Dict[str, Any],
author: str) -> Content:
"""基于模板生成新内容"""
# 使用模板服务渲染内容
body = self.template_service.render_template(template_id, variables)
# 创建新的内容记录
content = Content(
id=None, # 由数据库管理器生成
title=title,
body=body,
template_id=template_id,
author=author,
status="draft", # 新生成的内容默认为草稿状态
metadata={"template_variables": variables} # 存储用于生成的变量
)
return self.db_manager.create_content(content)
def get_content(self, content_id: str) -> Optional[Content]:
"""获取特定内容"""
return self.db_manager.get_content(content_id)
def list_contents(self, status=None) -> List[Content]:
"""列出内容,可选按状态筛选"""
return self.db_manager.list_contents(status)
def update_content(self, content_id: str, title: str = None, body: str = None,
status: str = None, metadata: Dict[str, Any] = None) -> Content:
"""更新内容"""
content = self.get_content(content_id)
if not content:
raise ValueError(f"Content not found: {content_id}")
# 更新提供的字段
if title is not None:
content.title = title
if body is not None:
content.body = body
if status is not None:
if status not in ["draft", "review", "published", "archived"]:
raise ValueError(f"Invalid status: {status}")
content.status = status
if metadata is not None:
# 合并而不是替换元数据
content.metadata.update(metadata)
return self.db_manager.update_content(content)
创建格式化工具
为了支持内容的格式化和规范化,我们实现一些工具函数。在utils/formatter.py中:
import re
import markdown
from typing import Dict, Any, List
def format_markdown(text: str) -> str:
"""规范化Markdown格式"""
# 确保段落之间有一个空行
text = re.sub(r'(\n)(?!\n)', r'\1\n', text)
# 规范化标题格式(确保#后有空格)
text = re.sub(r'(^|\n)(#+)([^ #])', r'\1\2 \3', text)
# 规范化列表项(确保-后有空格)
text = re.sub(r'(^|\n)-([^ ])', r'\1- \2', text)
# 删除多余的空行(超过两个连续空行)
text = re.sub(r'\n{3,}', r'\n\n', text)
return text
def validate_markdown(text: str) -> List[Dict[str, Any]]:
"""验证Markdown内容,返回问题列表"""
issues = []
# 检查标题层级
if not re.search(r'^# ', text, re.MULTILINE):
issues.append({
"type": "missing_h1",
"message": "Missing top-level heading (H1)"
})
# 检查空链接
empty_links = re.findall(r'\[([^\]]+)\]\(\s*\)', text)
if empty_links:
issues.append({
"type": "empty_links",
"message": f"Found empty links: {', '.join(empty_links)}"
})
# 检查潜在的拼写错误(简单示例)
common_typos = {
"teh": "the",
"adn": "and",
"tahn": "than"
}
for typo, correction in common_typos.items():
if re.search(r'\b' + typo + r'\b', text, re.IGNORECASE):
issues.append({
"type": "potential_typo",
"message": f"Potential typo: '{typo}' might be '{correction}'"
})
return issues
def markdown_to_html(text: str) -> str:
"""将Markdown转换为HTML"""
return markdown.markdown(
text,
extensions=['extra', 'codehilite', 'tables', 'toc']
)
def extract_metadata(text: str) -> Dict[str, Any]:
"""从Markdown中提取元数据(YAML前置内容)"""
metadata = {}
# 简单的YAML前置内容解析
match = re.match(r'^---\s*\n(.*?)\n---\s*\n', text, re.DOTALL)
if match:
yaml_text = match.group(1)
# 简单解析key-value对
for line in yaml_text.split('\n'):
if ':' in line:
key, value = line.split(':', 1)
metadata[key.strip()] = value.strip()
return metadata
通过这些基础组件,我们已经构建了内容模板和生成工具的核心部分。在下一节中,我们将实现编辑和审核工作流部分。
步骤2:实现编辑和审核工作流
现在我们已经有了内容生成的基础,下面我们将实现编辑和审核工作流功能,这将使系统更加完整和实用。
创建版本控制模型
首先,让我们在database/models.py中添加版本和审核相关的数据模型:
@dataclass
class ContentVersion:
id: str
content_id: str
revision: int
body: str
comment: str
author: str
timestamp: int = None
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"content_id": self.content_id,
"revision": self.revision,
"body": self.body,
"comment": self.comment,
"author": self.author,
"timestamp": self.timestamp or int(time.time())
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'ContentVersion':
return cls(
id=data["id"],
content_id=data["content_id"],
revision=data["revision"],
body=data["body"],
comment=data["comment"],
author=data["author"],
timestamp=data.get("timestamp")
)
@dataclass
class ContentReview:
id: str
content_id: str
reviewer: str
status: str # 'pending', 'approved', 'rejected'
comments: str
timestamp: int = None
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"content_id": self.content_id,
"reviewer": self.reviewer,
"status": self.status,
"comments": self.comments,
"timestamp": self.timestamp or int(time.time())
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'ContentReview':
return cls(
id=data["id"],
content_id=data["content_id"],
reviewer=data["reviewer"],
status=data["status"],
comments=data["comments"],
timestamp=data.get("timestamp")
)
扩展数据库管理器
接下来,我们需要在database/db_manager.py中扩展数据库管理器,以支持版本和审核功能:
def _create_tables(self):
# ... 现有代码 ...
# 创建版本表
cursor.execute('''
CREATE TABLE IF NOT EXISTS content_versions (
id TEXT PRIMARY KEY,
content_id TEXT NOT NULL,
revision INTEGER NOT NULL,
body TEXT NOT NULL,
comment TEXT,
author TEXT NOT NULL,
timestamp INTEGER NOT NULL,
FOREIGN KEY (content_id) REFERENCES contents (id),
UNIQUE (content_id, revision)
)
''')
# 创建审核表
cursor.execute('''
CREATE TABLE IF NOT EXISTS content_reviews (
id TEXT PRIMARY KEY,
content_id TEXT NOT NULL,
reviewer TEXT NOT NULL,
status TEXT NOT NULL,
comments TEXT,
timestamp INTEGER NOT NULL,
FOREIGN KEY (content_id) REFERENCES contents (id)
)
''')
self.conn.commit()
# 版本控制操作
def create_version(self, version: ContentVersion) -> ContentVersion:
if not version.id:
version.id = str(uuid.uuid4())
version.timestamp = int(time.time())
data = version.to_dict()
cursor = self.conn.cursor()
cursor.execute(
'INSERT INTO content_versions VALUES (?, ?, ?, ?, ?, ?, ?)',
(
data["id"],
data["content_id"],
data["revision"],
data["body"],
data["comment"],
data["author"],
data["timestamp"]
)
)
self.conn.commit()
return version
def get_content_versions(self, content_id: str) -> List[ContentVersion]:
cursor = self.conn.cursor()
cursor.execute(
'SELECT * FROM content_versions WHERE content_id = ? ORDER BY revision DESC',
(content_id,)
)
rows = cursor.fetchall()
return [ContentVersion.from_dict({
"id": row["id"],
"content_id": row["content_id"],
"revision": row["revision"],
"body": row["body"],
"comment": row["comment"],
"author": row["author"],
"timestamp": row["timestamp"]
}) for row in rows]
def get_content_version(self, content_id: str, revision: int) -> Optional[ContentVersion]:
cursor = self.conn.cursor()
cursor.execute(
'SELECT * FROM content_versions WHERE content_id = ? AND revision = ?',
(content_id, revision)
)
row = cursor.fetchone()
if row:
return ContentVersion.from_dict({
"id": row["id"],
"content_id": row["content_id"],
"revision": row["revision"],
"body": row["body"],
"comment": row["comment"],
"author": row["author"],
"timestamp": row["timestamp"]
})
return None
def get_next_revision(self, content_id: str) -> int:
cursor = self.conn.cursor()
cursor.execute(
'SELECT MAX(revision) as max_rev FROM content_versions WHERE content_id = ?',
(content_id,)
)
row = cursor.fetchone()
if row and row["max_rev"] is not None:
return row["max_rev"] + 1
return 1 # 第一个版本
# 审核操作
def create_review(self, review: ContentReview) -> ContentReview:
if not review.id:
review.id = str(uuid.uuid4())
review.timestamp = int(time.time())
data = review.to_dict()
cursor = self.conn.cursor()
cursor.execute(
'INSERT INTO content_reviews VALUES (?, ?, ?, ?, ?, ?)',
(
data["id"],
data["content_id"],
data["reviewer"],
data["status"],
data["comments"],
data["timestamp"]
)
)
self.conn.commit()
return review
def update_review(self, review: ContentReview) -> ContentReview:
review.timestamp = int(time.time())
data = review.to_dict()
cursor = self.conn.cursor()
cursor.execute(
'''UPDATE content_reviews
SET status = ?, comments = ?, timestamp = ?
WHERE id = ?''',
(
data["status"],
data["comments"],
data["timestamp"],
data["id"]
)
)
self.conn.commit()
return review
def get_content_reviews(self, content_id: str) -> List[ContentReview]:
cursor = self.conn.cursor()
cursor.execute(
'SELECT * FROM content_reviews WHERE content_id = ? ORDER BY timestamp DESC',
(content_id,)
)
rows = cursor.fetchall()
return [ContentReview.from_dict({
"id": row["id"],
"content_id": row["content_id"],
"reviewer": row["reviewer"],
"status": row["status"],
"comments": row["comments"],
"timestamp": row["timestamp"]
}) for row in rows]
def get_pending_reviews(self) -> List[ContentReview]:
cursor = self.conn.cursor()
cursor.execute(
'SELECT * FROM content_reviews WHERE status = ? ORDER BY timestamp ASC',
('pending',)
)
rows = cursor.fetchall()
return [ContentReview.from_dict({
"id": row["id"],
"content_id": row["content_id"],
"reviewer": row["reviewer"],
"status": row["status"],
"comments": row["comments"],
"timestamp": row["timestamp"]
}) for row in rows]
实现版本管理服务
现在,我们创建一个版本管理服务来处理内容版本控制。在services/version_service.py中:
import difflib
from typing import Dict, Any, List, Optional, Tuple
from database.db_manager import DatabaseManager
from database.models import Content, ContentVersion
from services.content_service import ContentService
class VersionService:
def __init__(self, db_manager: DatabaseManager, content_service: ContentService):
self.db_manager = db_manager
self.content_service = content_service
def create_version(self, content_id: str, body: str, author: str,
comment: str) -> ContentVersion:
"""创建内容的新版本"""
# 确认内容存在
content = self.content_service.get_content(content_id)
if not content:
raise ValueError(f"Content not found: {content_id}")
# 获取下一个版本号
next_revision = self.db_manager.get_next_revision(content_id)
# 创建新的版本记录
version = ContentVersion(
id=None, # 由数据库管理器生成
content_id=content_id,
revision=next_revision,
body=body,
comment=comment,
author=author
)
return self.db_manager.create_version(version)
def get_content_versions(self, content_id: str) -> List[ContentVersion]:
"""获取内容的所有版本"""
return self.db_manager.get_content_versions(content_id)
def get_content_version(self, content_id: str, revision: int) -> Optional[ContentVersion]:
"""获取内容的特定版本"""
return self.db_manager.get_content_version(content_id, revision)
def compare_versions(self, content_id: str, from_revision: int,
to_revision: int) -> Dict[str, Any]:
"""比较两个版本之间的差异"""
from_version = self.get_content_version(content_id, from_revision)
to_version = self.get_content_version(content_id, to_revision)
if not from_version or not to_version:
raise ValueError(f"Version not found for content: {content_id}")
# 使用difflib比较文本差异
from_lines = from_version.body.splitlines()
to_lines = to_version.body.splitlines()
diff = list(difflib.unified_diff(
from_lines,
to_lines,
fromfile=f"revision-{from_revision}",
tofile=f"revision-{to_revision}",
lineterm=""
))
# 计算统计信息
stats = self._calculate_diff_stats(from_lines, to_lines)
return {
"from_revision": from_revision,
"to_revision": to_revision,
"diff": diff,
"stats": stats,
"from_timestamp": from_version.timestamp,
"to_timestamp": to_version.timestamp
}
def restore_version(self, content_id: str, revision: int, author: str) -> Content:
"""将内容还原到特定版本"""
version = self.get_content_version(content_id, revision)
if not version:
raise ValueError(f"Version {revision} not found for content: {content_id}")
# 更新内容
content = self.content_service.update_content(
content_id=content_id,
body=version.body,
metadata={"restored_from_revision": revision}
)
# 创建新版本记录
self.create_version(
content_id=content_id,
body=version.body,
author=author,
comment=f"Restored from revision {revision}"
)
return content
def _calculate_diff_stats(self, from_lines: List[str], to_lines: List[str]) -> Dict[str, int]:
"""计算差异统计信息"""
s = difflib.SequenceMatcher(None, from_lines, to_lines)
# 计算添加、删除和修改的行数
added = 0
deleted = 0
for tag, i1, i2, j1, j2 in s.get_opcodes():
if tag == 'insert':
added += (j2 - j1)
elif tag == 'delete':
deleted += (i2 - i1)
elif tag == 'replace':
added += (j2 - j1)
deleted += (i2 - i1)
return {
"added": added,
"deleted": deleted,
"changed": added + deleted
}
实现审核服务
接下来,我们实现一个审核服务来管理内容审核流程。在services/review_service.py中:
from typing import Dict, Any, List, Optional
from database.db_manager import DatabaseManager
from database.models import Content, ContentReview
from services.content_service import ContentService
class ReviewService:
def __init__(self, db_manager: DatabaseManager, content_service: ContentService):
self.db_manager = db_manager
self.content_service = content_service
def submit_for_review(self, content_id: str, reviewer: str) -> ContentReview:
"""提交内容进行审核"""
content = self.content_service.get_content(content_id)
if not content:
raise ValueError(f"Content not found: {content_id}")
# 更新内容状态
self.content_service.update_content(
content_id=content_id,
status="review"
)
# 创建审核记录
review = ContentReview(
id=None, # 由数据库管理器生成
content_id=content_id,
reviewer=reviewer,
status="pending",
comments=""
)
return self.db_manager.create_review(review)
def approve_content(self, review_id: str, comments: str = "") -> Tuple[ContentReview, Content]:
"""批准内容"""
review = self._get_review(review_id)
if not review:
raise ValueError(f"Review not found: {review_id}")
if review.status != "pending":
raise ValueError(f"Review is not pending, current status: {review.status}")
# 更新审核状态
review.status = "approved"
review.comments = comments
updated_review = self.db_manager.update_review(review)
# 更新内容状态
content = self.content_service.update_content(
content_id=review.content_id,
status="published",
metadata={"approved_by": review.reviewer}
)
return updated_review, content
def reject_content(self, review_id: str, comments: str) -> Tuple[ContentReview, Content]:
"""拒绝内容"""
review = self._get_review(review_id)
if not review:
raise ValueError(f"Review not found: {review_id}")
if review.status != "pending":
raise ValueError(f"Review is not pending, current status: {review.status}")
# 更新审核状态
review.status = "rejected"
review.comments = comments
updated_review = self.db_manager.update_review(review)
# 更新内容状态(返回到草稿状态)
content = self.content_service.update_content(
content_id=review.content_id,
status="draft",
metadata={"rejected_by": review.reviewer, "rejection_reason": comments}
)
return updated_review, content
def get_content_reviews(self, content_id: str) -> List[ContentReview]:
"""获取内容的所有审核记录"""
return self.db_manager.get_content_reviews(content_id)
def get_pending_reviews(self) -> List[ContentReview]:
"""获取所有待处理的审核"""
return self.db_manager.get_pending_reviews()
def _get_review(self, review_id: str) -> Optional[ContentReview]:
"""从数据库获取审核记录"""
# 这个方法需要实现,但在当前的db_manager中缺少,我们可以添加它
cursor = self.db_manager.conn.cursor()
cursor.execute('SELECT * FROM content_reviews WHERE id = ?', (review_id,))
row = cursor.fetchone()
if row:
return ContentReview.from_dict({
"id": row["id"],
"content_id": row["content_id"],
"reviewer": row["reviewer"],
"status": row["status"],
"comments": row["comments"],
"timestamp": row["timestamp"]
})
return None
创建差异比较工具
为了支持内容版本的比较和可视化,我们实现一个差异比较工具。在utils/diff_tool.py中:
import difflib
from typing import List, Dict, Any, Tuple
def create_html_diff(from_text: str, to_text: str) -> str:
"""创建HTML格式的差异比较视图"""
from_lines = from_text.splitlines()
to_lines = to_text.splitlines()
# 创建HTML格式的差异
diff = difflib.HtmlDiff()
return diff.make_file(from_lines, to_lines, "旧版本", "新版本", True)
def create_text_diff(from_text: str, to_text: str) -> str:
"""创建文本格式的差异比较视图"""
from_lines = from_text.splitlines()
to_lines = to_text.splitlines()
# 创建统一格式的差异
diff = list(difflib.unified_diff(
from_lines,
to_lines,
fromfile="旧版本",
tofile="新版本",
lineterm=""
))
return "\n".join(diff)
def analyze_diff(from_text: str, to_text: str) -> Dict[str, Any]:
"""分析两个文本之间的差异并提供详细统计"""
from_lines = from_text.splitlines()
to_lines = to_text.splitlines()
# 使用SequenceMatcher进行详细分析
s = difflib.SequenceMatcher(None, from_lines, to_lines)
# 初始化统计信息
stats = {
"total_lines_before": len(from_lines),
"total_lines_after": len(to_lines),
"unchanged_lines": 0,
"added_lines": 0,
"deleted_lines": 0,
"changed_lines": 0,
"changes": []
}
# 分析每个变更块
for tag, i1, i2, j1, j2 in s.get_opcodes():
if tag == 'equal':
stats["unchanged_lines"] += (i2 - i1)
elif tag == 'insert':
stats["added_lines"] += (j2 - j1)
stats["changes"].append({
"type": "insert",
"from_line": i1 + 1,
"to_line": i1 + 1,
"content": to_lines[j1:j2]
})
elif tag == 'delete':
stats["deleted_lines"] += (i2 - i1)
stats["changes"].append({
"type": "delete",
"from_line": i1 + 1,
"to_line": i2,
"content": from_lines[i1:i2]
})
elif tag == 'replace':
stats["changed_lines"] += max(i2 - i1, j2 - j1)
stats["changes"].append({
"type": "replace",
"from_line": i1 + 1,
"to_line": i2,
"old_content": from_lines[i1:i2],
"new_content": to_lines[j1:j2]
})
# 计算相似度
stats["similarity"] = round(s.ratio() * 100, 2)
return stats
通过这些组件,我们实现了内容的编辑和审核工作流程。在下一节中,我们将实现内容版本管理的功能。
步骤3:内容版本管理
在完成了基本的内容生成和审核流程后,我们需要实现一个健壮的版本管理系统,以便跟踪和管理内容的变更历史。
创建MCP服务器
现在,让我们将所有组件整合到一个MCP服务器中。在mcp_server.py文件中:
import json
import os
from typing import Dict, Any, List, Optional
from mcp.server import Server, Context
from mcp.transports import StdIOTransport, ServerSentEventsTransport
from database.db_manager import DatabaseManager
from services.template_service import TemplateService
from services.content_service import ContentService
from services.version_service import VersionService
from services.review_service import ReviewService
from utils.formatter import (
format_markdown, validate_markdown, markdown_to_html, extract_metadata
)
from utils.diff_tool import create_html_diff, create_text_diff, analyze_diff
class ContentManagementServer:
def __init__(self):
# 确保数据目录存在
os.makedirs("data", exist_ok=True)
# 初始化数据库和服务
self.db_manager = DatabaseManager("data/content.db")
self.template_service = TemplateService(self.db_manager)
self.content_service = ContentService(self.db_manager, self.template_service)
self.version_service = VersionService(self.db_manager, self.content_service)
self.review_service = ReviewService(self.db_manager, self.content_service)
# 创建MCP服务器
self.server = Server()
# 注册资源
self._register_resources()
# 注册工具
self._register_tools()
def _register_resources(self):
"""注册系统资源"""
# 模板资源
@self.server.resource("/templates")
async def get_templates(ctx: Context):
templates = self.template_service.list_templates()
return [t.to_dict() for t in templates]
@self.server.resource("/templates/{id}")
async def get_template(ctx: Context, id: str):
template = self.template_service.get_template(id)
if not template:
ctx.status(404)
return {"error": f"Template not found: {id}"}
return template.to_dict()
# 内容资源
@self.server.resource("/contents")
async def get_contents(ctx: Context):
status = ctx.query("status")
contents = self.content_service.list_contents(status)
return [c.to_dict() for c in contents]
@self.server.resource("/contents/{id}")
async def get_content(ctx: Context, id: str):
content = self.content_service.get_content(id)
if not content:
ctx.status(404)
return {"error": f"Content not found: {id}"}
return content.to_dict()
@self.server.resource("/contents/{id}/versions")
async def get_content_versions(ctx: Context, id: str):
versions = self.version_service.get_content_versions(id)
return [v.to_dict() for v in versions]
@self.server.resource("/contents/{id}/versions/{revision}")
async def get_content_version(ctx: Context, id: str, revision: str):
version = self.version_service.get_content_version(id, int(revision))
if not version:
ctx.status(404)
return {"error": f"Version {revision} not found for content: {id}"}
return version.to_dict()
@self.server.resource("/contents/{id}/reviews")
async def get_content_reviews(ctx: Context, id: str):
reviews = self.review_service.get_content_reviews(id)
return [r.to_dict() for r in reviews]
@self.server.resource("/reviews/pending")
async def get_pending_reviews(ctx: Context):
reviews = self.review_service.get_pending_reviews()
return [r.to_dict() for r in reviews]
def _register_tools(self):
"""注册系统工具"""
# 模板工具
@self.server.tool("createTemplate")
async def create_template(
name: str,
description: str,
schema: Dict[str, Any],
content: str
) -> Dict[str, Any]:
"""创建新的内容模板"""
template = self.template_service.create_template(
name=name,
description=description,
schema=schema,
content=content
)
return template.to_dict()
# 内容生成工具
@self.server.tool("generateContent")
async def generate_content(
template_id: str,
title: str,
variables: Dict[str, Any],
author: str
) -> Dict[str, Any]:
"""基于模板生成新内容"""
content = self.content_service.generate_content(
template_id=template_id,
title=title,
variables=variables,
author=author
)
return content.to_dict()
# 内容编辑工具
@self.server.tool("updateContent")
async def update_content(
content_id: str,
title: str = None,
body: str = None,
status: str = None,
metadata: Dict[str, Any] = None
) -> Dict[str, Any]:
"""更新内容"""
content = self.content_service.update_content(
content_id=content_id,
title=title,
body=body,
status=status,
metadata=metadata
)
return content.to_dict()
# 格式化工具
@self.server.tool("formatContent")
async def format_content(text: str) -> str:
"""格式化Markdown内容"""
return format_markdown(text)
@self.server.tool("validateContent")
async def validate_content(text: str) -> List[Dict[str, Any]]:
"""验证Markdown内容"""
return validate_markdown(text)
@self.server.tool("convertToHtml")
async def convert_to_html(text: str) -> str:
"""将Markdown转换为HTML"""
return markdown_to_html(text)
@self.server.tool("extractMetadata")
async def extract_content_metadata(text: str) -> Dict[str, Any]:
"""从内容中提取元数据"""
return extract_metadata(text)
# 版本管理工具
@self.server.tool("createVersion")
async def create_version(
content_id: str,
body: str,
author: str,
comment: str
) -> Dict[str, Any]:
"""创建内容的新版本"""
version = self.version_service.create_version(
content_id=content_id,
body=body,
author=author,
comment=comment
)
return version.to_dict()
@self.server.tool("compareVersions")
async def compare_versions(
content_id: str,
from_revision: int,
to_revision: int
) -> Dict[str, Any]:
"""比较两个版本之间的差异"""
return self.version_service.compare_versions(
content_id=content_id,
from_revision=from_revision,
to_revision=to_revision
)
@self.server.tool("restoreVersion")
async def restore_version(
content_id: str,
revision: int,
author: str
) -> Dict[str, Any]:
"""将内容还原到特定版本"""
content = self.version_service.restore_version(
content_id=content_id,
revision=revision,
author=author
)
return content.to_dict()
@self.server.tool("createHtmlDiff")
async def create_html_version_diff(
from_text: str,
to_text: str
) -> str:
"""创建HTML格式的差异比较视图"""
return create_html_diff(from_text, to_text)
@self.server.tool("createTextDiff")
async def create_text_version_diff(
from_text: str,
to_text: str
) -> str:
"""创建文本格式的差异比较视图"""
return create_text_diff(from_text, to_text)
@self.server.tool("analyzeDiff")
async def analyze_version_diff(
from_text: str,
to_text: str
) -> Dict[str, Any]:
"""分析两个文本之间的差异"""
return analyze_diff(from_text, to_text)
# 审核工具
@self.server.tool("submitForReview")
async def submit_for_review(
content_id: str,
reviewer: str
) -> Dict[str, Any]:
"""提交内容进行审核"""
review = self.review_service.submit_for_review(
content_id=content_id,
reviewer=reviewer
)
return review.to_dict()
@self.server.tool("approveContent")
async def approve_content(
review_id: str,
comments: str = ""
) -> Dict[str, Any]:
"""批准内容"""
review, content = self.review_service.approve_content(
review_id=review_id,
comments=comments
)
return {
"review": review.to_dict(),
"content": content.to_dict()
}
@self.server.tool("rejectContent")
async def reject_content(
review_id: str,
comments: str
) -> Dict[str, Any]:
"""拒绝内容"""
review, content = self.review_service.reject_content(
review_id=review_id,
comments=comments
)
return {
"review": review.to_dict(),
"content": content.to_dict()
}
def run_stdio(self):
"""使用标准输入/输出运行服务器"""
transport = StdIOTransport()
self.server.serve(transport)
def run_sse(self, host="localhost", port=8000):
"""使用SSE运行服务器"""
transport = ServerSentEventsTransport(host=host, port=port)
self.server.serve(transport)
def close(self):
"""关闭服务器和数据库连接"""
self.db_manager.close()
if __name__ == "__main__":
import sys
server = ContentManagementServer()
try:
# 根据命令行参数选择传输方式
if len(sys.argv) > 1 and sys.argv[1] == "sse":
print("Starting server with SSE transport on http://localhost:8000")
server.run_sse()
else:
print("Starting server with StdIO transport")
server.run_stdio()
finally:
server.close()
创建示例客户端
为了演示如何使用我们的MCP服务器,我们创建一个示例客户端。在example_client.py文件中:
import asyncio
import json
import uuid
from typing import Dict, Any, List
from mcp.client import Client
from mcp.transports import StdIOClientTransport, ServerSentEventsClientTransport
async def run_content_workflow():
"""运行完整的内容创建、编辑和发布工作流"""
# 连接到MCP服务器
print("Connecting to content management server...")
client = Client()
# 使用相同的传输方式
# transport = StdIOClientTransport() # 如果服务器使用StdIO
transport = ServerSentEventsClientTransport("http://localhost:8000") # 如果服务器使用SSE
await client.connect(transport)
print("Connected!")
try:
# 1. 创建一个博客文章模板
template_id = str(uuid.uuid4())
print(f"Creating blog post template (ID: {template_id})...")
await client.tools.createTemplate(
name="Blog Post",
description="通用博客文章模板",
schema={
"title": {"type": "string", "required": True, "maxLength": 100},
"summary": {"type": "string", "required": True, "maxLength": 200},
"tags": {"type": "array", "required": False},
"category": {
"type": "string",
"required": True,
"enum": ["技术", "观点", "教程", "新闻"]
}
},
content="""---
title: $title
category: $category
tags: $tags
---
# $title
## 摘要
$summary
## 正文
在这里添加文章的主要内容...
## 结论
在这里添加结论...
"""
)
# 2. 获取可用模板
print("Fetching available templates...")
templates = await client.resources.get("/templates")
print(f"Found {len(templates)} templates")
# 3. 生成新内容
print("Generating new content based on template...")
content = await client.tools.generateContent(
template_id=template_id,
title="使用MCP构建内容管理系统",
variables={
"title": "使用MCP构建内容管理系统",
"summary": "本文介绍如何使用MCP协议构建一个灵活的内容管理系统,包括模板、版本控制和审核流程。",
"tags": ["MCP", "内容管理", "Python"],
"category": "教程"
},
author="MCP教程作者"
)
content_id = content["id"]
print(f"Generated content with ID: {content_id}")
# 4. 编辑内容
print("Editing content...")
edited_body = content["body"].replace(
"在这里添加文章的主要内容...",
"""
本教程将带你了解如何使用MCP协议构建一个完整的内容管理系统。
### MCP的优势
使用MCP构建内容系统有以下优势:
1. 模块化架构
2. 灵活的资源定义
3. 强大的工具集成
4. 简单的通信模型
### 系统架构
我们的系统包含以下核心组件:
- 模板引擎
- 内容服务
- 版本控制
- 审核工作流
"""
)
# 格式化内容
print("Formatting content...")
formatted_body = await client.tools.formatContent(edited_body)
# 验证内容
print("Validating content...")
validation_issues = await client.tools.validateContent(formatted_body)
if validation_issues:
print(f"Found {len(validation_issues)} issues:")
for issue in validation_issues:
print(f"- {issue['message']}")
# 更新内容
print("Updating content...")
await client.tools.updateContent(
content_id=content_id,
body=formatted_body
)
# 5. 创建版本
print("Creating version...")
version = await client.tools.createVersion(
content_id=content_id,
body=formatted_body,
author="MCP教程作者",
comment="完成了文章主体内容"
)
print(f"Created version {version['revision']}")
# 6. 进一步编辑内容
print("Making additional edits...")
further_edited_body = formatted_body.replace(
"### 系统架构",
"### 系统架构设计"
).replace(
"- 模板引擎\n- 内容服务\n- 版本控制\n- 审核工作流",
"- 模板引擎:管理和应用内容模板\n- 内容服务:处理内容的创建和存储\n- 版本控制:跟踪内容的变更历史\n- 审核工作流:管理内容的审核和发布流程"
)
# 更新内容
print("Updating with additional edits...")
await client.tools.updateContent(
content_id=content_id,
body=further_edited_body
)
# 创建新版本
print("Creating another version...")
version2 = await client.tools.createVersion(
content_id=content_id,
body=further_edited_body,
author="MCP教程作者",
comment="完善了系统架构部分"
)
print(f"Created version {version2['revision']}")
# 7. 比较版本
print("Comparing versions...")
comparison = await client.tools.compareVersions(
content_id=content_id,
from_revision=1,
to_revision=2
)
print(f"Diff stats: {comparison['stats']}")
# 8. 提交审核
print("Submitting for review...")
review = await client.tools.submitForReview(
content_id=content_id,
reviewer="内容审核员"
)
print(f"Submitted for review (ID: {review['id']})")
# 9. 批准内容
print("Approving content...")
approval_result = await client.tools.approveContent(
review_id=review['id'],
comments="内容符合规范,批准发布"
)
print("Content approved and published!")
# 10. 获取最终内容
print("Fetching final content...")
final_content = await client.resources.get(f"/contents/{content_id}")
print(f"Final content status: {final_content['status']}")
# 11. 转换为HTML(可选)
print("Converting to HTML...")
html_content = await client.tools.convertToHtml(final_content["body"])
print("Workflow completed successfully!")
finally:
# 断开连接
await client.disconnect()
print("Disconnected from server")
if __name__ == "__main__":
asyncio.run(run_content_workflow())
启动和测试系统
现在我们可以启动服务器和客户端来测试我们的内容管理系统:
- 首先,启动服务器:
python mcp_server.py sse
- 然后,在另一个终端中运行客户端:
python example_client.py
你应该能够看到客户端执行整个内容创建、编辑和发布的工作流程,从创建模板到发布内容。
完整项目代码
我们已经构建了一个完整的内容生成与编辑系统,其中包含以下核心文件:
mcp_server.py:MCP服务器入口database/db_manager.py:数据库管理器database/models.py:数据模型定义services/template_service.py:模板服务services/content_service.py:内容服务services/version_service.py:版本管理服务services/review_service.py:审核服务utils/formatter.py:内容格式化工具utils/diff_tool.py:版本差异工具example_client.py:示例客户端
这些组件共同构成了一个完整的内容管理系统,可以应用于博客平台、文档系统或其他需要内容生成和管理的场景。
项目解析
本项目演示了MCP在实际应用中的强大功能和灵活性:
-
模块化设计:我们将系统分解为多个独立的服务,每个服务负责特定的功能,同时通过MCP协议无缝协作。
-
资源抽象:使用MCP资源路径来抽象数据访问,提供了清晰的API界面。
-
工具集成:MCP工具为系统提供了强大的功能支持,包括内容生成、格式化、版本管理和审核等。
-
传输灵活性:系统支持多种传输方式(StdIO和SSE),可以在不同环境中灵活部署。
-
数据流设计:系统中的数据流非常清晰,从模板到内容,再到版本和审核,形成了一个完整的工作流。
后续扩展
这个系统还有很多可以扩展的方向:
-
用户认证与权限:添加用户系统和权限控制,限制不同用户对内容的访问和编辑权限。
-
搜索功能:实现全文搜索以便快速查找内容。
-
媒体资源管理:扩展系统以支持图片、视频等媒体资源的管理。
-
多语言支持:为内容添加多语言支持,管理不同语言版本的内容。
-
内容分析:添加分析工具,跟踪内容的使用情况和效果。
-
Web界面:构建一个Web界面,使非技术用户也能使用系统。
小结
在本节中,我们构建了一个完整的内容生成与编辑系统,展示了如何使用MCP协议构建实际应用。通过这个项目,你学习了:
- 如何设计和实现基于模板的内容生成系统
- 如何构建内容编辑和审核工作流
- 如何实现版本控制和差异比较功能
- 如何将这些组件整合到一个完整的MCP服务中
这个系统虽然简单,但包含了内容管理系统的核心功能,可以作为更复杂系统的基础。你可以根据自己的需求扩展这个系统,添加更多功能或集成到现有应用中。