4.3 自动化助手
目录
简介
在本节中,我们将构建一个功能齐全的自动化助手系统,利用MCP协议来连接和协调各种自动化任务。这个系统能够帮助用户自动执行日常工作中的重复性任务,跟踪任务的执行状态,并通过集成日历和通知功能保持用户随时了解重要事项。
自动化助手将包含以下核心功能:
- 灵活的任务定义和调度系统
- 任务执行和状态跟踪
- 执行报告和数据可视化
- 日历事件集成
- 多渠道通知系统
通过这个项目,你将学习如何使用MCP构建一个实用的自动化系统,该系统可以连接各种服务和API,并协调它们之间的工作流程。这种自动化助手在个人生产力工具、团队协作系统或企业业务流程自动化中都有广泛应用。
系统设计概述
系统架构
我们的自动化助手将采用模块化、可扩展的架构,包含以下核心组件:
- 任务管理器:负责任务的创建、存储和管理
- 任务调度器:根据时间表或触发条件调度任务执行
- 执行引擎:执行不同类型的自动化任务
- 状态跟踪器:记录和报告任务执行的状态和结果
- 日历集成器:将任务与日历事件同步
- 通知服务:通过各种渠道发送通知
- 用户界面:提供管理和监控任务的界面
这些组件将通过MCP协议进行通信,每个组件可以独立开发和扩展。
技术栈
在本项目中,我们将使用以下技术:
- Python: 作为主要开发语言
- MCP: 用于组件间通信
- SQLite: 作为任务和状态数据的存储
- APScheduler: 用于任务调度
- Google Calendar API: 用于日历集成
- SMTP/Webhook/Telegram API: 用于通知功能
- Flask: 提供简单的Web界面(可选)
数据模型
系统的核心数据模型如下:
Task {
id: string,
name: string,
description: string,
type: string, // 'script', 'api', 'web', 'email', 'custom'
config: object, // 特定任务类型的配置
schedule: {
type: string, // 'interval', 'cron', 'one_time', 'trigger'
value: string, // 调度表达式或触发条件
},
enabled: boolean,
created_at: timestamp,
updated_at: timestamp,
tags: array,
owner: string
}
Execution {
id: string,
task_id: string,
start_time: timestamp,
end_time: timestamp,
status: string, // 'pending', 'running', 'success', 'failed', 'cancelled'
result: object,
logs: array,
error: string
}
Notification {
id: string,
task_id: string,
execution_id: string,
channel: string, // 'email', 'webhook', 'telegram', 'sms'
config: object,
sent_at: timestamp,
status: string, // 'pending', 'sent', 'failed'
content: string
}
CalendarEvent {
id: string,
task_id: string,
calendar_id: string,
event_id: string,
title: string,
description: string,
start_time: timestamp,
end_time: timestamp,
sync_status: string // 'synced', 'pending', 'failed'
}
MCP资源与工具
我们将为系统定义以下MCP资源和工具:
资源 (Resources):
/tasks: 任务集合/tasks/{id}: 特定任务/tasks/{id}/executions: 任务的执行历史/executions: 所有执行记录/executions/{id}: 特定执行记录/executions/{id}/logs: 执行日志/calendar_events: 日历事件/notifications: 通知记录
工具 (Tools):
createTask: 创建新任务updateTask: 更新任务配置enableTask: 启用任务disableTask: 禁用任务executeTask: 手动执行任务cancelExecution: 取消正在运行的任务scheduleTask: 调度任务执行getTaskStatus: 获取任务状态configureNotification: 配置任务通知sendNotification: 发送通知syncCalendar: 与日历同步generateReport: 生成任务执行报告
在接下来的章节中,我们将逐步实现这些组件和功能,构建一个完整的自动化助手系统。
步骤1:构建任务自动化工具集
首先,我们需要构建自动化助手的核心部分:任务自动化工具集。这包括任务的定义、存储、调度和执行。
设置项目结构
让我们创建一个清晰的项目结构:
automation_assistant/
│
├── mcp_server.py # MCP服务器入口
├── database/
│ ├── __init__.py
│ ├── db_manager.py # 数据库管理
│ └── models.py # 数据模型
│
├── services/
│ ├── __init__.py
│ ├── task_service.py # 任务管理服务
│ ├── scheduler.py # 任务调度服务
│ └── executor.py # 任务执行服务
│
├── adapters/
│ ├── __init__.py
│ ├── script_adapter.py # 脚本执行适配器
│ ├── api_adapter.py # API调用适配器
│ ├── web_adapter.py # Web自动化适配器
│ └── email_adapter.py # 邮件自动化适配器
│
├── utils/
│ ├── __init__.py
│ └── logger.py # 日志工具
│
└── data/
└── automation.db # SQLite数据库
实现数据库模型
首先,我们创建数据库模型来存储任务信息。在database/models.py中:
import json
import time
import uuid
from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Union
@dataclass
class Task:
id: str
name: str
description: str
type: str # 'script', 'api', 'web', 'email', 'custom'
config: Dict[str, Any]
schedule: Dict[str, Any]
enabled: bool
tags: List[str]
owner: str
created_at: int = None
updated_at: int = None
def to_dict(self) -> Dict[str, Any]:
now = int(time.time())
return {
"id": self.id,
"name": self.name,
"description": self.description,
"type": self.type,
"config": self.config,
"schedule": self.schedule,
"enabled": self.enabled,
"tags": self.tags,
"owner": self.owner,
"created_at": self.created_at or now,
"updated_at": self.updated_at or now
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Task':
return cls(
id=data["id"],
name=data["name"],
description=data["description"],
type=data["type"],
config=data["config"],
schedule=data["schedule"],
enabled=data["enabled"],
tags=data["tags"],
owner=data["owner"],
created_at=data.get("created_at"),
updated_at=data.get("updated_at")
)
@dataclass
class Execution:
id: str
task_id: str
start_time: int
end_time: int = None
status: str = "pending" # 'pending', 'running', 'success', 'failed', 'cancelled'
result: Dict[str, Any] = None
logs: List[str] = None
error: str = None
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"task_id": self.task_id,
"start_time": self.start_time,
"end_time": self.end_time,
"status": self.status,
"result": self.result or {},
"logs": self.logs or [],
"error": self.error
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Execution':
return cls(
id=data["id"],
task_id=data["task_id"],
start_time=data["start_time"],
end_time=data.get("end_time"),
status=data["status"],
result=data.get("result", {}),
logs=data.get("logs", []),
error=data.get("error")
)
创建数据库管理器
接下来,我们实现数据库管理器来处理数据存储。在database/db_manager.py中:
import sqlite3
import json
import uuid
import time
from typing import List, Dict, Any, Optional
from pathlib import Path
from .models import Task, Execution
class DatabaseManager:
def __init__(self, db_path="data/automation.db"):
# 确保数据目录存在
Path(db_path).parent.mkdir(exist_ok=True)
self.conn = sqlite3.connect(db_path)
self.conn.row_factory = sqlite3.Row
self._create_tables()
def _create_tables(self):
cursor = self.conn.cursor()
# 创建任务表
cursor.execute('''
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
type TEXT NOT NULL,
config TEXT NOT NULL,
schedule TEXT NOT NULL,
enabled INTEGER NOT NULL,
tags TEXT NOT NULL,
owner TEXT NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
)
''')
# 创建执行记录表
cursor.execute('''
CREATE TABLE IF NOT EXISTS executions (
id TEXT PRIMARY KEY,
task_id TEXT NOT NULL,
start_time INTEGER NOT NULL,
end_time INTEGER,
status TEXT NOT NULL,
result TEXT,
logs TEXT,
error TEXT,
FOREIGN KEY (task_id) REFERENCES tasks (id)
)
''')
self.conn.commit()
def close(self):
self.conn.close()
# 任务操作
def create_task(self, task: Task) -> Task:
if not task.id:
task.id = str(uuid.uuid4())
now = int(time.time())
task.created_at = now
task.updated_at = now
data = task.to_dict()
cursor = self.conn.cursor()
cursor.execute(
'INSERT INTO tasks VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',
(
data["id"],
data["name"],
data["description"],
data["type"],
json.dumps(data["config"]),
json.dumps(data["schedule"]),
1 if data["enabled"] else 0,
json.dumps(data["tags"]),
data["owner"],
data["created_at"],
data["updated_at"]
)
)
self.conn.commit()
return task
def update_task(self, task: Task) -> Task:
task.updated_at = int(time.time())
data = task.to_dict()
cursor = self.conn.cursor()
cursor.execute(
'''UPDATE tasks
SET name = ?, description = ?, type = ?, config = ?,
schedule = ?, enabled = ?, tags = ?, updated_at = ?
WHERE id = ?''',
(
data["name"],
data["description"],
data["type"],
json.dumps(data["config"]),
json.dumps(data["schedule"]),
1 if data["enabled"] else 0,
json.dumps(data["tags"]),
data["updated_at"],
data["id"]
)
)
self.conn.commit()
return task
def get_task(self, task_id: str) -> Optional[Task]:
cursor = self.conn.cursor()
cursor.execute('SELECT * FROM tasks WHERE id = ?', (task_id,))
row = cursor.fetchone()
if row:
return Task.from_dict({
"id": row["id"],
"name": row["name"],
"description": row["description"],
"type": row["type"],
"config": json.loads(row["config"]),
"schedule": json.loads(row["schedule"]),
"enabled": bool(row["enabled"]),
"tags": json.loads(row["tags"]),
"owner": row["owner"],
"created_at": row["created_at"],
"updated_at": row["updated_at"]
})
return None
def list_tasks(self, type=None, owner=None, enabled=None, tag=None) -> List[Task]:
cursor = self.conn.cursor()
query = 'SELECT * FROM tasks'
conditions = []
params = []
if type:
conditions.append('type = ?')
params.append(type)
if owner:
conditions.append('owner = ?')
params.append(owner)
if enabled is not None:
conditions.append('enabled = ?')
params.append(1 if enabled else 0)
if tag:
# 这种方式在SQLite中查询JSON数组内容并不理想,但作为简单示例可以接受
# 在实际应用中,应考虑更好的标签存储和查询方式
conditions.append('tags LIKE ?')
params.append(f'%"{tag}"%')
if conditions:
query += ' WHERE ' + ' AND '.join(conditions)
cursor.execute(query, params)
rows = cursor.fetchall()
return [Task.from_dict({
"id": row["id"],
"name": row["name"],
"description": row["description"],
"type": row["type"],
"config": json.loads(row["config"]),
"schedule": json.loads(row["schedule"]),
"enabled": bool(row["enabled"]),
"tags": json.loads(row["tags"]),
"owner": row["owner"],
"created_at": row["created_at"],
"updated_at": row["updated_at"]
}) for row in rows]
def enable_task(self, task_id: str) -> bool:
cursor = self.conn.cursor()
cursor.execute(
'UPDATE tasks SET enabled = 1, updated_at = ? WHERE id = ?',
(int(time.time()), task_id)
)
self.conn.commit()
return cursor.rowcount > 0
def disable_task(self, task_id: str) -> bool:
cursor = self.conn.cursor()
cursor.execute(
'UPDATE tasks SET enabled = 0, updated_at = ? WHERE id = ?',
(int(time.time()), task_id)
)
self.conn.commit()
return cursor.rowcount > 0
def delete_task(self, task_id: str) -> bool:
cursor = self.conn.cursor()
cursor.execute('DELETE FROM tasks WHERE id = ?', (task_id,))
self.conn.commit()
return cursor.rowcount > 0
# 执行记录操作
def create_execution(self, execution: Execution) -> Execution:
if not execution.id:
execution.id = str(uuid.uuid4())
data = execution.to_dict()
cursor = self.conn.cursor()
cursor.execute(
'INSERT INTO executions VALUES (?, ?, ?, ?, ?, ?, ?, ?)',
(
data["id"],
data["task_id"],
data["start_time"],
data["end_time"],
data["status"],
json.dumps(data["result"]),
json.dumps(data["logs"]),
data["error"]
)
)
self.conn.commit()
return execution
def update_execution(self, execution: Execution) -> Execution:
data = execution.to_dict()
cursor = self.conn.cursor()
cursor.execute(
'''UPDATE executions
SET end_time = ?, status = ?, result = ?, logs = ?, error = ?
WHERE id = ?''',
(
data["end_time"],
data["status"],
json.dumps(data["result"]),
json.dumps(data["logs"]),
data["error"],
data["id"]
)
)
self.conn.commit()
return execution
def get_execution(self, execution_id: str) -> Optional[Execution]:
cursor = self.conn.cursor()
cursor.execute('SELECT * FROM executions WHERE id = ?', (execution_id,))
row = cursor.fetchone()
if row:
return Execution.from_dict({
"id": row["id"],
"task_id": row["task_id"],
"start_time": row["start_time"],
"end_time": row["end_time"],
"status": row["status"],
"result": json.loads(row["result"]) if row["result"] else {},
"logs": json.loads(row["logs"]) if row["logs"] else [],
"error": row["error"]
})
return None
def list_executions(self, task_id=None, status=None, limit=None) -> List[Execution]:
cursor = self.conn.cursor()
query = 'SELECT * FROM executions'
conditions = []
params = []
if task_id:
conditions.append('task_id = ?')
params.append(task_id)
if status:
conditions.append('status = ?')
params.append(status)
if conditions:
query += ' WHERE ' + ' AND '.join(conditions)
query += ' ORDER BY start_time DESC'
if limit:
query += ' LIMIT ?'
params.append(limit)
cursor.execute(query, params)
rows = cursor.fetchall()
return [Execution.from_dict({
"id": row["id"],
"task_id": row["task_id"],
"start_time": row["start_time"],
"end_time": row["end_time"],
"status": row["status"],
"result": json.loads(row["result"]) if row["result"] else {},
"logs": json.loads(row["logs"]) if row["logs"] else [],
"error": row["error"]
}) for row in rows]
def add_execution_log(self, execution_id: str, log_entry: str) -> bool:
execution = self.get_execution(execution_id)
if not execution:
return False
logs = execution.logs or []
logs.append(log_entry)
execution.logs = logs
self.update_execution(execution)
return True
创建日志工具
为了记录任务执行的详细信息,我们创建一个日志工具。在utils/logger.py中:
import logging
import sys
import time
from typing import Dict, Any, Optional
class TaskLogger:
def __init__(self, db_manager=None, execution_id=None):
self.db_manager = db_manager
self.execution_id = execution_id
self.logs = []
# 配置标准日志
self.logger = logging.getLogger(f"task_{execution_id}" if execution_id else "task")
self.logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def info(self, message: str):
"""记录信息性日志"""
self._log(message, "INFO")
def warning(self, message: str):
"""记录警告日志"""
self._log(message, "WARNING")
def error(self, message: str):
"""记录错误日志"""
self._log(message, "ERROR")
def debug(self, message: str):
"""记录调试日志"""
self._log(message, "DEBUG")
def _log(self, message: str, level: str):
"""内部日志处理方法"""
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] [{level}] {message}"
# 添加到内存中的日志列表
self.logs.append(log_entry)
# 输出到标准日志
if level == "INFO":
self.logger.info(message)
elif level == "WARNING":
self.logger.warning(message)
elif level == "ERROR":
self.logger.error(message)
elif level == "DEBUG":
self.logger.debug(message)
# 如果提供了数据库管理器和执行ID,则保存到数据库
if self.db_manager and self.execution_id:
self.db_manager.add_execution_log(self.execution_id, log_entry)
def get_logs(self) -> list:
"""获取所有日志"""
return self.logs
这样,我们已经建立了自动化系统的数据模型、存储和日志功能。在下一部分,我们将实现任务服务和执行适配器。
实现任务服务
现在,我们需要实现任务管理服务,它将作为处理任务的中心控制器。在services/task_service.py中:
from typing import Dict, List, Optional, Any, Union
import time
import uuid
from database.db_manager import DatabaseManager
from database.models import Task, Execution
from utils.logger import TaskLogger
class TaskService:
def __init__(self, db_manager: DatabaseManager):
self.db_manager = db_manager
self.logger = TaskLogger()
def create_task(self, name: str, description: str, type: str, config: Dict[str, Any],
schedule: Dict[str, Any], tags: List[str], owner: str,
enabled: bool = True) -> Task:
"""创建新任务"""
# 验证任务类型
valid_types = ['script', 'api', 'web', 'email', 'custom']
if type not in valid_types:
raise ValueError(f"Invalid task type: {type}. Must be one of: {', '.join(valid_types)}")
# 验证调度配置
valid_schedule_types = ['interval', 'cron', 'one_time', 'trigger']
if schedule.get('type') not in valid_schedule_types:
raise ValueError(
f"Invalid schedule type: {schedule.get('type')}. Must be one of: {', '.join(valid_schedule_types)}")
# 创建任务对象
task = Task(
id=str(uuid.uuid4()),
name=name,
description=description,
type=type,
config=config,
schedule=schedule,
enabled=enabled,
tags=tags,
owner=owner
)
# 存储任务
created_task = self.db_manager.create_task(task)
self.logger.info(f"Created task: {name} (ID: {created_task.id})")
return created_task
def update_task(self, task_id: str, name: str = None, description: str = None,
config: Dict[str, Any] = None, schedule: Dict[str, Any] = None,
tags: List[str] = None, enabled: bool = None) -> Task:
"""更新任务配置"""
# 获取现有任务
task = self.db_manager.get_task(task_id)
if not task:
raise ValueError(f"Task not found: {task_id}")
# 更新提供的字段
if name is not None:
task.name = name
if description is not None:
task.description = description
if config is not None:
task.config = config
if schedule is not None:
# 验证调度配置
valid_schedule_types = ['interval', 'cron', 'one_time', 'trigger']
if schedule.get('type') not in valid_schedule_types:
raise ValueError(
f"Invalid schedule type: {schedule.get('type')}. Must be one of: {', '.join(valid_schedule_types)}")
task.schedule = schedule
if tags is not None:
task.tags = tags
if enabled is not None:
task.enabled = enabled
# 更新任务
updated_task = self.db_manager.update_task(task)
self.logger.info(f"Updated task: {updated_task.name} (ID: {updated_task.id})")
return updated_task
def get_task(self, task_id: str) -> Optional[Task]:
"""获取特定任务"""
return self.db_manager.get_task(task_id)
def list_tasks(self, type=None, owner=None, enabled=None, tag=None) -> List[Task]:
"""列出任务,可选按类型、所有者、状态或标签筛选"""
return self.db_manager.list_tasks(type, owner, enabled, tag)
def enable_task(self, task_id: str) -> bool:
"""启用任务"""
result = self.db_manager.enable_task(task_id)
if result:
self.logger.info(f"Enabled task: {task_id}")
return result
def disable_task(self, task_id: str) -> bool:
"""禁用任务"""
result = self.db_manager.disable_task(task_id)
if result:
self.logger.info(f"Disabled task: {task_id}")
return result
def delete_task(self, task_id: str) -> bool:
"""删除任务"""
result = self.db_manager.delete_task(task_id)
if result:
self.logger.info(f"Deleted task: {task_id}")
return result
def create_execution(self, task_id: str) -> Optional[Execution]:
"""创建任务执行记录"""
# 获取任务
task = self.db_manager.get_task(task_id)
if not task:
self.logger.error(f"Cannot create execution: Task not found: {task_id}")
return None
# 创建执行记录
execution = Execution(
id=str(uuid.uuid4()),
task_id=task_id,
start_time=int(time.time()),
status="pending"
)
# 存储执行记录
created_execution = self.db_manager.create_execution(execution)
self.logger.info(f"Created execution: {created_execution.id} for task: {task.name}")
return created_execution
def update_execution(self, execution_id: str, status: str = None, result: Dict[str, Any] = None,
error: str = None, end_time: int = None) -> Optional[Execution]:
"""更新任务执行记录"""
# 获取执行记录
execution = self.db_manager.get_execution(execution_id)
if not execution:
self.logger.error(f"Cannot update execution: Execution not found: {execution_id}")
return None
# 更新提供的字段
if status is not None:
valid_statuses = ['pending', 'running', 'success', 'failed', 'cancelled']
if status not in valid_statuses:
raise ValueError(f"Invalid status: {status}. Must be one of: {', '.join(valid_statuses)}")
execution.status = status
if result is not None:
if execution.result is None:
execution.result = {}
execution.result.update(result)
if error is not None:
execution.error = error
# 如果设置了状态为成功或失败,且未提供结束时间,则自动设置
if status in ['success', 'failed', 'cancelled'] and end_time is None:
end_time = int(time.time())
if end_time is not None:
execution.end_time = end_time
# 更新执行记录
updated_execution = self.db_manager.update_execution(execution)
self.logger.info(f"Updated execution: {updated_execution.id}, status: {updated_execution.status}")
return updated_execution
def get_execution(self, execution_id: str) -> Optional[Execution]:
"""获取特定执行记录"""
return self.db_manager.get_execution(execution_id)
def list_executions(self, task_id=None, status=None, limit=None) -> List[Execution]:
"""列出执行记录,可选按任务ID、状态筛选,且可限制数量"""
return self.db_manager.list_executions(task_id, status, limit)
def add_execution_log(self, execution_id: str, message: str, level: str = "INFO") -> bool:
"""添加执行日志"""
# 创建特定于此执行的日志器
execution_logger = TaskLogger(self.db_manager, execution_id)
# 根据级别记录日志
if level == "INFO":
execution_logger.info(message)
elif level == "WARNING":
execution_logger.warning(message)
elif level == "ERROR":
execution_logger.error(message)
elif level == "DEBUG":
execution_logger.debug(message)
else:
execution_logger.info(message) # 默认使用INFO级别
return True
实现任务调度器
接下来,我们实现任务调度器,它负责管理任务的计划执行。在services/scheduler.py中:
import time
from datetime import datetime
from typing import Dict, List, Optional, Any, Callable, Union
import threading
import asyncio
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
from database.db_manager import DatabaseManager
from database.models import Task
from services.task_service import TaskService
from utils.logger import TaskLogger
class Scheduler:
def __init__(self, task_service: TaskService, execution_callback: Callable[[str], None]):
self.task_service = task_service
self.execution_callback = execution_callback
self.logger = TaskLogger()
# 创建APScheduler
self.scheduler = BackgroundScheduler()
self.scheduler.start()
# 存储任务ID到作业ID的映射
self.task_jobs = {}
self.logger.info("Scheduler initialized")
def schedule_task(self, task: Task) -> bool:
"""根据任务配置调度任务"""
if not task.enabled:
self.logger.info(f"Task {task.id} is disabled, skipping scheduling")
return False
# 如果任务已经调度,先移除旧的调度
if task.id in self.task_jobs:
self.remove_task(task.id)
# 根据调度类型创建触发器
schedule_type = task.schedule.get('type')
if schedule_type == 'interval':
# 创建间隔触发器
interval_seconds = task.schedule.get('value')
if not isinstance(interval_seconds, int) or interval_seconds <= 0:
self.logger.error(f"Invalid interval for task {task.id}: {interval_seconds}")
return False
trigger = IntervalTrigger(seconds=interval_seconds)
elif schedule_type == 'cron':
# 创建cron触发器
cron_expression = task.schedule.get('value')
if not isinstance(cron_expression, str):
self.logger.error(f"Invalid cron expression for task {task.id}: {cron_expression}")
return False
try:
# 解析cron表达式
trigger = CronTrigger.from_crontab(cron_expression)
except Exception as e:
self.logger.error(f"Error parsing cron expression for task {task.id}: {str(e)}")
return False
elif schedule_type == 'one_time':
# 创建一次性触发器
run_time = task.schedule.get('value')
if isinstance(run_time, str):
try:
# 尝试解析时间字符串
run_time = datetime.fromisoformat(run_time)
except ValueError:
self.logger.error(f"Invalid datetime format for task {task.id}: {run_time}")
return False
elif isinstance(run_time, int):
# 假设是Unix时间戳
run_time = datetime.fromtimestamp(run_time)
else:
self.logger.error(f"Invalid run_time type for task {task.id}: {type(run_time)}")
return False
# 检查时间是否已过
if run_time < datetime.now():
self.logger.warning(f"Run time for task {task.id} already passed: {run_time}")
return False
trigger = DateTrigger(run_date=run_time)
elif schedule_type == 'trigger':
# 触发型任务不会被自动调度,需要手动触发
self.logger.info(f"Task {task.id} is trigger-based, not scheduled automatically")
return True
else:
self.logger.error(f"Unknown schedule type for task {task.id}: {schedule_type}")
return False
# 添加任务到调度器
job = self.scheduler.add_job(
self._execute_task,
trigger=trigger,
args=[task.id],
id=f"task_{task.id}",
replace_existing=True
)
# 存储作业ID
self.task_jobs[task.id] = job.id
self.logger.info(f"Scheduled task {task.id} with schedule type: {schedule_type}")
return True
def remove_task(self, task_id: str) -> bool:
"""从调度器中移除任务"""
if task_id not in self.task_jobs:
return False
job_id = self.task_jobs[task_id]
try:
self.scheduler.remove_job(job_id)
del self.task_jobs[task_id]
self.logger.info(f"Removed task {task_id} from scheduler")
return True
except Exception as e:
self.logger.error(f"Error removing task {task_id} from scheduler: {str(e)}")
return False
def _execute_task(self, task_id: str):
"""调度器触发任务执行的回调函数"""
try:
# 检查任务是否仍然存在且已启用
task = self.task_service.get_task(task_id)
if not task:
self.logger.warning(f"Task {task_id} no longer exists, removing from scheduler")
self.remove_task(task_id)
return
if not task.enabled:
self.logger.info(f"Task {task_id} is disabled, skipping execution")
return
# 调用执行回调函数
self.execution_callback(task_id)
except Exception as e:
self.logger.error(f"Error in scheduler while executing task {task_id}: {str(e)}")
def trigger_task(self, task_id: str) -> bool:
"""手动触发任务执行"""
try:
# 检查任务是否存在且已启用
task = self.task_service.get_task(task_id)
if not task:
self.logger.warning(f"Cannot trigger task {task_id}: Task not found")
return False
if not task.enabled:
self.logger.info(f"Cannot trigger task {task_id}: Task is disabled")
return False
# 直接调用执行回调函数
self.execution_callback(task_id)
self.logger.info(f"Manually triggered task {task_id}")
return True
except Exception as e:
self.logger.error(f"Error triggering task {task_id}: {str(e)}")
return False
def reload_all_tasks(self):
"""重新加载并调度所有已启用的任务"""
# 清除当前所有调度的任务
for task_id in list(self.task_jobs.keys()):
self.remove_task(task_id)
# 获取所有已启用的任务
tasks = self.task_service.list_tasks(enabled=True)
for task in tasks:
self.schedule_task(task)
self.logger.info(f"Reloaded {len(tasks)} tasks into scheduler")
def shutdown(self):
"""关闭调度器"""
if self.scheduler.running:
self.scheduler.shutdown()
self.logger.info("Scheduler shut down")
在下一部分,我们将实现执行引擎和任务适配器,以便能够执行不同类型的自动化任务。
实现执行引擎
现在,我们实现任务执行引擎,它将根据任务类型调用适当的适配器来执行任务。在services/executor.py中:
import time
import threading
import importlib
from typing import Dict, Any, Optional, List, Type
import concurrent.futures
from database.models import Task, Execution
from services.task_service import TaskService
from utils.logger import TaskLogger
class TaskExecutor:
def __init__(self, task_service: TaskService):
self.task_service = task_service
self.logger = TaskLogger()
self.adapters = {}
self.running_executions = {}
self.thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
# 加载适配器
self._load_adapters()
def _load_adapters(self):
"""加载所有可用的任务适配器"""
try:
# 脚本适配器
from adapters.script_adapter import ScriptAdapter
self.adapters['script'] = ScriptAdapter
# API适配器
from adapters.api_adapter import ApiAdapter
self.adapters['api'] = ApiAdapter
# Web适配器
from adapters.web_adapter import WebAdapter
self.adapters['web'] = WebAdapter
# 邮件适配器
from adapters.email_adapter import EmailAdapter
self.adapters['email'] = EmailAdapter
self.logger.info(f"Loaded {len(self.adapters)} task adapters")
except ImportError as e:
self.logger.error(f"Error loading adapters: {str(e)}")
def execute_task(self, task_id: str) -> Optional[str]:
"""执行指定的任务,返回执行ID"""
# 获取任务
task = self.task_service.get_task(task_id)
if not task:
self.logger.error(f"Cannot execute task: Task not found: {task_id}")
return None
# 检查任务是否已启用
if not task.enabled:
self.logger.warning(f"Cannot execute disabled task: {task_id}")
return None
# 创建执行记录
execution = self.task_service.create_execution(task_id)
if not execution:
self.logger.error(f"Failed to create execution record for task: {task_id}")
return None
# 更新执行状态为运行中
self.task_service.update_execution(
execution_id=execution.id,
status="running"
)
# 启动执行线程
self.thread_pool.submit(self._execute_task_thread, task, execution)
self.logger.info(f"Started execution {execution.id} for task {task.name}")
return execution.id
def _execute_task_thread(self, task: Task, execution: Execution):
"""在线程中执行任务"""
execution_id = execution.id
try:
# 记录任务开始
self.task_service.add_execution_log(
execution_id=execution_id,
message=f"Starting task: {task.name}"
)
# 检查是否有适配器可用
if task.type not in self.adapters:
raise ValueError(f"No adapter available for task type: {task.type}")
# 创建适配器实例
adapter_class = self.adapters[task.type]
adapter = adapter_class(
logger=TaskLogger(self.task_service.db_manager, execution_id)
)
# 存储正在运行的执行
self.running_executions[execution_id] = {
'task': task,
'adapter': adapter,
'start_time': time.time()
}
# 执行任务
result = adapter.execute(task.config)
# 更新执行状态为成功
self.task_service.update_execution(
execution_id=execution_id,
status="success",
result=result,
end_time=int(time.time())
)
# 记录任务完成
self.task_service.add_execution_log(
execution_id=execution_id,
message=f"Task completed successfully"
)
except Exception as e:
# 记录错误
error_message = f"Task execution failed: {str(e)}"
self.logger.error(error_message)
# 更新执行状态为失败
self.task_service.update_execution(
execution_id=execution_id,
status="failed",
error=error_message,
end_time=int(time.time())
)
# 记录任务失败
self.task_service.add_execution_log(
execution_id=execution_id,
message=error_message,
level="ERROR"
)
finally:
# 清理
if execution_id in self.running_executions:
del self.running_executions[execution_id]
def cancel_execution(self, execution_id: str) -> bool:
"""取消正在运行的任务执行"""
if execution_id not in self.running_executions:
self.logger.warning(f"Cannot cancel: Execution not running or not found: {execution_id}")
return False
try:
# 获取执行信息
execution_info = self.running_executions[execution_id]
adapter = execution_info.get('adapter')
# 尝试通知适配器取消操作
if hasattr(adapter, 'cancel') and callable(adapter.cancel):
adapter.cancel()
# 更新执行状态
self.task_service.update_execution(
execution_id=execution_id,
status="cancelled",
end_time=int(time.time())
)
# 记录取消
self.task_service.add_execution_log(
execution_id=execution_id,
message="Execution cancelled by user",
level="WARNING"
)
# 清理
del self.running_executions[execution_id]
self.logger.info(f"Cancelled execution: {execution_id}")
return True
except Exception as e:
self.logger.error(f"Error cancelling execution {execution_id}: {str(e)}")
return False
def get_execution_status(self, execution_id: str) -> Dict[str, Any]:
"""获取执行的当前状态"""
# 检查是否是正在运行的执行
is_running = execution_id in self.running_executions
# 获取执行记录
execution = self.task_service.get_execution(execution_id)
if not execution:
return {
"found": False,
"message": f"Execution not found: {execution_id}"
}
# 计算运行时间
running_time = None
if execution.start_time:
if execution.end_time:
running_time = execution.end_time - execution.start_time
elif is_running:
running_time = int(time.time()) - execution.start_time
return {
"found": True,
"execution_id": execution.id,
"task_id": execution.task_id,
"status": execution.status,
"is_running": is_running,
"start_time": execution.start_time,
"end_time": execution.end_time,
"running_time": running_time,
"has_error": bool(execution.error),
"logs_count": len(execution.logs) if execution.logs else 0
}
def shutdown(self):
"""关闭执行器"""
# 取消所有正在运行的执行
for execution_id in list(self.running_executions.keys()):
self.cancel_execution(execution_id)
# 关闭线程池
self.thread_pool.shutdown()
self.logger.info("Task executor shut down")
实现任务适配器
接下来,我们需要实现各种任务适配器,以支持不同类型的自动化任务。首先,让我们创建一个脚本执行适配器。在adapters/script_adapter.py中:
import subprocess
import os
import tempfile
import sys
import time
from typing import Dict, Any, Optional, List, Union
from utils.logger import TaskLogger
class ScriptAdapter:
def __init__(self, logger: TaskLogger):
self.logger = logger
self.process = None
def execute(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""执行脚本任务"""
# 验证配置
self._validate_config(config)
# 获取脚本类型和内容
script_type = config.get('script_type', 'shell')
script_content = config.get('script')
timeout = config.get('timeout', 3600) # 默认超时1小时
# 记录开始执行
self.logger.info(f"Executing {script_type} script")
# 根据脚本类型执行
if script_type == 'shell':
return self._execute_shell_script(script_content, timeout, config)
elif script_type == 'python':
return self._execute_python_script(script_content, timeout, config)
else:
raise ValueError(f"Unsupported script type: {script_type}")
def _validate_config(self, config: Dict[str, Any]):
"""验证脚本配置"""
if 'script' not in config:
raise ValueError("Script content is required")
script_type = config.get('script_type', 'shell')
if script_type not in ['shell', 'python']:
raise ValueError(f"Unsupported script type: {script_type}")
def _execute_shell_script(self, script: str, timeout: int, config: Dict[str, Any]) -> Dict[str, Any]:
"""执行Shell脚本"""
# 创建临时脚本文件
with tempfile.NamedTemporaryFile(mode='w', suffix='.sh', delete=False) as f:
f.write(script)
script_path = f.name
try:
# 设置执行权限
os.chmod(script_path, 0o755)
# 准备环境变量
env = os.environ.copy()
if 'env' in config and isinstance(config['env'], dict):
env.update(config['env'])
# 执行脚本
start_time = time.time()
self.process = subprocess.Popen(
['bash', script_path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
text=True
)
# 实时获取输出
stdout, stderr = [], []
while self.process.poll() is None:
# 检查超时
if time.time() - start_time > timeout:
self.process.terminate()
raise TimeoutError(f"Script execution timed out after {timeout} seconds")
# 读取输出
stdout_line = self.process.stdout.readline()
if stdout_line:
line = stdout_line.rstrip()
stdout.append(line)
self.logger.info(f"[STDOUT] {line}")
stderr_line = self.process.stderr.readline()
if stderr_line:
line = stderr_line.rstrip()
stderr.append(line)
self.logger.warning(f"[STDERR] {line}")
if not stdout_line and not stderr_line:
time.sleep(0.1)
# 获取剩余输出
for line in self.process.stdout:
stdout.append(line.rstrip())
self.logger.info(f"[STDOUT] {line.rstrip()}")
for line in self.process.stderr:
stderr.append(line.rstrip())
self.logger.warning(f"[STDERR] {line.rstrip()}")
# 检查返回值
return_code = self.process.returncode
if return_code != 0:
self.logger.error(f"Script failed with return code {return_code}")
raise subprocess.CalledProcessError(return_code, 'bash')
# 计算运行时间
run_time = time.time() - start_time
return {
"success": True,
"return_code": return_code,
"stdout": "\n".join(stdout),
"stderr": "\n".join(stderr),
"run_time": run_time
}
except subprocess.CalledProcessError as e:
return {
"success": False,
"return_code": e.returncode,
"stdout": "\n".join(stdout) if 'stdout' in locals() else "",
"stderr": "\n".join(stderr) if 'stderr' in locals() else "",
"error": str(e)
}
except Exception as e:
self.logger.error(f"Error executing shell script: {str(e)}")
return {
"success": False,
"error": str(e)
}
finally:
# 清理临时文件
try:
os.unlink(script_path)
except:
pass
def _execute_python_script(self, script: str, timeout: int, config: Dict[str, Any]) -> Dict[str, Any]:
"""执行Python脚本"""
# 创建临时脚本文件
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write(script)
script_path = f.name
try:
# 准备环境变量
env = os.environ.copy()
if 'env' in config and isinstance(config['env'], dict):
env.update(config['env'])
# 使用当前Python解释器路径
python_path = sys.executable
# 执行脚本
start_time = time.time()
self.process = subprocess.Popen(
[python_path, script_path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
text=True
)
# 实时获取输出
stdout, stderr = [], []
while self.process.poll() is None:
# 检查超时
if time.time() - start_time > timeout:
self.process.terminate()
raise TimeoutError(f"Script execution timed out after {timeout} seconds")
# 读取输出
stdout_line = self.process.stdout.readline()
if stdout_line:
line = stdout_line.rstrip()
stdout.append(line)
self.logger.info(f"[STDOUT] {line}")
stderr_line = self.process.stderr.readline()
if stderr_line:
line = stderr_line.rstrip()
stderr.append(line)
self.logger.warning(f"[STDERR] {line}")
if not stdout_line and not stderr_line:
time.sleep(0.1)
# 获取剩余输出
for line in self.process.stdout:
stdout.append(line.rstrip())
self.logger.info(f"[STDOUT] {line.rstrip()}")
for line in self.process.stderr:
stderr.append(line.rstrip())
self.logger.warning(f"[STDERR] {line.rstrip()}")
# 检查返回值
return_code = self.process.returncode
if return_code != 0:
self.logger.error(f"Script failed with return code {return_code}")
raise subprocess.CalledProcessError(return_code, python_path)
# 计算运行时间
run_time = time.time() - start_time
return {
"success": True,
"return_code": return_code,
"stdout": "\n".join(stdout),
"stderr": "\n".join(stderr),
"run_time": run_time
}
except subprocess.CalledProcessError as e:
return {
"success": False,
"return_code": e.returncode,
"stdout": "\n".join(stdout) if 'stdout' in locals() else "",
"stderr": "\n".join(stderr) if 'stderr' in locals() else "",
"error": str(e)
}
except Exception as e:
self.logger.error(f"Error executing Python script: {str(e)}")
return {
"success": False,
"error": str(e)
}
finally:
# 清理临时文件
try:
os.unlink(script_path)
except:
pass
def cancel(self):
"""取消正在运行的脚本"""
if self.process and self.process.poll() is None:
try:
self.process.terminate()
self.logger.warning("Script execution terminated")
except:
try:
self.process.kill()
self.logger.warning("Script execution killed")
except:
self.logger.error("Failed to terminate script execution")
类似地,我们可以实现其他类型的适配器,如API适配器、Web自动化适配器和邮件适配器。由于篇幅限制,这里我们只展示了脚本适配器的实现。完整的项目代码会包含所有适配器的实现。
在下一个部分,我们将实现状态跟踪和报告功能,以监控和分析任务执行情况。
步骤2:实现状态跟踪与报告
在前面的步骤中,我们已经构建了任务自动化的核心功能。现在,我们将实现状态跟踪与报告功能,以便用户能够监控任务执行情况并分析其性能。
创建任务监控服务
首先,让我们实现一个任务监控服务,用于跟踪和分析任务执行状态。在services/monitor_service.py中:
import time
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from collections import defaultdict
from database.db_manager import DatabaseManager
from services.task_service import TaskService
class MonitorService:
def __init__(self, db_manager: DatabaseManager, task_service: TaskService):
self.db_manager = db_manager
self.task_service = task_service
def get_tasks_overview(self) -> Dict[str, Any]:
"""获取所有任务的概览信息"""
tasks = self.task_service.list_tasks()
# 计算各类任务的数量
task_counts = {
"total": len(tasks),
"enabled": sum(1 for task in tasks if task.enabled),
"disabled": sum(1 for task in tasks if not task.enabled),
"by_type": defaultdict(int),
"by_tag": defaultdict(int)
}
# 计算各类型和标签的任务数量
for task in tasks:
task_counts["by_type"][task.type] += 1
for tag in task.tags:
task_counts["by_tag"][tag] += 1
# 转换defaultdict为普通dict以便序列化
task_counts["by_type"] = dict(task_counts["by_type"])
task_counts["by_tag"] = dict(task_counts["by_tag"])
return task_counts
def get_executions_overview(self, days: int = 7) -> Dict[str, Any]:
"""获取指定天数内的执行概览"""
# 计算起始时间
start_time = int((datetime.now() - timedelta(days=days)).timestamp())
# 获取所有执行记录
all_executions = self.db_manager.list_executions()
# 筛选指定时间范围内的执行
recent_executions = [
exe for exe in all_executions
if exe.start_time >= start_time
]
# 计算各状态执行的数量
execution_counts = {
"total": len(recent_executions),
"success": sum(1 for exe in recent_executions if exe.status == "success"),
"failed": sum(1 for exe in recent_executions if exe.status == "failed"),
"cancelled": sum(1 for exe in recent_executions if exe.status == "cancelled"),
"running": sum(1 for exe in recent_executions if exe.status == "running"),
"pending": sum(1 for exe in recent_executions if exe.status == "pending")
}
# 计算每天的执行数量
daily_counts = defaultdict(lambda: {
"total": 0, "success": 0, "failed": 0, "cancelled": 0
})
for exe in recent_executions:
day = datetime.fromtimestamp(exe.start_time).strftime('%Y-%m-%d')
daily_counts[day]["total"] += 1
if exe.status in ["success", "failed", "cancelled"]:
daily_counts[day][exe.status] += 1
# 转换为列表格式,按日期排序
daily_data = [
{"date": date, **counts}
for date, counts in sorted(daily_counts.items())
]
# 计算任务执行时间统计
execution_times = []
for exe in recent_executions:
if exe.status == "success" and exe.end_time and exe.start_time:
execution_time = exe.end_time - exe.start_time
execution_times.append(execution_time)
time_stats = {}
if execution_times:
time_stats = {
"avg": sum(execution_times) / len(execution_times),
"min": min(execution_times),
"max": max(execution_times),
"total": sum(execution_times)
}
return {
"counts": execution_counts,
"daily": daily_data,
"time_stats": time_stats,
"period_days": days
}
def get_task_performance(self, task_id: str, limit: int = 10) -> Dict[str, Any]:
"""获取指定任务的性能数据"""
task = self.task_service.get_task(task_id)
if not task:
return {"error": f"Task not found: {task_id}"}
# 获取任务的执行记录
executions = self.db_manager.list_executions(task_id=task_id)
# 按开始时间排序(降序)
executions.sort(key=lambda x: x.start_time, reverse=True)
# 限制返回数量
if limit > 0:
executions = executions[:limit]
# 计算成功率
total_count = len(executions)
success_count = sum(1 for exe in executions if exe.status == "success")
success_rate = success_count / total_count if total_count > 0 else 0
# 计算平均执行时间
execution_times = []
for exe in executions:
if exe.status == "success" and exe.end_time and exe.start_time:
execution_time = exe.end_time - exe.start_time
execution_times.append(execution_time)
avg_time = sum(execution_times) / len(execution_times) if execution_times else 0
# 提取详细执行信息
execution_details = []
for exe in executions:
detail = {
"id": exe.id,
"status": exe.status,
"start_time": exe.start_time,
"end_time": exe.end_time,
"run_time": exe.end_time - exe.start_time if exe.end_time and exe.start_time else None,
"has_error": bool(exe.error),
"logs_count": len(exe.logs) if exe.logs else 0
}
execution_details.append(detail)
return {
"task_id": task_id,
"task_name": task.name,
"task_type": task.type,
"enabled": task.enabled,
"total_executions": total_count,
"success_count": success_count,
"success_rate": success_rate,
"avg_execution_time": avg_time,
"last_execution_time": executions[0].start_time if executions else None,
"executions": execution_details
}
def generate_execution_report(self, execution_id: str) -> Dict[str, Any]:
"""为特定执行生成详细报告"""
execution = self.db_manager.get_execution(execution_id)
if not execution:
return {"error": f"Execution not found: {execution_id}"}
task = self.task_service.get_task(execution.task_id)
if not task:
return {"error": f"Task not found for execution: {execution_id}"}
# 计算执行时间
run_time = None
if execution.start_time and execution.end_time:
run_time = execution.end_time - execution.start_time
# 提取日志中的级别统计
log_stats = {"INFO": 0, "WARNING": 0, "ERROR": 0, "DEBUG": 0}
if execution.logs:
for log in execution.logs:
for level in log_stats.keys():
if f"[{level}]" in log:
log_stats[level] += 1
break
# 根据任务类型提取特定结果数据
result_data = {}
if execution.result:
if task.type == "script":
result_data = {
"return_code": execution.result.get("return_code"),
"success": execution.result.get("success", False),
"run_time": execution.result.get("run_time")
}
elif task.type == "api":
result_data = {
"status_code": execution.result.get("status_code"),
"success": execution.result.get("success", False),
"response_time": execution.result.get("response_time")
}
# 生成报告
report = {
"execution_id": execution_id,
"task_id": execution.task_id,
"task_name": task.name,
"task_type": task.type,
"status": execution.status,
"start_time": execution.start_time,
"end_time": execution.end_time,
"run_time": run_time,
"result": result_data,
"log_stats": log_stats,
"error": execution.error,
"timestamp": int(time.time()),
"logs": execution.logs
}
return report
def get_system_health(self) -> Dict[str, Any]:
"""获取系统健康状态"""
tasks = self.task_service.list_tasks()
all_executions = self.db_manager.list_executions()
# 计算最近24小时的执行情况
now = int(time.time())
day_ago = now - (24 * 60 * 60)
recent_executions = [exe for exe in all_executions if exe.start_time >= day_ago]
# 计算失败率
recent_completed = [exe for exe in recent_executions
if exe.status in ["success", "failed"]]
recent_failed = [exe for exe in recent_completed if exe.status == "failed"]
failure_rate = (len(recent_failed) / len(recent_completed)
if recent_completed else 0)
# 计算当前运行中的任务
running_executions = [exe for exe in all_executions if exe.status == "running"]
# 计算最近的任务活动
sorted_executions = sorted(recent_executions,
key=lambda x: x.start_time,
reverse=True)
recent_activity = sorted_executions[:5] # 最近5个执行
return {
"tasks_count": len(tasks),
"enabled_tasks": sum(1 for task in tasks if task.enabled),
"recent_executions": len(recent_executions),
"recent_success": sum(1 for exe in recent_executions if exe.status == "success"),
"recent_failed": len(recent_failed),
"failure_rate": failure_rate,
"running_now": len(running_executions),
"last_execution_time": sorted_executions[0].start_time if sorted_executions else None,
"recent_activity": [
{
"id": exe.id,
"task_id": exe.task_id,
"status": exe.status,
"start_time": exe.start_time
} for exe in recent_activity
]
}
创建报告生成器
接下来,我们实现一个报告生成器,用于生成各种格式的报告。在utils/report_generator.py中:
import json
import csv
import os
import io
import time
from datetime import datetime
from typing import Dict, List, Any, Optional, Union, TextIO
from services.monitor_service import MonitorService
class ReportGenerator:
def __init__(self, monitor_service: MonitorService):
self.monitor_service = monitor_service
def generate_system_report(self, format: str = "json") -> Union[str, bytes]:
"""生成系统概览报告"""
# 获取数据
tasks_overview = self.monitor_service.get_tasks_overview()
executions_overview = self.monitor_service.get_executions_overview(days=30)
system_health = self.monitor_service.get_system_health()
# 组合报告数据
report_data = {
"timestamp": int(time.time()),
"generated_at": datetime.now().isoformat(),
"tasks": tasks_overview,
"executions": executions_overview,
"system_health": system_health
}
# 根据格式生成报告
if format == "json":
return self._to_json(report_data)
elif format == "csv":
return self._to_csv_system_report(report_data)
else:
raise ValueError(f"Unsupported report format: {format}")
def generate_task_report(self, task_id: str, format: str = "json") -> Union[str, bytes]:
"""生成特定任务的报告"""
# 获取任务性能数据
task_performance = self.monitor_service.get_task_performance(task_id, limit=20)
# 检查是否有错误
if "error" in task_performance:
return self._to_json({"error": task_performance["error"]})
# 根据格式生成报告
if format == "json":
return self._to_json(task_performance)
elif format == "csv":
return self._to_csv_task_report(task_performance)
else:
raise ValueError(f"Unsupported report format: {format}")
def generate_execution_report(self, execution_id: str, format: str = "json") -> Union[str, bytes]:
"""生成特定执行的报告"""
# 获取执行报告数据
execution_report = self.monitor_service.generate_execution_report(execution_id)
# 检查是否有错误
if "error" in execution_report:
return self._to_json({"error": execution_report["error"]})
# 根据格式生成报告
if format == "json":
return self._to_json(execution_report)
elif format == "csv":
return self._to_csv_execution_report(execution_report)
elif format == "text":
return self._to_text_execution_report(execution_report)
else:
raise ValueError(f"Unsupported report format: {format}")
def _to_json(self, data: Dict[str, Any]) -> str:
"""将数据转换为JSON格式"""
return json.dumps(data, indent=2)
def _to_csv_system_report(self, data: Dict[str, Any]) -> bytes:
"""将系统报告数据转换为CSV格式"""
output = io.StringIO()
writer = csv.writer(output)
# 写入标题
writer.writerow(["System Report", f"Generated at: {data['generated_at']}"])
writer.writerow([])
# 写入任务概览
writer.writerow(["Tasks Overview"])
writer.writerow(["Total", "Enabled", "Disabled"])
writer.writerow([
data["tasks"]["total"],
data["tasks"]["enabled"],
data["tasks"]["disabled"]
])
writer.writerow([])
# 写入任务类型分布
writer.writerow(["Tasks by Type"])
for task_type, count in data["tasks"]["by_type"].items():
writer.writerow([task_type, count])
writer.writerow([])
# 写入执行概览
writer.writerow(["Executions Overview (Last 30 Days)"])
writer.writerow(["Total", "Success", "Failed", "Cancelled", "Running"])
writer.writerow([
data["executions"]["counts"]["total"],
data["executions"]["counts"]["success"],
data["executions"]["counts"]["failed"],
data["executions"]["counts"]["cancelled"],
data["executions"]["counts"]["running"]
])
writer.writerow([])
# 写入每日执行统计
writer.writerow(["Daily Executions"])
writer.writerow(["Date", "Total", "Success", "Failed", "Cancelled"])
for daily in data["executions"]["daily"]:
writer.writerow([
daily["date"],
daily["total"],
daily["success"],
daily["failed"],
daily["cancelled"]
])
writer.writerow([])
# 写入系统健康状态
writer.writerow(["System Health"])
writer.writerow(["Metric", "Value"])
writer.writerow(["Enabled Tasks", data["system_health"]["enabled_tasks"]])
writer.writerow(["Recent Executions (24h)", data["system_health"]["recent_executions"]])
writer.writerow(["Recent Success (24h)", data["system_health"]["recent_success"]])
writer.writerow(["Recent Failed (24h)", data["system_health"]["recent_failed"]])
writer.writerow(["Failure Rate (24h)", f"{data['system_health']['failure_rate']:.2%}"])
writer.writerow(["Currently Running", data["system_health"]["running_now"]])
return output.getvalue().encode('utf-8')
def _to_csv_task_report(self, data: Dict[str, Any]) -> bytes:
"""将任务报告数据转换为CSV格式"""
output = io.StringIO()
writer = csv.writer(output)
# 写入任务信息
writer.writerow(["Task Report", f"Generated at: {datetime.now().isoformat()}"])
writer.writerow(["Task ID", data["task_id"]])
writer.writerow(["Task Name", data["task_name"]])
writer.writerow(["Task Type", data["task_type"]])
writer.writerow(["Enabled", "Yes" if data["enabled"] else "No"])
writer.writerow([])
# 写入性能概览
writer.writerow(["Performance Overview"])
writer.writerow(["Total Executions", data["total_executions"]])
writer.writerow(["Success Count", data["success_count"]])
writer.writerow(["Success Rate", f"{data['success_rate']:.2%}"])
writer.writerow(["Average Execution Time", f"{data['avg_execution_time']:.2f}s"])
writer.writerow([])
# 写入执行历史
writer.writerow(["Execution History"])
writer.writerow(["ID", "Status", "Start Time", "End Time", "Run Time", "Error"])
for exe in data["executions"]:
writer.writerow([
exe["id"],
exe["status"],
datetime.fromtimestamp(exe["start_time"]).isoformat() if exe["start_time"] else "",
datetime.fromtimestamp(exe["end_time"]).isoformat() if exe.get("end_time") else "",
f"{exe['run_time']:.2f}s" if exe.get("run_time") else "",
"Yes" if exe["has_error"] else "No"
])
return output.getvalue().encode('utf-8')
def _to_csv_execution_report(self, data: Dict[str, Any]) -> bytes:
"""将执行报告数据转换为CSV格式"""
output = io.StringIO()
writer = csv.writer(output)
# 写入执行信息
writer.writerow(["Execution Report", f"Generated at: {datetime.now().isoformat()}"])
writer.writerow(["Execution ID", data["execution_id"]])
writer.writerow(["Task ID", data["task_id"]])
writer.writerow(["Task Name", data["task_name"]])
writer.writerow(["Task Type", data["task_type"]])
writer.writerow(["Status", data["status"]])
writer.writerow([
"Start Time",
datetime.fromtimestamp(data["start_time"]).isoformat() if data["start_time"] else ""
])
writer.writerow([
"End Time",
datetime.fromtimestamp(data["end_time"]).isoformat() if data.get("end_time") else ""
])
writer.writerow(["Run Time", f"{data['run_time']}s" if data.get("run_time") else ""])
writer.writerow([])
# 写入结果信息
writer.writerow(["Result Details"])
for key, value in data.get("result", {}).items():
writer.writerow([key, value])
writer.writerow([])
# 写入日志统计
writer.writerow(["Log Statistics"])
for level, count in data.get("log_stats", {}).items():
writer.writerow([level, count])
writer.writerow([])
# 写入错误信息
if data.get("error"):
writer.writerow(["Error"])
writer.writerow([data["error"]])
writer.writerow([])
# 写入日志
writer.writerow(["Logs"])
for log in data.get("logs", []):
writer.writerow([log])
return output.getvalue().encode('utf-8')
def _to_text_execution_report(self, data: Dict[str, Any]) -> str:
"""将执行报告数据转换为文本格式"""
lines = []
# 添加标题
lines.append("=" * 80)
lines.append(f"EXECUTION REPORT - {data['execution_id']}")
lines.append("=" * 80)
lines.append("")
# 添加基本信息
lines.append(f"Task: {data['task_name']} ({data['task_id']})")
lines.append(f"Type: {data['task_type']}")
lines.append(f"Status: {data['status']}")
start_time = datetime.fromtimestamp(data["start_time"]).isoformat() if data["start_time"] else "N/A"
lines.append(f"Start Time: {start_time}")
end_time = datetime.fromtimestamp(data["end_time"]).isoformat() if data.get("end_time") else "N/A"
lines.append(f"End Time: {end_time}")
run_time = f"{data['run_time']}s" if data.get("run_time") else "N/A"
lines.append(f"Run Time: {run_time}")
lines.append("")
# 添加结果信息
lines.append("-" * 80)
lines.append("RESULT DETAILS")
lines.append("-" * 80)
for key, value in data.get("result", {}).items():
lines.append(f"{key}: {value}")
lines.append("")
# 添加日志统计
lines.append("-" * 80)
lines.append("LOG STATISTICS")
lines.append("-" * 80)
for level, count in data.get("log_stats", {}).items():
lines.append(f"{level}: {count}")
lines.append("")
# 添加错误信息
if data.get("error"):
lines.append("-" * 80)
lines.append("ERROR")
lines.append("-" * 80)
lines.append(data["error"])
lines.append("")
# 添加日志
lines.append("-" * 80)
lines.append("LOGS")
lines.append("-" * 80)
for log in data.get("logs", []):
lines.append(log)
return "\n".join(lines)
通过这些组件,我们实现了完整的任务状态跟踪和报告功能。在下一部分,我们将实现日历和通知功能,以保持用户及时了解任务执行情况。
步骤3:集成日历和通知功能
最后,我们将为自动化助手添加日历集成和通知功能,使其能够与用户的日历同步任务信息,并通过多种渠道发送通知。
实现日历集成
首先,我们创建一个日历服务,用于将任务与Google日历同步。在services/calendar_service.py中:
import os
import json
import time
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import pickle
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from database.db_manager import DatabaseManager
from database.models import Task
from utils.logger import TaskLogger
class CalendarService:
def __init__(self, db_manager: DatabaseManager):
self.db_manager = db_manager
self.logger = TaskLogger()
self.service = None
# 初始化日历API
self._init_calendar_api()
def _init_calendar_api(self):
"""初始化Google日历API"""
try:
# 如果环境变量中没有设置凭据,则跳过初始化
if not os.environ.get('GOOGLE_CALENDAR_CREDENTIALS'):
self.logger.warning("Google Calendar credentials not found, calendar integration disabled")
return
creds = None
# 尝试从token文件加载凭据
token_path = 'data/token.pickle'
if os.path.exists(token_path):
with open(token_path, 'rb') as token:
creds = pickle.load(token)
# 如果没有有效凭据,则尝试刷新或获取新凭据
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
# 从环境变量加载凭据
creds_json = json.loads(os.environ['GOOGLE_CALENDAR_CREDENTIALS'])
flow = InstalledAppFlow.from_client_config(
creds_json, ['https://www.googleapis.com/auth/calendar']
)
creds = flow.run_local_server(port=0)
# 保存凭据以备后用
with open(token_path, 'wb') as token:
pickle.dump(creds, token)
# 构建日历服务
self.service = build('calendar', 'v3', credentials=creds)
self.logger.info("Google Calendar API initialized successfully")
except Exception as e:
self.logger.error(f"Error initializing Google Calendar API: {str(e)}")
self.service = None
def sync_task_to_calendar(self, task: Task, calendar_id: str = 'primary') -> Dict[str, Any]:
"""将任务同步到日历"""
if not self.service:
return {"success": False, "error": "Calendar service not initialized"}
try:
# 检查任务是否已有日历事件
task_config = task.config
calendar_event_id = task_config.get('calendar_event_id')
# 准备事件数据
event_data = self._prepare_event_data(task)
if calendar_event_id:
# 更新现有事件
event = self.service.events().update(
calendarId=calendar_id,
eventId=calendar_event_id,
body=event_data
).execute()
self.logger.info(f"Updated calendar event for task {task.id}: {event['id']}")
else:
# 创建新事件
event = self.service.events().insert(
calendarId=calendar_id,
body=event_data
).execute()
self.logger.info(f"Created calendar event for task {task.id}: {event['id']}")
# 更新任务配置,存储事件ID
task_config['calendar_event_id'] = event['id']
task.config = task_config
self.db_manager.update_task(task)
return {
"success": True,
"event_id": event['id'],
"event_link": event['htmlLink']
}
except Exception as e:
self.logger.error(f"Error syncing task {task.id} to calendar: {str(e)}")
return {
"success": False,
"error": str(e)
}
def delete_calendar_event(self, task: Task, calendar_id: str = 'primary') -> Dict[str, Any]:
"""从日历中删除任务事件"""
if not self.service:
return {"success": False, "error": "Calendar service not initialized"}
try:
# 获取事件ID
task_config = task.config
calendar_event_id = task_config.get('calendar_event_id')
if not calendar_event_id:
return {
"success": False,
"error": "Task has no associated calendar event"
}
# 删除事件
self.service.events().delete(
calendarId=calendar_id,
eventId=calendar_event_id
).execute()
# 更新任务配置,移除事件ID
del task_config['calendar_event_id']
task.config = task_config
self.db_manager.update_task(task)
self.logger.info(f"Deleted calendar event for task {task.id}: {calendar_event_id}")
return {
"success": True
}
except Exception as e:
self.logger.error(f"Error deleting calendar event for task {task.id}: {str(e)}")
return {
"success": False,
"error": str(e)
}
def _prepare_event_data(self, task: Task) -> Dict[str, Any]:
"""根据任务准备日历事件数据"""
# 确定事件开始和结束时间
start_time, end_time = self._calculate_event_times(task)
# 准备事件描述
description = f"""
Automated Task: {task.name}
Description: {task.description}
Type: {task.type}
Tags: {', '.join(task.tags)}
Schedule: {task.schedule.get('type')} - {task.schedule.get('value')}
"""
# 创建事件数据
event_data = {
'summary': f"Task: {task.name}",
'description': description.strip(),
'start': start_time,
'end': end_time,
'reminders': {
'useDefault': False,
'overrides': [
{'method': 'popup', 'minutes': 10}
]
},
'source': {
'title': 'Automation Assistant',
'url': f'http://localhost:8000/tasks/{task.id}' # 示例URL
}
}
return event_data
def _calculate_event_times(self, task: Task) -> tuple:
"""计算任务事件的开始和结束时间"""
schedule_type = task.schedule.get('type')
schedule_value = task.schedule.get('value')
now = datetime.now()
if schedule_type == 'one_time':
# 一次性任务使用指定时间
if isinstance(schedule_value, int):
start_dt = datetime.fromtimestamp(schedule_value)
else:
try:
start_dt = datetime.fromisoformat(schedule_value)
except:
start_dt = now
# 结束时间默认为开始时间后1小时
end_dt = start_dt + timedelta(hours=1)
elif schedule_type == 'cron':
# Cron任务使用下一个预计执行时间
# 简化处理,使用当前时间作为示例
start_dt = now
end_dt = now + timedelta(hours=1)
elif schedule_type == 'interval':
# 间隔任务从现在开始,持续时间为间隔
start_dt = now
interval_seconds = int(schedule_value)
end_dt = start_dt + timedelta(seconds=interval_seconds)
else:
# 默认为当前时间
start_dt = now
end_dt = now + timedelta(hours=1)
# 格式化为日历API所需格式
start_time = {
'dateTime': start_dt.isoformat(),
'timeZone': 'UTC'
}
end_time = {
'dateTime': end_dt.isoformat(),
'timeZone': 'UTC'
}
return start_time, end_time
def list_calendars(self) -> List[Dict[str, Any]]:
"""列出用户可用的日历"""
if not self.service:
return []
try:
calendars_result = self.service.calendarList().list().execute()
calendars = calendars_result.get('items', [])
result = []
for calendar in calendars:
result.append({
'id': calendar['id'],
'summary': calendar['summary'],
'description': calendar.get('description', ''),
'primary': calendar.get('primary', False)
})
return result
except Exception as e:
self.logger.error(f"Error listing calendars: {str(e)}")
return []
实现通知服务
接下来,我们实现一个通知服务,用于通过多种渠道发送通知。在services/notification_service.py中:
import os
import json
import time
import smtplib
import requests
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Dict, List, Any, Optional, Union
from database.db_manager import DatabaseManager
from database.models import Task, Execution
from services.task_service import TaskService
from utils.logger import TaskLogger
class NotificationService:
def __init__(self, db_manager: DatabaseManager, task_service: TaskService):
self.db_manager = db_manager
self.task_service = task_service
self.logger = TaskLogger()
# 加载通知配置
self.config = self._load_config()
def _load_config(self) -> Dict[str, Any]:
"""加载通知配置"""
config = {
'email': {
'enabled': bool(os.environ.get('SMTP_HOST')),
'host': os.environ.get('SMTP_HOST', ''),
'port': int(os.environ.get('SMTP_PORT', 587)),
'username': os.environ.get('SMTP_USERNAME', ''),
'password': os.environ.get('SMTP_PASSWORD', ''),
'from_address': os.environ.get('SMTP_FROM', '')
},
'telegram': {
'enabled': bool(os.environ.get('TELEGRAM_BOT_TOKEN')),
'token': os.environ.get('TELEGRAM_BOT_TOKEN', '')
},
'webhook': {
'enabled': True # Webhook总是启用,因为它不需要额外配置
}
}
return config
def send_notification(self, channel: str, content: Dict[str, Any],
config: Dict[str, Any]) -> Dict[str, Any]:
"""发送通知"""
try:
if channel == 'email':
return self._send_email(content, config)
elif channel == 'telegram':
return self._send_telegram(content, config)
elif channel == 'webhook':
return self._send_webhook(content, config)
else:
return {
'success': False,
'error': f'Unsupported notification channel: {channel}'
}
except Exception as e:
self.logger.error(f"Error sending {channel} notification: {str(e)}")
return {
'success': False,
'error': str(e)
}
def _send_email(self, content: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
"""发送电子邮件通知"""
if not self.config['email']['enabled']:
return {
'success': False,
'error': 'Email notifications are not configured'
}
# 获取配置
to_address = config.get('to_address')
if not to_address:
return {
'success': False,
'error': 'Recipient email address not provided'
}
subject = content.get('subject', 'Automation Assistant Notification')
body_text = content.get('body', '')
body_html = content.get('html_body', '')
# 创建消息
msg = MIMEMultipart('alternative')
msg['Subject'] = subject
msg['From'] = self.config['email']['from_address']
msg['To'] = to_address
# 添加纯文本和HTML内容
msg.attach(MIMEText(body_text, 'plain'))
if body_html:
msg.attach(MIMEText(body_html, 'html'))
# 发送邮件
try:
with smtplib.SMTP(self.config['email']['host'], self.config['email']['port']) as server:
server.starttls()
server.login(self.config['email']['username'], self.config['email']['password'])
server.send_message(msg)
self.logger.info(f"Sent email notification to {to_address}")
return {
'success': True,
'channel': 'email',
'recipient': to_address
}
except Exception as e:
self.logger.error(f"Failed to send email: {str(e)}")
raise
def _send_telegram(self, content: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
"""发送Telegram通知"""
if not self.config['telegram']['enabled']:
return {
'success': False,
'error': 'Telegram notifications are not configured'
}
# 获取配置
chat_id = config.get('chat_id')
if not chat_id:
return {
'success': False,
'error': 'Telegram chat ID not provided'
}
message = content.get('message', '')
parse_mode = content.get('parse_mode', 'Markdown')
# 准备请求数据
api_url = f"https://api.telegram.org/bot{self.config['telegram']['token']}/sendMessage"
data = {
'chat_id': chat_id,
'text': message,
'parse_mode': parse_mode
}
# 发送请求
response = requests.post(api_url, json=data)
response.raise_for_status()
self.logger.info(f"Sent Telegram notification to chat {chat_id}")
return {
'success': True,
'channel': 'telegram',
'chat_id': chat_id,
'message_id': response.json().get('result', {}).get('message_id')
}
def _send_webhook(self, content: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
"""发送webhook通知"""
# 获取配置
webhook_url = config.get('webhook_url')
if not webhook_url:
return {
'success': False,
'error': 'Webhook URL not provided'
}
# 准备请求数据
headers = {
'Content-Type': 'application/json'
}
# 添加自定义头部
if 'headers' in config and isinstance(config['headers'], dict):
headers.update(config['headers'])
# 发送请求
response = requests.post(webhook_url, json=content, headers=headers)
response.raise_for_status()
self.logger.info(f"Sent webhook notification to {webhook_url}")
return {
'success': True,
'channel': 'webhook',
'url': webhook_url,
'status_code': response.status_code
}
def notify_task_completion(self, execution_id: str) -> Dict[str, Any]:
"""通知任务完成"""
# 获取执行记录
execution = self.task_service.get_execution(execution_id)
if not execution:
return {
'success': False,
'error': f'Execution not found: {execution_id}'
}
# 获取任务
task = self.task_service.get_task(execution.task_id)
if not task:
return {
'success': False,
'error': f'Task not found: {execution.task_id}'
}
# 检查任务是否配置了通知
task_config = task.config
notifications = task_config.get('notifications', [])
if not notifications:
return {
'success': True,
'message': 'No notifications configured for this task'
}
# 准备通知内容
base_content = self._prepare_task_notification_content(task, execution)
# 发送每个配置的通知
results = []
for notification_config in notifications:
channel = notification_config.get('channel')
if not channel:
continue
# 准备特定渠道的内容
content = base_content.copy()
channel_content = notification_config.get('content', {})
content.update(channel_content)
# 发送通知
result = self.send_notification(channel, content, notification_config)
results.append({
'channel': channel,
'result': result
})
return {
'success': True,
'notifications_sent': len(results),
'results': results
}
def _prepare_task_notification_content(self, task: Task, execution: Execution) -> Dict[str, Any]:
"""准备任务通知内容"""
# 确定任务状态
status = execution.status
status_emoji = "✅" if status == "success" else "❌" if status == "failed" else "⚠️"
# 格式化时间
start_time = datetime.fromtimestamp(execution.start_time).strftime('%Y-%m-%d %H:%M:%S')
end_time = "N/A"
duration = "N/A"
if execution.end_time:
end_time = datetime.fromtimestamp(execution.end_time).strftime('%Y-%m-%d %H:%M:%S')
duration = f"{execution.end_time - execution.start_time} seconds"
# 准备基本内容
content = {
'subject': f"{status_emoji} Task '{task.name}' {status}",
'body': f"""
Task '{task.name}' ({task.id}) {status}
Status: {status}
Start Time: {start_time}
End Time: {end_time}
Duration: {duration}
{execution.error if execution.error else ''}
""",
'html_body': f"""
<h2>{status_emoji} Task '{task.name}' {status}</h2>
<p><strong>ID:</strong> {task.id}</p>
<p><strong>Status:</strong> {status}</p>
<p><strong>Start Time:</strong> {start_time}</p>
<p><strong>End Time:</strong> {end_time}</p>
<p><strong>Duration:</strong> {duration}</p>
{f"<p style='color:red'><strong>Error:</strong> {execution.error}</p>" if execution.error else ''}
<hr>
<p><small>This is an automated notification from Automation Assistant.</small></p>
""",
'message': f"""
{status_emoji} *Task '{task.name}' {status}*
*ID:* `{task.id}`
*Status:* `{status}`
*Start Time:* `{start_time}`
*End Time:* `{end_time}`
*Duration:* `{duration}`
{f"*Error:* `{execution.error}`" if execution.error else ''}
""",
'execution_id': execution.id,
'task_id': task.id,
'task_name': task.name,
'status': status,
'start_time': execution.start_time,
'end_time': execution.end_time,
'error': execution.error
}
return content
现在,我们已经实现了日历集成和通知功能。这些组件将使自动化助手能够与用户的日历同步,并通过电子邮件、Telegram和webhook等多种渠道发送通知。
完整项目代码
在本节中,我们构建了一个功能齐全的自动化助手系统,它包含以下核心组件:
-
数据管理层
database/models.py:数据模型定义database/db_manager.py:数据库操作实现
-
核心服务层
services/task_service.py:任务管理服务services/scheduler.py:任务调度服务services/executor.py:任务执行服务services/monitor_service.py:状态监控服务services/calendar_service.py:日历集成服务services/notification_service.py:通知服务
-
适配器层
adapters/script_adapter.py:脚本执行适配器adapters/api_adapter.py:API调用适配器adapters/web_adapter.py:Web自动化适配器adapters/email_adapter.py:邮件自动化适配器
-
工具层
utils/logger.py:日志记录工具utils/report_generator.py:报告生成工具
-
MCP服务器
mcp_server.py:MCP服务器入口
所有这些组件一起工作,构成了一个强大而灵活的自动化助手系统,可以帮助用户自动执行各种重复性任务。
项目解析
自动化助手项目展示了MCP协议的强大功能和灵活性。通过实现这个项目,我们可以看到MCP如何帮助构建复杂的系统:
-
模块化设计:系统由多个独立组件组成,每个组件负责特定功能,与其他组件通过明确的接口通信。这种设计使系统易于理解、维护和扩展。
-
资源抽象:MCP资源提供了一种统一的方式来访问和管理系统数据,无论是任务、执行记录还是日历事件。
-
工具集成:MCP工具使客户端能够执行各种操作,从创建任务到生成报告,再到发送通知。这些工具形成了一个强大的API,使客户端能够与系统交互。
-
适配器模式:通过使用适配器模式,系统能够支持多种类型的任务,并且可以轻松添加新的任务类型。
-
状态跟踪:系统通过详细记录任务执行状态,使用户能够监控和分析自动化流程的性能。
-
多渠道集成:日历集成和多渠道通知使系统能够与用户的工作流程无缝集成,提高用户体验。
后续扩展
这个自动化助手系统还有许多可以扩展的方向:
-
Web界面:开发一个基于Web的用户界面,使用户能够可视化地创建、管理和监控任务。
-
更多任务类型:添加更多类型的任务适配器,如数据库操作、云服务集成等。
-
工作流支持:添加任务依赖和工作流功能,使任务能够按照特定顺序执行,并且后续任务可以依赖于前一个任务的结果。
-
集成AI:集成机器学习模型,实现智能化的任务推荐、异常检测和性能优化。
-
移动应用:开发移动应用,使用户能够随时随地管理和监控任务。
-
高级调度:增强调度系统,支持更复杂的调度策略,如负载平衡、资源分配优化等。
小结
在本节中,我们构建了一个功能齐全的自动化助手系统,它能够帮助用户自动执行日常工作中的重复性任务。通过使用MCP协议,我们实现了系统的各个组件,包括任务管理、调度、执行、状态跟踪、报告生成、日历集成和通知功能。
这个项目展示了MCP协议的强大功能和灵活性,以及如何使用MCP构建复杂的实际应用。通过实现这个项目,你学习了:
- 如何设计和实现基于MCP的模块化系统
- 如何使用资源和工具API提供统一的访问方式
- 如何实现适配器模式支持不同类型的自动化任务
- 如何设计和实现状态跟踪和报告功能
- 如何集成外部服务,如日历和通知渠道
这个自动化助手系统可以作为各种自动化应用的基础,通过进一步扩展和定制,可以适应不同领域的需求。
总结
在本章中,我们构建了一个功能全面的自动化助手系统。这个系统能够:
- 自动执行多种类型的任务:支持脚本执行、API调用、Web自动化和邮件发送等任务类型。
- 灵活调度任务:通过多种调度方式(定时、间隔、触发器)控制任务执行时间。
- 追踪执行状态:记录详细的执行日志和状态,支持实时监控。
- 生成分析报告:提供多种格式的报告,帮助分析任务执行效率和问题。
- 集成日历:将任务与Google日历同步,方便时间管理。
- 多渠道通知:通过邮件、Telegram和webhook等渠道发送任务执行通知。
通过MCP协议,我们实现了系统各组件之间的无缝通信。MCP资源定义了数据模型和访问方式,而MCP工具提供了操作这些资源的功能。系统采用了模块化设计,每个组件负责特定功能,可以独立开发和扩展。
这个自动化助手是MCP在实际项目中应用的一个优秀示例。它展示了如何使用MCP协议构建复杂、可扩展的系统,以及如何通过适配器模式和模块化设计提高系统的灵活性和可维护性。
在下一章中,我们将探索MCP的更高级应用,展示如何构建更复杂的系统和集成更多外部服务。
练习
- 扩展自动化助手,添加一个新的任务类型(如数据库操作或文件处理)。
- 实现一个简单的Web界面,用于可视化管理和监控任务。
- 添加任务依赖功能,使任务能够按特定顺序执行,并且后续任务可以依赖于前一个任务的结果。
- 实现更高级的错误处理机制,包括重试策略和异常处理。
- 集成机器学习模型,实现任务执行预测和异常检测。