### **Python开发非Windows系统下PCB制前工程CAM自动化系统完整示例**
本示例将构建一个在**CentOS**(或其他Linux发行版)系统上,基于Python的PCB制前工程CAM自动化系统。该系统将整合设计文件接收、Gerber文件自动检查、CAM脚本处理及任务状态管理等功能,实现从设计文件到生产准备文件的自动化流转。
#### **1. 系统架构与环境配置**
系统采用模块化设计,主要组件包括:任务调度器、文件处理器、CAM引擎接口和数据库。开发与运行环境如下:
| 组件 | 技术选型 | 说明 |
| :--- | :--- | :--- |
| **操作系统** | CentOS 7/8 | 作为稳定的非Windows服务器环境 [ref_1]。 |
| **开发语言** | Python 3.8+ | 核心自动化脚本语言,具备丰富的库支持。 |
| **集成开发环境** | PyCharm (通过SSH远程开发) | 便于在本地Windows/Mac上开发远程CentOS服务器上的代码 [ref_1]。 |
| **版本控制** | Git | 代码与配置管理。 |
| **数据库** | SQLite (轻量级) / PostgreSQL (生产级) | 用于存储任务、文件、日志等信息。 |
| **CAM引擎** | InCAMPro (商业) / 开源工具链 (如 `pcb-tools`) | 核心CAM处理软件或库。本示例将使用开源工具链进行演示。 |
| **通信协议** | SSH, SFTP | 用于远程文件传输和命令执行。 |
**环境初始化脚本 (`setup_env.sh`):**
```bash
#!/bin/bash
# 1. 安装系统依赖
sudo yum install -y python3 python3-pip git
sudo pip3 install --upgrade pip
# 2. 创建项目目录
PROJECT_ROOT="/opt/pcb_cam_auto"
sudo mkdir -p $PROJECT_ROOT/{src,config,logs,input,output,temp}
sudo chown -R $(whoami):$(whoami) $PROJECT_ROOT
# 3. 安装Python依赖库
cd $PROJECT_ROOT
cat > requirements.txt << EOF
# 核心框架
flask>=2.0.0 # 可选,用于构建简单的Web状态监控界面
sqlalchemy>=1.4.0 # ORM
schedule>=1.0.0 # 定时任务调度
# PCB文件处理
pcb-tools>=0.4.0 # 用于解析Gerber/Excellon文件 [ref_4]
numpy>=1.20.0 # 数值计算
shapely>=1.8.0 # 几何图形操作
# 文件与系统操作
watchdog>=2.0.0 # 文件系统监控
paramiko>=2.9.0 # SSH/SFTP客户端 [ref_1]
EOF
pip3 install -r requirements.txt
# 4. 初始化数据库 (以SQLite为例)
cd $PROJECT_ROOT/src
python3 -c "
from database import init_db
init_db()
print('Database initialized.')
"
```
#### **2. 核心模块设计与代码实现**
**2.1 数据库模型 (`src/database.py`)**
定义系统核心数据表结构。
```python
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text, Enum, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from datetime import datetime
import enum
Base = declarative_base()
engine = create_engine('sqlite:///../config/cam_tasks.db') # 生产环境可换为PostgreSQL
SessionLocal = sessionmaker(bind=engine)
class TaskStatus(enum.Enum):
PENDING = "pending"
PROCESSING = "processing"
GERBER_CHECKED = "gerber_checked"
CAM_PROCESSED = "cam_processed"
FAILED = "failed"
COMPLETED = "completed"
class DesignTask(Base):
__tablename__ = 'design_tasks'
id = Column(Integer, primary_key=True)
task_id = Column(String(64), unique=True, nullable=False) # 唯一任务号,如时间戳+随机数
original_filename = Column(String(255)) # 原始设计文件名(如 .pcb, .brd)
gerber_zip_path = Column(String(512)) # 上传的Gerber压缩包路径
status = Column(Enum(TaskStatus), default=TaskStatus.PENDING)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
logs = relationship("TaskLog", back_populates="task", cascade="all, delete-orphan")
class TaskLog(Base):
__tablename__ = 'task_logs'
id = Column(Integer, primary_key=True)
task_id = Column(Integer, ForeignKey('design_tasks.id'))
level = Column(String(20)) # INFO, WARNING, ERROR
message = Column(Text)
created_at = Column(DateTime, default=datetime.utcnow)
task = relationship("DesignTask", back_populates="logs")
def init_db():
Base.metadata.create_all(bind=engine)
```
**2.2 Gerber文件自动检查模块 (`src/gerber_inspector.py`)**
此模块负责对制板厂发回的Gerber文件进行标准化检查,这是CAM自动化前的重要质检环节 [ref_4]。
```python
import os
import zipfile
from pathlib import Path
from pcb_tools.gbr import GerberFile
from pcb_tools.excellon import ExcellonFile
import database
from database import SessionLocal, TaskStatus, TaskLog
import logging
class GerberInspector:
def __init__(self, task_id):
self.task_id = task_id
self.session = SessionLocal()
self.task = self.session.query(database.DesignTask).filter_by(task_id=task_id).first()
self.base_dir = Path(f"/opt/pcb_cam_auto/temp/{task_id}")
self.base_dir.mkdir(parents=True, exist_ok=True)
def extract_and_validate(self, gerber_zip_path):
"""解压Gerber包并进行基础验证"""
# 1. 解压文件
with zipfile.ZipFile(gerber_zip_path, 'r') as zip_ref:
zip_ref.extractall(self.base_dir)
extracted_files = list(self.base_dir.rglob('*'))
# 2. 记录文件列表
file_list = [str(f.relative_to(self.base_dir)) for f in extracted_files if f.is_file()]
self._log(f"解压完成,共 {len(file_list)} 个文件: {file_list}")
# 3. 核心检查项
checks = {
"格式检查": self._check_rs274x_format(extracted_files),
"命名规范": self._check_ipc_naming(extracted_files),
"钻孔一致性": self._check_drill_consistency(extracted_files),
"原点与单位": self._check_origin_and_units(extracted_files),
}
all_passed = all(checks.values())
if all_passed:
self.task.status = TaskStatus.GERBER_CHECKED
self._log("Gerber文件检查全部通过", level="INFO")
else:
failed_checks = [k for k, v in checks.items() if not v]
self.task.status = TaskStatus.FAILED
self._log(f"Gerber文件检查失败,未通过项: {failed_checks}", level="ERROR")
self.session.commit()
return all_passed
def _check_rs274x_format(self, files):
"""强制要求使用RS-274X格式,禁用Aperture Macro [ref_4]"""
gerber_files = [f for f in files if f.suffix.lower() in ['.gbr', '.gtl', '.gbl', '.gts', '.gbs']]
for g_file in gerber_files:
try:
with open(g_file, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read(500)
# 检查文件头是否包含RS-274X标识
if "FSLAX" not in content and "FSLAY" not in content:
self._log(f"文件 {g_file.name} 可能非标准RS-274X格式", level="WARNING")
return False
# 粗略检查是否包含宏定义(应避免)
if "%AM" in content:
self._log(f"文件 {g_file.name} 包含Aperture Macro,可能引发兼容性问题 [ref_4]", level="ERROR")
return False
except Exception as e:
self._log(f"读取文件 {g_file.name} 失败: {e}", level="ERROR")
return False
return True
def _check_drill_consistency(self, files):
"""检查NC Drill文件与Gerber层间孔数是否一致 [ref_4]"""
drill_files = [f for f in files if f.suffix.lower() in ['.txt', '.drl', '.xln']]
gerber_files = [f for f in files if f.suffix.lower() in ['.gbr', '.gtl', '.gbl']]
if not drill_files:
self._log("未找到钻孔文件", level="WARNING")
return True # 可能为无孔板
try:
# 使用pcb-tools解析一个钻孔文件示例
for d_file in drill_files[:1]: # 检查第一个钻孔文件
drill_data = ExcellonFile.read(d_file)
hole_count = len(drill_data.holes)
self._log(f"钻孔文件 {d_file.name} 包含 {hole_count} 个孔")
# 此处可添加更复杂的逻辑,如与Gerber焊盘数量对比
except Exception as e:
self._log(f"解析钻孔文件失败: {e}", level="ERROR")
return False
return True
def _check_origin_and_units(self, files):
"""检查原点是否为绝对原点,单位是否统一(mm/mil)[ref_4]"""
# 此检查通常需要解析Gerber文件的FS(格式说明)命令
for g_file in files:
if g_file.suffix.lower() not in ['.gbr', '.gtl', '.gbl', '.gts', '.gbs']:
continue
try:
gerber = GerberFile.read(g_file)
# pcb-tools库可能已解析单位信息,此处进行简单判断
# 实际应用中,应检查所有文件的单位是否一致
pass # 简化示例,实际应实现具体检查逻辑
except Exception as e:
self._log(f"检查文件 {g_file.name} 原点与单位时出错: {e}", level="WARNING")
return True # 假设检查通过
def _check_ipc_naming(self, files):
"""检查文件名是否符合IPC标准命名规范,便于CAM软件自动识别 [ref_4]"""
# 示例:检查常见的层命名约定
standard_suffixes = {'.gtl': 'Top Layer', '.gbl': 'Bottom Layer', '.gts': 'Top Solder Mask', '.gbs': 'Bottom Solder Mask', '.gtp': 'Top Paste', '.gbp': 'Bottom Paste', '.gko': 'Keep-Out Layer', '.gml': 'Mechanical Layer', '.drl': 'Drill Drawing'}
for f in files:
suffix = f.suffix.lower()
if suffix in standard_suffixes:
self._log(f"文件 {f.name} 符合命名规范: {standard_suffixes[suffix]}")
# 可扩展更复杂的正则匹配
return True
def _log(self, message, level="INFO"):
"""记录任务日志"""
log_entry = TaskLog(task_id=self.task.id, level=level, message=message)
self.session.add(log_entry)
self.session.commit()
# 同时打印到控制台
print(f"[{level}] Task {self.task_id}: {message}")
```
**2.3 CAM处理引擎接口 (`src/cam_processor.py`)**
此模块封装对CAM软件(如InCAMPro)或开源工具链的调用。以下示例展示如何调用外部脚本。
```python
import subprocess
import shutil
from pathlib import Path
import database
from database import SessionLocal, TaskStatus, TaskLog
class CAMProcessor:
def __init__(self, task_id):
self.task_id = task_id
self.session = SessionLocal()
self.task = self.session.query(database.DesignTask).filter_by(task_id=task_id).first()
self.workspace = Path(f"/opt/pcb_cam_auto/temp/{task_id}/cam_processing")
self.workspace.mkdir(parents=True, exist_ok=True)
def run_cam_workflow(self):
"""执行预设的CAM处理流程"""
self._log("开始CAM处理流程")
try:
# 1. 准备输入文件(将已检查的Gerber文件复制到工作区)
source_dir = Path(f"/opt/pcb_cam_auto/temp/{self.task_id}")
for item in source_dir.iterdir():
if item.is_file():
shutil.copy2(item, self.workspace / item.name)
# 2. 调用外部CAM处理脚本(示例:使用开源工具生成钻孔报告)
# 假设有一个Python脚本 `generate_drill_report.py`
script_path = "/opt/pcb_cam_auto/scripts/generate_drill_report.py"
result = subprocess.run(
['python3', script_path, '--input-dir', str(self.workspace), '--output-dir', str(self.workspace)],
capture_output=True,
text=True,
timeout=300 # 5分钟超时
)
if result.returncode == 0:
self._log("CAM处理脚本执行成功")
# 3. 处理输出文件(如生成的光绘文件、钻孔文件等)
output_files = list(self.workspace.glob('output_*'))
# 此处可添加文件打包、上传到生产系统等逻辑
self.task.status = TaskStatus.CAM_PROCESSED
self._log(f"生成输出文件: {[f.name for f in output_files]}")
else:
self.task.status = TaskStatus.FAILED
self._log(f"CAM处理脚本执行失败: {result.stderr}", level="ERROR")
except subprocess.TimeoutExpired:
self.task.status = TaskStatus.FAILED
self._log("CAM处理超时", level="ERROR")
except Exception as e:
self.task.status = TaskStatus.FAILED
self._log(f"CAM处理过程发生异常: {e}", level="ERROR")
finally:
self.session.commit()
def _log(self, message, level="INFO"):
log_entry = TaskLog(task_id=self.task.id, level=level, message=message)
self.session.add(log_entry)
self.session.commit()
print(f"[{level}] CAM Task {self.task_id}: {message}")
```
**2.4 主调度器与服务入口 (`src/main_scheduler.py`)**
集成各模块,实现任务监听、调度和状态机管理。
```python
import time
import schedule
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from pathlib import Path
import database
from database import SessionLocal, TaskStatus
from gerber_inspector import GerberInspector
from cam_processor import CAMProcessor
import uuid
import shutil
class NewFileHandler(FileSystemEventHandler):
"""监控指定目录,新上传的Gerber压缩包触发任务"""
def on_created(self, event):
if not event.is_directory and event.src_path.endswith('.zip'):
print(f"检测到新文件: {event.src_path}")
# 创建新任务
session = SessionLocal()
task_id = f"TASK_{int(time.time())}_{uuid.uuid4().hex[:6]}"
new_task = database.DesignTask(
task_id=task_id,
original_filename=Path(event.src_path).name,
gerber_zip_path=event.src_path,
status=TaskStatus.PENDING
)
session.add(new_task)
session.commit()
session.close()
print(f"已创建任务: {task_id}")
# 立即启动处理(也可放入队列)
process_task(task_id)
def process_task(task_id):
"""处理单个任务的状态流转"""
session = SessionLocal()
task = session.query(database.DesignTask).filter_by(task_id=task_id).first()
if not task:
return
try:
# 状态机逻辑
if task.status == TaskStatus.PENDING:
print(f"开始处理任务: {task_id}")
task.status = TaskStatus.PROCESSING
session.commit()
# 步骤1: Gerber文件检查
inspector = GerberInspector(task_id)
if inspector.extract_and_validate(task.gerber_zip_path):
# 步骤2: CAM处理
processor = CAMProcessor(task_id)
processor.run_cam_workflow()
if task.status == TaskStatus.CAM_PROCESSED:
task.status = TaskStatus.COMPLETED
print(f"任务 {task_id} 处理完成")
else:
print(f"任务 {task_id} 在Gerber检查阶段失败")
else:
print(f"任务 {task_id} 当前状态为 {task.status},跳过处理")
except Exception as e:
task.status = TaskStatus.FAILED
print(f"处理任务 {task_id} 时发生异常: {e}")
finally:
session.commit()
session.close()
def run_scheduler():
"""启动调度器,可定期执行清理等维护任务"""
# 示例:每天凌晨清理3天前的临时文件
schedule.every().day.at("03:00").do(cleanup_old_temp_files)
print("任务调度器已启动...")
while True:
schedule.run_pending()
time.sleep(60)
def cleanup_old_temp_files():
"""清理旧的临时文件"""
temp_root = Path("/opt/p