Python软件项目开发是一个系统性的工程,需要遵循清晰的流程和最佳实践。以下将结合一个简单的“任务管理器”案例,详细介绍从框架搭建到打包发售的全流程。
## 1. 项目框架搭建
项目框架是项目的骨架,良好的结构能提高可维护性和可扩展性。现代Python项目通常采用标准化的目录结构。
### 1.1 标准项目结构
```python
# 项目目录结构示例
task_manager/
├── task_manager/ # 主包目录
│ ├── __init__.py # 包初始化文件
│ ├── core/ # 核心模块
│ │ ├── __init__.py
│ │ ├── models.py # 数据模型
│ │ ├── database.py # 数据库操作
│ │ └── algorithms.py # 算法实现
│ ├── ui/ # 用户界面
│ │ ├── __init__.py
│ │ ├── main_window.py
│ │ └── components.py
│ ├── utils/ # 工具函数
│ │ ├── __init__.py
│ │ ├── validators.py
│ │ └── helpers.py
│ └── tests/ # 测试代码
│ ├── __init__.py
│ ├── test_models.py
│ └── test_algorithms.py
├── requirements.txt # 依赖包列表
├── setup.py # 安装配置
├── README.md # 项目说明
├── LICENSE # 许可证
└── examples/ # 使用示例
```
### 1.2 依赖管理配置
```python
# requirements.txt 示例
# 基础依赖
python>=3.8
# 数据库
sqlalchemy>=1.4.0
# 界面库
PyQt5>=5.15.0
# 测试框架
pytest>=7.0.0
pytest-cov>=4.0.0
# 打包工具
pyinstaller>=5.0.0
# 代码质量
black>=22.0.0
flake8>=5.0.0
```
```python
# setup.py 配置示例
from setuptools import setup, find_packages
setup(
name="task_manager",
version="1.0.0",
author="Your Name",
description="A simple task management application",
packages=find_packages(),
install_requires=[
"PyQt5>=5.15.0",
"sqlalchemy>=1.4.0",
],
entry_points={
"console_scripts": [
"task-manager=task_manager.ui.main_window:main",
],
},
classifiers=[
"Development Status :: 4 - Beta",
"Intended Audience :: End Users/Desktop",
"Programming Language :: Python :: 3",
],
)
```
## 2. 数据结构设计
良好的数据结构设计是软件的基础。以任务管理器为例,需要设计合理的模型类[ref_4]。
### 2.1 数据模型定义
```python
# task_manager/core/models.py
from datetime import datetime
from enum import Enum
from dataclasses import dataclass, field
from typing import List, Optional
class Priority(Enum):
"""任务优先级枚举"""
LOW = 1
MEDIUM = 2
HIGH = 3
URGENT = 4
class Status(Enum):
"""任务状态枚举"""
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
CANCELLED = "cancelled"
@dataclass
class Task:
"""任务数据类"""
id: int = field(default=None)
title: str = field(default="")
description: str = field(default="")
priority: Priority = field(default=Priority.MEDIUM)
status: Status = field(default=Status.PENDING)
created_at: datetime = field(default_factory=datetime.now)
due_date: Optional[datetime] = field(default=None)
tags: List[str] = field(default_factory=list)
estimated_hours: float = field(default=0.0)
actual_hours: float = field(default=0.0)
def __post_init__(self):
"""初始化后处理"""
if not self.title.strip():
raise ValueError("Task title cannot be empty")
@property
def is_overdue(self) -> bool:
"""检查是否过期"""
if self.due_date and self.status != Status.COMPLETED:
return datetime.now() > self.due_date
return False
@property
def progress(self) -> float:
"""计算进度百分比"""
if self.estimated_hours > 0:
return min(100.0, (self.actual_hours / self.estimated_hours) * 100)
return 0.0
@dataclass
class Project:
"""项目数据类"""
id: int = field(default=None)
name: str = field(default="")
description: str = field(default="")
tasks: List[Task] = field(default_factory=list)
created_at: datetime = field(default_factory=datetime.now)
@property
def completion_rate(self) -> float:
"""计算项目完成率"""
if not self.tasks:
return 0.0
completed = sum(1 for task in self.tasks if task.status == Status.COMPLETED)
return (completed / len(self.tasks)) * 100
def add_task(self, task: Task) -> None:
"""添加任务到项目"""
self.tasks.append(task)
```
## 3. 算法优化
在任务管理器中,可能需要优化任务排序、搜索和调度算法。
### 3.1 任务排序算法
```python
# task_manager/core/algorithms.py
from typing import List, Callable
from .models import Task, Priority
from datetime import datetime
class TaskSorter:
"""任务排序器"""
@staticmethod
def sort_by_priority(tasks: List[Task], descending: bool = True) -> List[Task]:
"""按优先级排序"""
return sorted(
tasks,
key=lambda t: t.priority.value,
reverse=descending
)
@staticmethod
def sort_by_due_date(tasks: List[Task]) -> List[Task]:
"""按截止日期排序(即将到期的在前)"""
def get_due_date(task: Task) -> datetime:
return task.due_date or datetime.max
return sorted(tasks, key=get_due_date)
@staticmethod
def smart_sort(tasks: List[Task]) -> List[Task]:
"""智能排序:综合考虑优先级、截止日期和进度"""
def calculate_score(task: Task) -> float:
"""计算任务综合得分"""
# 基础分:优先级(40%权重)
priority_score = task.priority.value * 0.4
# 时间紧迫性分(30%权重)
time_score = 0.0
if task.due_date:
days_left = (task.due_date - datetime.now()).days
if days_left <= 0:
time_score = 1.0 # 已过期
elif days_left <= 3:
time_score = 0.8 # 3天内到期
elif days_left <= 7:
time_score = 0.5 # 一周内到期
# 进度分(30%权重)
progress_score = (100 - task.progress) * 0.003 # 进度越低越紧急
return priority_score + time_score + progress_score
return sorted(tasks, key=calculate_score, reverse=True)
class TaskSearch:
"""任务搜索算法"""
@staticmethod
def search_by_keyword(tasks: List[Task], keyword: str) -> List[Task]:
"""关键词搜索(支持模糊匹配)"""
keyword = keyword.lower().strip()
if not keyword:
return tasks
results = []
for task in tasks:
# 在标题和描述中搜索
if (keyword in task.title.lower() or
keyword in task.description.lower()):
results.append(task)
return results
@staticmethod
def filter_by_status(tasks: List[Task], status: Status) -> List[Task]:
"""按状态过滤"""
return [task for task in tasks if task.status == status]
@staticmethod
def find_related_tasks(tasks: List[Task], reference_task: Task) -> List[Task]:
"""查找相关任务(基于标签匹配)"""
if not reference_task.tags:
return []
related = []
for task in tasks:
if task.id == reference_task.id:
continue
# 计算标签匹配度
common_tags = set(reference_task.tags) & set(task.tags)
if common_tags:
task.match_score = len(common_tags) / len(reference_task.tags)
related.append(task)
# 按匹配度排序
return sorted(related, key=lambda t: getattr(t, 'match_score', 0), reverse=True)
```
## 4. 函数编码实现
### 4.1 数据库操作函数
```python
# task_manager/core/database.py
import sqlite3
from contextlib import contextmanager
from typing import List, Optional, Any
from datetime import datetime
import json
from .models import Task, Project, Status, Priority
class DatabaseManager:
"""数据库管理器"""
def __init__(self, db_path: str = "tasks.db"):
self.db_path = db_path
self._init_database()
@contextmanager
def get_connection(self):
"""获取数据库连接(上下文管理器)"""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
def _init_database(self):
"""初始化数据库表"""
with self.get_connection() as conn:
cursor = conn.cursor()
# 创建任务表
cursor.execute("""
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
description TEXT,
priority INTEGER NOT NULL,
status TEXT NOT NULL,
created_at TIMESTAMP NOT NULL,
due_date TIMESTAMP,
tags TEXT, -- JSON格式存储
estimated_hours REAL,
actual_hours REAL,
project_id INTEGER,
FOREIGN KEY (project_id) REFERENCES projects(id)
)
""")
# 创建项目表
cursor.execute("""
CREATE TABLE IF NOT EXISTS projects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
description TEXT,
created_at TIMESTAMP NOT NULL
)
""")
def add_task(self, task: Task) -> int:
"""添加任务到数据库"""
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO tasks
(title, description, priority, status, created_at, due_date, tags, estimated_hours, actual_hours, project_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
task.title,
task.description,
task.priority.value,
task.status.value,
task.created_at,
task.due_date,
json.dumps(task.tags),
task.estimated_hours,
task.actual_hours,
None # 项目ID,可根据需要修改
))
return cursor.lastrowid
def get_tasks(self, filters: Optional[dict] = None) -> List[Task]:
"""获取任务列表(支持过滤)"""
query = "SELECT * FROM tasks WHERE 1=1"
params = []
if filters:
if 'status' in filters:
query += " AND status = ?"
params.append(filters['status'].value)
if 'priority_min' in filters:
query += " AND priority >= ?"
params.append(filters['priority_min'])
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(query, params)
rows = cursor.fetchall()
tasks = []
for row in rows:
task = Task(
id=row['id'],
title=row['title'],
description=row['description'],
priority=Priority(row['priority']),
status=Status(row['status']),
created_at=datetime.fromisoformat(row['created_at']),
due_date=datetime.fromisoformat(row['due_date']) if row['due_date'] else None,
tags=json.loads(row['tags']),
estimated_hours=row['estimated_hours'],
actual_hours=row['actual_hours']
)
tasks.append(task)
return tasks
def update_task(self, task_id: int, updates: dict) -> bool:
"""更新任务信息"""
if not updates:
return False
set_clause = ", ".join([f"{key} = ?" for key in updates.keys()])
values = list(updates.values())
values.append(task_id)
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(f"""
UPDATE tasks SET {set_clause} WHERE id = ?
""", values)
return cursor.rowcount > 0
```
## 5. 模拟测试与单元测试
### 5.1 模拟测试环境
```python
# task_manager/tests/test_models.py
import pytest
from datetime import datetime, timedelta
from task_manager.core.models import Task, Project, Priority, Status
class TestTaskModel:
"""测试Task模型"""
def test_task_creation(self):
"""测试任务创建"""
task = Task(
title="完成项目报告",
description="编写项目总结报告",
priority=Priority.HIGH,
due_date=datetime.now() + timedelta(days=3)
)
assert task.title == "完成项目报告"
assert task.priority == Priority.HIGH
assert task.status == Status.PENDING
assert not task.is_overdue
def test_task_validation(self):
"""测试任务验证"""
with pytest.raises(ValueError):
Task(title="") # 空标题应该抛出异常
def test_task_progress_calculation(self):
"""测试进度计算"""
task = Task(
title="测试任务",
estimated_hours=10.0,
actual_hours=5.0
)
assert task.progress == 50.0 # 5/10 * 100 = 50
def test_overdue_detection(self):
"""测试过期检测"""
# 创建已过期的任务
past_date = datetime.now() - timedelta(days=1)
task = Task(
title="过期任务",
due_date=past_date
)
assert task.is_overdue
# 已完成的任务不应显示为过期
task.status = Status.COMPLETED
assert not task.is_overdue
class TestProjectModel:
"""测试Project模型"""
def test_project_completion_rate(self):
"""测试项目完成率计算"""
project = Project(name="测试项目")
# 添加任务
task1 = Task(title="任务1", status=Status.COMPLETED)
task2 = Task(title="任务2", status=Status.IN_PROGRESS)
task3 = Task(title="任务3", status=Status.PENDING)
project.add_task(task1)
project.add_task(task2)
project.add_task(task3)
# 1个完成,3个总共,完成率应为33.33%
assert round(project.completion_rate, 2) == 33.33
def test_empty_project_completion(self):
"""测试空项目完成率"""
project = Project(name="空项目")
assert project.completion_rate == 0.0
# task_manager/tests/test_algorithms.py
from task_manager.core.algorithms import TaskSorter, TaskSearch
from task_manager.core.models import Task, Priority, Status
from datetime import datetime, timedelta
class TestTaskSorter:
"""测试任务排序算法"""
def setup_method(self):
"""测试准备"""
self.tasks = [
Task(id=1, title="低优先级", priority=Priority.LOW,
due_date=datetime.now() + timedelta(days=7)),
Task(id=2, title="高优先级", priority=Priority.HIGH,
due_date=datetime.now() + timedelta(days=1)),
Task(id=3, title="中优先级", priority=Priority.MEDIUM,
due_date=datetime.now() + timedelta(days=3)),
]
def test_sort_by_priority(self):
"""测试按优先级排序"""
sorted_tasks = TaskSorter.sort_by_priority(self.tasks)
assert sorted_tasks[0].priority == Priority.HIGH
assert sorted_tasks[-1].priority == Priority.LOW
def test_sort_by_due_date(self):
"""测试按截止日期排序"""
sorted_tasks = TaskSorter.sort_by_due_date(self.tasks)
# 截止日期最近的应该在最前面
assert sorted_tasks[0].due_date == self.tasks[1].due_date
def test_smart_sort(self):
"""测试智能排序"""
# 添加一些任务属性以测试智能排序
self.tasks[0].actual_hours = 2.0 # 低优先级任务,进度20%
self.tasks[0].estimated_hours = 10.0
self.tasks[1].actual_hours = 8.0 # 高优先级任务,进度80%
self.tasks[1].estimated_hours = 10.0
sorted_tasks = TaskSorter.smart_sort(self.tasks)
# 高优先级且即将到期的任务应该在最前面
assert sorted_tasks[0].id == 2
class TestTaskSearch:
"""测试任务搜索算法"""
def setup_method(self):
"""测试准备"""
self.tasks = [
Task(id=1, title="完成项目报告", description="编写项目总结",
tags=["工作", "报告"]),
Task(id=2, title="购物清单", description="购买日常用品",
tags=["生活", "购物"]),