<think>我们之前已经配置好了虚拟环境,现在需要启动训练脚本。关键在于确保使用虚拟环境中的Python解释器,并且正确设置工作目录和参数。
假设项目结构如下:
E:\autonomous_driving_project\
├── venv\ # 虚拟环境
├── src\ # 源代码目录
│ ├── train.py # 训练脚本
│ └── ...
├── data\ # 数据目录
├── requirements.txt
└── ...
目标:在虚拟环境中运行 train.py,并能够看到训练进度。
步骤:
1. 激活虚拟环境
2. 确保工作目录正确(通常是项目根目录,但有时脚本可能需要从脚本所在目录运行)
3. 运行训练脚本,并传递必要的参数(如果有)
注意:训练脚本可能依赖相对路径,因此设置工作目录很重要。
有两种常见方式:
方式一:在项目根目录运行,使用模块方式运行(如果项目是包结构)
方式二:进入脚本所在目录运行
这里假设 train.py 在 src 目录下,且设计为从项目根目录运行(即使用相对路径如 'data/' 或 'logs/' 等)。
因此,我们将在项目根目录(E:\autonomous_driving_project)运行,但指定 src 下的 train.py。
具体步骤:
</think>
## 在Python中配置正确路径并监控训练进度的完整指南
在自动驾驶项目中正确配置Python解释器路径并监控训练进度至关重要。以下是详细的操作步骤和代码实现:
### 一、创建路径配置模块
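在 `src/config/paths.py` 中创建路径配置模块(该位置与代码中 `parents[2]` 推算项目根目录的方式,以及 `src/train.py` 里的 `from config.paths import paths` 保持一致):
```python
# src/config/paths.py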
import os
from pathlib import Path
class ProjectPaths:
    """Central registry of the project's well-known directories.

    Every location is derived from a single root so callers never depend on
    the current working directory.

    Args:
        root: Optional project root (``str`` or ``Path``). When omitted, the
            root defaults to ``Path(__file__).resolve().parents[2]``, i.e. the
            directory two levels above the folder that contains this file.
            That default is only correct for a layout such as
            ``<root>/src/config/paths.py``; pass ``root`` explicitly for any
            other layout.

    Side effects:
        Creates the data, models and logs directories if they are missing.
        The ``src`` directory is never created.
    """

    def __init__(self, root=None):
        if root is None:
            # parents[0] is this file's folder, parents[1] its parent,
            # parents[2] the assumed project root.
            root = Path(__file__).resolve().parents[2]
        self.root = Path(root).resolve()

        # Main directories, all anchored at the root.
        self.data_dir = self.root / 'data'
        self.models_dir = self.root / 'models'
        self.logs_dir = self.root / 'logs'
        self.src_dir = self.root / 'src'

        self._ensure_dirs()

    def _ensure_dirs(self):
        """Create the data, models and logs directories when absent."""
        for path in (self.data_dir, self.models_dir, self.logs_dir):
            path.mkdir(parents=True, exist_ok=True)

    def get_model_path(self, model_name: str) -> Path:
        """Return ``<models_dir>/<model_name>.pt``."""
        return self.models_dir / f"{model_name}.pt"

    def get_dataset_path(self, dataset_name: str) -> Path:
        """Return ``<data_dir>/<dataset_name>``."""
        return self.data_dir / dataset_name

    def get_tensorboard_logdir(self, experiment_name: str) -> Path:
        """Return ``<logs_dir>/tensorboard/<experiment_name>``."""
        return self.logs_dir / 'tensorboard' / experiment_name

    def __repr__(self):
        return f"ProjectPaths(root={self.root})"
# Shared instance, built once at import time and reused by the whole project.
paths = ProjectPaths()

if __name__ == '__main__':
    # Smoke test: print where each well-known location resolves to.
    report = (
        ("项目根目录:", paths.root),
        ("数据目录:", paths.data_dir),
        ("模型保存路径示例:", paths.get_model_path("resnet50")),
        ("TensorBoard日志示例:", paths.get_tensorboard_logdir("exp1")),
    )
    for label, location in report:
        print(label, location)
```
### 二、创建训练监控模块
在 `src/utils/training_monitor.py` 中:
```python
# src/utils/training_monitor.py
import time
import sys
from datetime import timedelta
from collections import deque
from pathlib import Path
import torch
class TrainingMonitor:
    """Console progress reporter for a training run of known length.

    Tracks the current step and epoch, keeps a rolling window (last 100
    values) of every monitored metric, and renders a single-line progress
    report with elapsed time and an ETA.
    """

    def __init__(self, total_steps: int, log_interval: int = 10, metrics: list = None):
        """
        Args:
            total_steps: Total number of training steps in the run.
            log_interval: Print a progress line every this many steps.
            metrics: Names of the metrics to track. Defaults to ``['loss']``.
        """
        self.total_steps = total_steps
        self.log_interval = log_interval
        # Build a fresh list per instance: a list literal used as the default
        # value would be shared (and mutable) across every monitor.
        self.metrics = ['loss'] if metrics is None else list(metrics)

        self.start_time = time.time()
        self.step = 0
        self.epoch = 0
        self.metric_history = {name: deque(maxlen=100) for name in self.metrics}

    def update(self, metrics: dict, step: int = None, epoch: int = None):
        """Record the latest metric values and advance the step counter.

        Args:
            metrics: Mapping such as ``{'loss': 0.5, 'accuracy': 0.85}``.
                Keys that are not monitored are ignored.
            step: Absolute step number; when omitted the counter is
                incremented by one.
            epoch: Current epoch; when omitted the previous value is kept.
        """
        if step is not None:
            self.step = step
        else:
            self.step += 1
        if epoch is not None:
            self.epoch = epoch

        for name, value in metrics.items():
            if name in self.metric_history:
                self.metric_history[name].append(value)

    def _get_elapsed_time(self):
        """Return the wall-clock time since construction, truncated to seconds."""
        elapsed = time.time() - self.start_time
        return timedelta(seconds=int(elapsed))

    def _get_eta(self):
        """Return the estimated remaining time, or ``"N/A"`` when unknown."""
        elapsed = time.time() - self.start_time
        # No throughput can be derived before the first step, or when the
        # clock has not advanced (coarse timers can report zero elapsed time).
        if self.step <= 0 or elapsed <= 0:
            return "N/A"
        steps_per_sec = self.step / elapsed
        # Never report a negative ETA if the step counter overshoots.
        remaining_steps = max(self.total_steps - self.step, 0)
        return timedelta(seconds=int(remaining_steps / steps_per_sec))

    def _get_progress_bar(self, width=40):
        """Return a text bar such as ``[====    ] 50.0%``."""
        progress = self.step / self.total_steps if self.total_steps > 0 else 0.0
        # Clamp so the bar keeps a fixed width even if step is out of range.
        filled = min(max(int(width * progress), 0), width)
        bar = '[' + '=' * filled + ' ' * (width - filled) + ']'
        return f"{bar} {progress:.1%}"

    def log_progress(self):
        """Print the progress line when a report is due.

        A report is due every ``log_interval`` steps and on the final step, so
        the finished state is always shown even when ``total_steps`` is not a
        multiple of ``log_interval``.
        """
        is_final = self.step == self.total_steps
        is_due = self.log_interval != 0 and self.step % self.log_interval == 0
        if not (is_due or is_final):
            return

        # Average each metric over its rolling window.
        avg_metrics = {
            name: sum(values) / len(values)
            for name, values in self.metric_history.items()
            if values
        }
        metrics_str = " - ".join(f"{k}: {v:.4f}" for k, v in avg_metrics.items())

        # '\r' rewrites the same console line instead of scrolling.
        sys.stdout.write(
            f"\rEpoch: {self.epoch} | Step: {self.step}/{self.total_steps} "
            f"{self._get_progress_bar()} | "
            f"Elapsed: {self._get_elapsed_time()} | "
            f"ETA: {self._get_eta()} | {metrics_str}"
        )
        sys.stdout.flush()

        # Terminate the in-place line once the run is complete.
        if is_final:
            print()

    def log_checkpoint(self, model, optimizer, epoch, step, path):
        """Save model and optimizer state plus the metric windows to ``path``."""
        checkpoint = {
            'epoch': epoch,
            'step': step,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'metrics': {k: list(v) for k, v in self.metric_history.items()}
        }
        torch.save(checkpoint, path)
        print(f"\n检查点已保存至: {path}")
```
### 三、配置训练脚本
在 `src/train.py` 中:
```python
# src/train.py
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from config.paths import paths
from utils.training_monitor import TrainingMonitor
# 示例模型
class AutonomousDrivingModel(nn.Module):
    """Small example CNN: two conv + max-pool stages, then two linear layers.

    Expects input of shape ``(batch, 3, 64, 64)`` and returns logits of shape
    ``(batch, 10)``.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        # The padded 3x3 convolutions keep the spatial size; each of the two
        # 2x2 max-pools in forward() halves it: 64 -> 32 -> 16. conv2 emits
        # 64 channels, so the flattened feature size is 64 * 16 * 16.
        self.fc1 = nn.Linear(64 * 16 * 16, 256)
        self.fc2 = nn.Linear(256, 10)  # 10 output classes

    def forward(self, x):
        x = torch.relu(self.conv1(x))
        x = torch.max_pool2d(x, 2)
        x = torch.relu(self.conv2(x))
        x = torch.max_pool2d(x, 2)
        # Flatten everything except the batch dimension.
        x = torch.flatten(x, 1)
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x
def main():
    """Run the example training loop on random data.

    Builds the model, optimizer and loss, then trains for
    ``num_epochs * steps_per_epoch`` steps while reporting progress to the
    console and to TensorBoard, writing periodic checkpoints, and finally
    saving the model weights.
    """
    # 1. Device selection.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"使用设备: {device}")

    # 2. Model, optimizer and loss.
    model = AutonomousDrivingModel().to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()

    # 3. Dataset loading (placeholder -- replace with the real dataset):
    # train_dataset = CustomDataset(paths.get_dataset_path("autonomous_driving"))
    # train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

    # 4. Progress monitor and TensorBoard writer. total_steps is derived from
    #    the loop bounds so the progress bar cannot disagree with the loop.
    num_epochs = 10
    steps_per_epoch = 100
    checkpoint_interval = 100
    total_steps = num_epochs * steps_per_epoch
    monitor = TrainingMonitor(total_steps, log_interval=10, metrics=['loss', 'accuracy'])
    writer = SummaryWriter(log_dir=paths.get_tensorboard_logdir("exp1"))

    try:
        # 5. Training loop.
        model.train()
        for epoch in range(num_epochs):
            # Placeholder: a real run iterates over the data loader here.
            for step in range(steps_per_epoch):
                # 1-based count of completed steps: the first batch is step 1
                # and the last one equals total_steps.
                global_step = epoch * steps_per_epoch + step + 1

                # Random stand-in batch.
                inputs = torch.randn(32, 3, 64, 64).to(device)
                labels = torch.randint(0, 10, (32,)).to(device)

                # Forward pass.
                outputs = model(inputs)
                loss = criterion(outputs, labels)

                # Batch accuracy.
                _, predicted = torch.max(outputs, 1)
                accuracy = (predicted == labels).sum().item() / labels.size(0)

                # Backward pass and parameter update.
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                monitor.update(
                    metrics={'loss': loss.item(), 'accuracy': accuracy},
                    step=global_step,
                    epoch=epoch
                )

                writer.add_scalar('Loss/train', loss.item(), monitor.step)
                writer.add_scalar('Accuracy/train', accuracy, monitor.step)

                monitor.log_progress()

                # Checkpoint after every checkpoint_interval completed steps
                # (never before any training has happened).
                if monitor.step % checkpoint_interval == 0:
                    checkpoint_path = paths.models_dir / f"checkpoint_epoch{epoch}_step{global_step}.pt"
                    monitor.log_checkpoint(model, optimizer, epoch, global_step, checkpoint_path)

        # 6. Save the final weights.
        final_model_path = paths.get_model_path("autonomous_driving_final")
        torch.save(model.state_dict(), final_model_path)
        print(f"\n训练完成! 最终模型保存至: {final_model_path}")
    finally:
        # Flush and close the event file even if training fails part-way.
        writer.close()


if __name__ == "__main__":
    main()
```
### 四、启动训练的完整操作流程
#### 步骤1:激活虚拟环境
```powershell
# 打开PowerShell
cd E:\autonomous_driving_project
.\venv\Scripts\activate
```
#### 步骤2:设置Python解释器路径
确保你的IDE或编辑器使用正确的解释器:
1. 在VS Code中:
- 按 `Ctrl+Shift+P` 打开命令面板
- 输入 "Python: Select Interpreter"
- 选择 `E:\autonomous_driving_project\venv\Scripts\python.exe`
2. 在PyCharm中:
- 打开设置 `File > Settings > Project: autonomous_driving_project > Python Interpreter`
- 点击齿轮图标 > Add
- 选择 `Virtualenv Environment > Existing environment`
- 路径:`E:\autonomous_driving_project\venv\Scripts\python.exe`
#### 步骤3:运行训练脚本
```powershell
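# 确保在项目根目录
# 注意:以模块方式运行时,sys.path 的首项是当前目录(项目根目录),
# train.py 中的 `from utils.training_monitor import ...` 无法被解析;
# 使用此方式前需先把导入改为 `from src.utils.training_monitor import ...` 这类从根目录出发的绝对导入
python -m src.train
# 或者直接运行脚本(此时 sys.path 的首项是 src/,train.py 中的 `config`、`utils` 导入按 src/ 下的目录解析)
python src/train.py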
```
#### 步骤4:监控训练进度
控制台将显示实时进度:
```
Epoch: 0 | Step: 10/1000 [==== ] 1.0% | Elapsed: 0:00:05 | ETA: 0:08:15 | loss: 2.3124 - accuracy: 0.1250
Epoch: 0 | Step: 20/1000 [======== ] 2.0% | Elapsed: 0:00:10 | ETA: 0:08:20 | loss: 2.2103 - accuracy: 0.1875
...
```
#### 步骤5:启动TensorBoard监控
```powershell
# 在新终端中
tensorboard --logdir E:\autonomous_driving_project\logs\tensorboard
# 访问 http://localhost:6006 查看图表
```
### 五、高级监控技巧
#### 1. 分布式训练监控
```python
# 在训练脚本中添加分布式支持
import torch.distributed as dist
def init_distributed():
    """Initialise ``torch.distributed`` for this process and return its local rank.

    Reads the rank from the ``LOCAL_RANK`` environment variable and uses the
    ``env://`` rendezvous, so the remaining connection settings also come
    from the environment.

    Returns:
        int: The local rank of this process.

    Raises:
        KeyError: If ``LOCAL_RANK`` is not set in the environment.
    """
    # Local import: the surrounding snippet never imports os.
    import os

    # Read the rank first so a missing variable fails before a process group
    # has been created.
    local_rank = int(os.environ['LOCAL_RANK'])

    # NCCL requires CUDA and is not available in every build/platform;
    # fall back to gloo so the same code runs on CPU-only or non-NCCL hosts.
    use_nccl = torch.cuda.is_available() and dist.is_nccl_available()
    dist.init_process_group(
        backend='nccl' if use_nccl else 'gloo',
        init_method='env://'
    )
    if torch.cuda.is_available():
        torch.cuda.set_device(local_rank)
    return local_rank
# 修改监控器支持分布式
class DistributedTrainingMonitor(TrainingMonitor):
    """TrainingMonitor variant that prints progress from a single rank only.

    Args:
        total_steps: Total number of training steps.
        log_interval: Print a progress line every this many steps.
        metrics: Names of the metrics to track. Defaults to ``['loss']``.
        rank: Rank of the current process; only rank 0 prints.
    """

    def __init__(self, total_steps, log_interval=10, metrics=None, rank=0):
        # Pass a fresh list so no default list object is shared between instances.
        super().__init__(total_steps, log_interval, ['loss'] if metrics is None else metrics)
        self.rank = rank  # rank 0 is the main process

    def log_progress(self):
        """Print the progress line on rank 0; do nothing on other ranks."""
        if self.rank == 0:
            # The parent decides whether the current step is due for a report.
            super().log_progress()
```
#### 2. 实时指标通知
```python
# 添加桌面通知(Windows)
import win10toast
class NotificationMonitor(TrainingMonitor):
    """TrainingMonitor that also raises a Windows toast for every 10% of progress."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.toaster = win10toast.ToastNotifier()
        # Step at which the most recent toast was shown.
        self.last_notify_step = 0

    def log_progress(self):
        """Print the usual progress line, then notify if another 10% has elapsed."""
        super().log_progress()

        # Progress is undefined without a positive step budget.
        if self.total_steps <= 0:
            return

        progress = self.step / self.total_steps
        # '>=' fires exactly on each 10% boundary (10%, 20%, ...) rather than
        # one step late, which would make later notifications drift.
        if self.step - self.last_notify_step >= self.total_steps / 10:
            self.toaster.show_toast(
                "训练进度更新",
                f"已完成 {progress:.0%}",
                duration=10,
                # Show the toast from a background thread so the training
                # loop is not held up while it is on screen.
                threaded=True,
            )
            self.last_notify_step = self.step
```
#### 3. 自动学习率调整
```python
# 添加学习率调度器
from torch.optim.lr_scheduler import ReduceLROnPlateau
# Create the scheduler once, before the training loop.
# mode='max' means the monitored value is expected to increase; the learning
# rate is multiplied by factor=0.5 after patience=3 calls without improvement.
# NOTE(review): `verbose` is reported as deprecated in recent PyTorch
# releases -- confirm against the installed version.
scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=3, verbose=True)
# Call once at the end of every epoch.
# NOTE(review): `accuracy` is presumably meant to be a per-epoch (ideally
# validation) value rather than the last batch's accuracy -- confirm.
scheduler.step(accuracy)  # adjust the learning rate based on accuracy
```
### 六、故障排除
#### 问题1:路径错误
**现象**:
```
FileNotFoundError: [Errno 2] No such file or directory: 'E:\\autonomous_driving_project\\data'
```
**解决方案**:
```python
# 在paths.py中添加自动创建目录
def _ensure_dirs(self):
    """Create the data, models and logs directories if they do not exist yet."""
    required = (self.data_dir, self.models_dir, self.logs_dir)
    for directory in required:
        # parents=True builds missing ancestors; exist_ok=True makes reruns harmless.
        directory.mkdir(parents=True, exist_ok=True)
```
#### 问题2:CUDA内存不足
**解决方案**:
```python
# Reduce the batch size (from 32 down to 16).
train_loader = DataLoader(dataset, batch_size=16)

# Gradient accumulation: sum the gradients of several small batches before
# each optimizer step, emulating a larger effective batch size.
accumulation_steps = 2
pending = 0  # batches whose gradients have not been applied yet
optimizer.zero_grad()
for inputs, labels in train_loader:
    outputs = model(inputs)
    # Scale the loss so the accumulated gradient matches an average over the group.
    loss = criterion(outputs, labels) / accumulation_steps
    loss.backward()
    pending += 1
    if pending == accumulation_steps:
        optimizer.step()
        optimizer.zero_grad()
        pending = 0

# Apply a trailing partial group: without this, when the number of batches is
# not a multiple of accumulation_steps, the last gradients are never used and
# remain in .grad for whatever runs next.
if pending:
    optimizer.step()
    optimizer.zero_grad()
```
#### 问题3:训练进度不更新
**检查点**:
1. 确认数据加载器是否正常工作
2. 检查损失值是否合理(不应为NaN)
3. 验证模型参数是否更新:
```python
# Add inside the training loop.
# Prints the first parameter tensor of the model once, when step == 1.
# NOTE(review): a single print cannot show whether the weights change;
# presumably the same tensor should also be printed or compared at a later
# step -- confirm the intended check.
if step == 1:
    print("第一个批次的模型参数:", next(model.parameters()).data)
```