要编写一个Python服务器巡检脚本,用于检查常用指标并将结果输出到Markdown文档,我们可以基于`psutil`库获取系统信息,并结合`platform`、`socket`等标准库。以下是一个完整的实现方案。
### 1. 脚本核心功能设计
脚本需要监控的关键指标包括:
| 类别 | 具体指标 | 说明 |
|------|----------|------|
| **系统信息** | 主机名、操作系统、内核版本、启动时间 | 基础环境信息 |
| **CPU** | 使用率、平均负载(1分钟、5分钟、15分钟)、核心数、频率 | 计算资源状态 |
| **内存** | 总内存、可用内存、使用率、交换分区使用情况 | 内存资源状态 |
| **磁盘** | 各分区总空间、已用空间、使用率、inode使用率 | 存储资源状态 |
| **网络** | 网络接口状态、IP地址、收发字节数 | 网络连接状态 |
| **进程** | 总进程数、前N个高CPU/内存进程 | 运行状态分析 |
### 2. 完整脚本实现
```python
#!/usr/bin/env python3
"""
服务器自动化巡检脚本
功能:检查系统常用指标并生成Markdown格式报告
作者:基于生产级巡检需求设计
依赖:psutil, platform, datetime, socket
"""
import datetime
import os
import platform
import socket
import sys
import time
from typing import Dict, List, Tuple

import psutil
class ServerInspector:
    """Collect host metrics through psutil and render them as a Markdown report.

    Typical use: create an instance, call ``collect_all_metrics()`` and then
    ``generate_markdown_report()``.
    """

    def __init__(self, output_file: str = "server_inspection_report.md"):
        """Remember the report path and record when the inspection started.

        Args:
            output_file: Path of the Markdown file the report is written to.
        """
        self.output_file = output_file
        self.report_data = {}
        self.timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # ------------------------------------------------------------------
    # Collection
    # ------------------------------------------------------------------
    @staticmethod
    def _bytes_to_gb(num_bytes) -> float:
        """Convert a byte count to GiB rounded to two decimals."""
        return round(num_bytes / (1024**3), 2)

    @staticmethod
    def _inode_usage(mountpoint: str) -> Tuple:
        """Return ``(used_inodes, used_percent)`` for a mount point.

        Both elements are the string ``"N/A"`` when ``os.statvfs`` is not
        available, the call fails, or the filesystem reports no inode total.
        """
        unavailable = ("N/A", "N/A")
        if not hasattr(os, "statvfs"):
            return unavailable
        try:
            stats = os.statvfs(mountpoint)
        except OSError:
            return unavailable
        if stats.f_files <= 0:
            return unavailable
        used = stats.f_files - stats.f_ffree
        return used, round(used / stats.f_files * 100, 1)

    def collect_system_info(self) -> Dict:
        """Collect basic host information (name, OS, boot time, uptime).

        Returns:
            A dict of strings describing the host.
        """
        boot_time = datetime.datetime.fromtimestamp(psutil.boot_time())
        # Uptime is the time elapsed since boot, not the boot timestamp itself.
        uptime = datetime.datetime.now() - boot_time
        return {
            "hostname": socket.gethostname(),
            "os": platform.system(),
            "os_version": platform.version(),
            "platform": platform.platform(),
            "architecture": platform.machine(),
            "processor": platform.processor(),
            "boot_time": boot_time.strftime("%Y-%m-%d %H:%M:%S"),
            "uptime": str(uptime).split(".")[0],
        }

    def collect_cpu_info(self) -> Dict:
        """Collect CPU core counts, utilisation, load averages and frequency.

        Blocks for about two seconds because both psutil sampling calls use a
        one-second interval.

        Returns:
            A dict of CPU metrics; ``cpu_freq`` is ``"N/A"`` when psutil
            cannot report a frequency.
        """
        cpu_times = psutil.cpu_times_percent(interval=1)
        try:
            load_avg = os.getloadavg()
        except (AttributeError, OSError):
            # Not provided on this platform, or the value is unobtainable.
            load_avg = (0, 0, 0)
        freq = psutil.cpu_freq()
        return {
            "physical_cores": psutil.cpu_count(logical=False),
            "logical_cores": psutil.cpu_count(logical=True),
            "cpu_usage_percent": psutil.cpu_percent(interval=1),
            "cpu_user": cpu_times.user,
            "cpu_system": cpu_times.system,
            "cpu_idle": cpu_times.idle,
            "load_avg_1min": load_avg[0],
            "load_avg_5min": load_avg[1],
            "load_avg_15min": load_avg[2],
            "cpu_freq": freq.current if freq else "N/A",
        }

    def collect_memory_info(self) -> Dict:
        """Collect physical memory and swap usage.

        Returns:
            A dict with sizes in GiB and usage percentages.
        """
        mem = psutil.virtual_memory()
        swap = psutil.swap_memory()
        return {
            "memory_total_gb": self._bytes_to_gb(mem.total),
            "memory_available_gb": self._bytes_to_gb(mem.available),
            "memory_used_gb": self._bytes_to_gb(mem.used),
            "memory_usage_percent": mem.percent,
            "swap_total_gb": self._bytes_to_gb(swap.total),
            "swap_used_gb": self._bytes_to_gb(swap.used),
            "swap_usage_percent": swap.percent,
        }

    def collect_disk_info(self) -> List[Dict]:
        """Collect space and inode usage for every partition psutil lists.

        Partitions whose usage cannot be read are skipped.

        Returns:
            One dict per readable partition.
        """
        disk_info = []
        for partition in psutil.disk_partitions():
            try:
                usage = psutil.disk_usage(partition.mountpoint)
            except OSError:
                # Covers PermissionError as well as other read failures.
                continue
            inode_used, inode_percent = self._inode_usage(partition.mountpoint)
            disk_info.append({
                "device": partition.device,
                "mountpoint": partition.mountpoint,
                "fstype": partition.fstype,
                "total_gb": self._bytes_to_gb(usage.total),
                "used_gb": self._bytes_to_gb(usage.used),
                "free_gb": self._bytes_to_gb(usage.free),
                "usage_percent": usage.percent,
                "inode_used": inode_used,
                "inode_usage_percent": inode_percent,
            })
        return disk_info

    def collect_network_info(self) -> List[Dict]:
        """Collect addresses and traffic counters for each network interface.

        Returns:
            One dict per interface with IPv4/IPv6 lists, MAC address and
            sent/received byte counts (0 when no counters are reported).
        """
        # Query the counters once instead of once per interface.
        io_counters = psutil.net_io_counters(pernic=True)
        net_info = []
        for interface, addrs in psutil.net_if_addrs().items():
            stats = io_counters.get(interface)
            info = {
                "interface": interface,
                "ipv4": [],
                "ipv6": [],
                "mac": "N/A",
                "bytes_sent": stats.bytes_sent if stats else 0,
                "bytes_recv": stats.bytes_recv if stats else 0,
            }
            for addr in addrs:
                if addr.family == socket.AF_INET:
                    info["ipv4"].append(addr.address)
                elif addr.family == socket.AF_INET6:
                    info["ipv6"].append(addr.address)
                elif addr.family == psutil.AF_LINK:
                    info["mac"] = addr.address
            net_info.append(info)
        return net_info

    def collect_process_info(self, top_n: int = 5,
                             sample_interval: float = 0.5) -> Dict:
        """Collect the process count and the top CPU / memory consumers.

        Args:
            top_n: Number of processes to keep in each ranking.
            sample_interval: Seconds to wait between priming the per-process
                CPU counters and reading them. The first ``cpu_percent``
                reading of a process is always 0.0, so a priming pass is
                needed for the CPU ranking to be meaningful.

        Returns:
            A dict with ``total_processes``, ``top_cpu_processes``,
            ``top_memory_processes`` and ``top_n``.
        """
        watched_errors = (psutil.NoSuchProcess, psutil.AccessDenied)

        # Priming pass: the second process_iter() call below reuses the same
        # cached Process objects, so it measures CPU since this pass.
        for proc in psutil.process_iter():
            try:
                proc.cpu_percent(interval=None)
            except watched_errors:
                continue
        if sample_interval and sample_interval > 0:
            time.sleep(sample_interval)

        processes = []
        attrs = ['pid', 'name', 'cpu_percent', 'memory_percent']
        for proc in psutil.process_iter(attrs):
            try:
                info = dict(proc.info)
            except watched_errors:
                continue
            # Values may be None when access was denied; sorting and
            # formatting both need real numbers.
            info['cpu_percent'] = info.get('cpu_percent') or 0.0
            info['memory_percent'] = info.get('memory_percent') or 0.0
            processes.append(info)

        by_cpu = sorted(processes, key=lambda p: p['cpu_percent'], reverse=True)
        by_mem = sorted(processes, key=lambda p: p['memory_percent'], reverse=True)
        return {
            "total_processes": len(processes),
            "top_cpu_processes": by_cpu[:top_n],
            "top_memory_processes": by_mem[:top_n],
            "top_n": top_n,
        }

    def collect_all_metrics(self):
        """Run every collector and store the results in ``self.report_data``."""
        self.report_data = {
            "timestamp": self.timestamp,
            "system": self.collect_system_info(),
            "cpu": self.collect_cpu_info(),
            "memory": self.collect_memory_info(),
            "disks": self.collect_disk_info(),
            "network": self.collect_network_info(),
            "processes": self.collect_process_info(),
        }

    # ------------------------------------------------------------------
    # Rendering
    # ------------------------------------------------------------------
    @staticmethod
    def _render_header(data: Dict) -> List[str]:
        """Render the report title, generation time and hostname."""
        return [
            "# 服务器巡检报告\n\n",
            # Blank line so the two fields render as separate paragraphs.
            f"**生成时间:** {data['timestamp']}\n\n",
            f"**主机名:** {data['system']['hostname']}\n\n",
        ]

    @staticmethod
    def _render_system(sys_info: Dict) -> List[str]:
        """Render section 1 (system information)."""
        return [
            "## 1. 系统信息\n\n",
            "| 指标 | 值 |\n",
            "|------|-----|\n",
            f"| 操作系统 | {sys_info['os']} {sys_info['os_version']} |\n",
            f"| 平台架构 | {sys_info['architecture']} |\n",
            f"| 处理器 | {sys_info['processor']} |\n",
            f"| 启动时间 | {sys_info['boot_time']} |\n",
            f"| 运行时间 | {sys_info['uptime']} |\n\n",
        ]

    @staticmethod
    def _render_cpu(cpu_info: Dict) -> List[str]:
        """Render section 2 (CPU information)."""
        freq = cpu_info['cpu_freq']
        # Only attach the unit to a real number; "N/A" is printed bare.
        freq_text = f"{freq} MHz" if isinstance(freq, (int, float)) else str(freq)
        return [
            "## 2. CPU信息\n\n",
            "| 指标 | 值 |\n",
            "|------|-----|\n",
            f"| 物理核心数 | {cpu_info['physical_cores']} |\n",
            f"| 逻辑核心数 | {cpu_info['logical_cores']} |\n",
            f"| CPU使用率 | {cpu_info['cpu_usage_percent']}% |\n",
            f"| 用户态使用率 | {cpu_info['cpu_user']}% |\n",
            f"| 系统态使用率 | {cpu_info['cpu_system']}% |\n",
            f"| 空闲率 | {cpu_info['cpu_idle']}% |\n",
            f"| 1分钟平均负载 | {cpu_info['load_avg_1min']:.2f} |\n",
            f"| 5分钟平均负载 | {cpu_info['load_avg_5min']:.2f} |\n",
            f"| 15分钟平均负载 | {cpu_info['load_avg_15min']:.2f} |\n",
            f"| CPU频率 | {freq_text} |\n\n",
        ]

    @staticmethod
    def _render_memory(mem_info: Dict) -> List[str]:
        """Render section 3 (memory and swap)."""
        return [
            "## 3. 内存信息\n\n",
            "| 指标 | 值 |\n",
            "|------|-----|\n",
            f"| 总内存 | {mem_info['memory_total_gb']} GB |\n",
            f"| 可用内存 | {mem_info['memory_available_gb']} GB |\n",
            f"| 已用内存 | {mem_info['memory_used_gb']} GB |\n",
            f"| 内存使用率 | {mem_info['memory_usage_percent']}% |\n",
            f"| 交换分区总大小 | {mem_info['swap_total_gb']} GB |\n",
            f"| 交换分区已用 | {mem_info['swap_used_gb']} GB |\n",
            f"| 交换分区使用率 | {mem_info['swap_usage_percent']}% |\n\n",
        ]

    @staticmethod
    def _render_disks(disks: List[Dict]) -> List[str]:
        """Render section 4 (one table row per partition)."""
        lines = [
            "## 4. 磁盘信息\n\n",
            "| 设备 | 挂载点 | 文件系统 | 总容量(GB) | 已用(GB) | 可用(GB) | 使用率 | inode使用率 |\n",
            "|------|--------|----------|------------|----------|----------|--------|-------------|\n",
        ]
        for disk in disks:
            usage_status = "⚠️" if disk['usage_percent'] > 90 else "✅"
            inode_percent = disk.get('inode_usage_percent', 'N/A')
            if isinstance(inode_percent, (int, float)):
                inode_text = f"{inode_percent}%"
            else:
                inode_text = str(inode_percent)
            lines.append(
                f"| {disk['device']} | {disk['mountpoint']} | {disk['fstype']} | "
                f"{disk['total_gb']} | {disk['used_gb']} | {disk['free_gb']} | "
                f"{usage_status} {disk['usage_percent']}% | {inode_text} |\n"
            )
        lines.append("\n")
        return lines

    @staticmethod
    def _render_network(network: List[Dict]) -> List[str]:
        """Render section 5 (interfaces that have at least one IP address)."""
        lines = ["## 5. 网络信息\n\n"]
        for net in network:
            if not (net['ipv4'] or net['ipv6']):
                continue
            lines.append(f"### 接口: {net['interface']}\n")
            lines.append(f"- **MAC地址:** {net['mac']}\n")
            if net['ipv4']:
                lines.append(f"- **IPv4地址:** {', '.join(net['ipv4'])}\n")
            if net['ipv6']:
                shown = ', '.join(net['ipv6'][:2])
                # The ellipsis marks truncation, so add it only when needed.
                suffix = "..." if len(net['ipv6']) > 2 else ""
                lines.append(f"- **IPv6地址:** {shown}{suffix}\n")
            lines.append(f"- **发送字节:** {net['bytes_sent']:,}\n")
            lines.append(f"- **接收字节:** {net['bytes_recv']:,}\n\n")
        return lines

    @staticmethod
    def _render_processes(proc_info: Dict) -> List[str]:
        """Render section 6 (process count and the two rankings)."""
        top_n = proc_info.get('top_n', 5)

        def percent(proc: Dict, key: str) -> float:
            # Tolerate None / missing values in externally supplied data.
            return float(proc.get(key) or 0.0)

        lines = [
            "## 6. 进程信息\n\n",
            f"**总进程数:** {proc_info['total_processes']}\n\n",
            f"### 6.1 CPU使用率前{top_n}的进程\n",
            "| PID | 进程名 | CPU使用率 | 内存使用率 |\n",
            "|-----|--------|-----------|------------|\n",
        ]
        for proc in proc_info['top_cpu_processes']:
            lines.append(
                f"| {proc['pid']} | {proc['name']} | "
                f"{percent(proc, 'cpu_percent'):.1f}% | "
                f"{percent(proc, 'memory_percent'):.1f}% |\n"
            )
        lines.append("\n")
        lines.append(f"### 6.2 内存使用率前{top_n}的进程\n")
        lines.append("| PID | 进程名 | 内存使用率 | CPU使用率 |\n")
        lines.append("|-----|--------|------------|-----------|\n")
        for proc in proc_info['top_memory_processes']:
            lines.append(
                f"| {proc['pid']} | {proc['name']} | "
                f"{percent(proc, 'memory_percent'):.1f}% | "
                f"{percent(proc, 'cpu_percent'):.1f}% |\n"
            )
        return lines

    def _render_summary(self, cpu_info: Dict, mem_info: Dict,
                        disks: List[Dict]) -> List[str]:
        """Render section 7: threshold checks plus the closing footer."""
        issues = []

        load_avg = cpu_info['load_avg_1min']
        cpu_cores = cpu_info['logical_cores']
        # The core count may be None; skip the load check in that case.
        if cpu_cores and load_avg > cpu_cores * 1.5:
            issues.append(f"⚠️ CPU负载较高:1分钟平均负载({load_avg:.2f})超过核心数({cpu_cores})的1.5倍")

        if mem_info['memory_usage_percent'] > 90:
            issues.append(f"⚠️ 内存使用率过高:{mem_info['memory_usage_percent']}%")

        for disk in disks:
            if disk['usage_percent'] > 90:
                issues.append(f"⚠️ 磁盘空间告急:{disk['mountpoint']} 使用率 {disk['usage_percent']}%")
            elif disk['usage_percent'] > 80:
                issues.append(f"📝 磁盘空间预警:{disk['mountpoint']} 使用率 {disk['usage_percent']}%")

        lines = ["\n## 7. 巡检总结\n\n"]
        if issues:
            lines.append("### 发现的问题:\n")
            lines.extend(f"- {issue}\n" for issue in issues)
        else:
            lines.append("✅ 所有指标正常,未发现明显问题。\n")
        lines.append(f"\n---\n*报告生成完成于 {self.timestamp}*")
        return lines

    def generate_markdown_report(self) -> None:
        """Render ``self.report_data`` as Markdown and write it to ``output_file``.

        Metrics are collected first if ``report_data`` is still empty. The
        whole document is built in memory before the file is opened, so a
        rendering error does not leave a truncated report on disk.
        """
        if not self.report_data:
            self.collect_all_metrics()
        data = self.report_data

        parts = []
        parts.extend(self._render_header(data))
        parts.extend(self._render_system(data['system']))
        parts.extend(self._render_cpu(data['cpu']))
        parts.extend(self._render_memory(data['memory']))
        parts.extend(self._render_disks(data['disks']))
        parts.extend(self._render_network(data['network']))
        parts.extend(self._render_processes(data['processes']))
        parts.extend(self._render_summary(data['cpu'], data['memory'], data['disks']))

        with open(self.output_file, 'w', encoding='utf-8') as f:
            f.write("".join(parts))
def main():
    """Run one inspection: collect metrics, write the report, print a summary.

    Returns:
        0 on success, 1 if any exception was raised during the run.
    """
    try:
        print("开始收集服务器指标...")
        inspector = ServerInspector("server_inspection_report.md")
        inspector.collect_all_metrics()
        inspector.generate_markdown_report()
        print(f"巡检完成!报告已保存至: {inspector.output_file}")

        # Short console summary of the collected data.
        data = inspector.report_data
        print("\n=== 巡检摘要 ===")
        print(f"主机名: {data['system']['hostname']}")
        print(f"CPU使用率: {data['cpu']['cpu_usage_percent']}%")
        print(f"内存使用率: {data['memory']['memory_usage_percent']}%")
        print(f"磁盘数量: {len(data['disks'])}")
        print(f"总进程数: {data['processes']['total_processes']}")
    except Exception as e:
        # Top-level boundary: report the failure on stderr so it is kept
        # apart from the normal summary output, and signal it via the
        # return code.
        print(f"巡检过程中发生错误: {str(e)}", file=sys.stderr)
        return 1
    return 0
if __name__ == "__main__":
    # Raise SystemExit directly: the bare exit() helper is injected by the
    # site module for interactive use and is not guaranteed to exist.
    raise SystemExit(main())
```
### 3. 脚本使用说明
#### 3.1 环境准备
```bash
# 安装依赖
pip install psutil
# 或使用requirements.txt
echo "psutil>=5.9.0" > requirements.txt
pip install -r requirements.txt
```
#### 3.2 运行脚本
```bash
# 直接运行
python server_inspector.py
# 或添加执行权限
chmod +x server_inspector.py
./server_inspector.py
```
#### 3.3 定时执行配置
```bash
# 使用crontab设置每天凌晨2点执行
crontab -e
# 添加以下行(注意:脚本使用相对路径输出报告,cron 默认在用户主目录下运行,报告会生成在主目录中;如需输出到其他目录,请在命令前先 cd 到目标目录)
0 2 * * * /usr/bin/python3 /path/to/server_inspector.py
# 或者每小时执行一次
0 * * * * /usr/bin/python3 /path/to/server_inspector.py
```
### 4. 生成的Markdown报告示例
运行脚本后,会生成`server_inspection_report.md`文件,内容结构如下:
```markdown
# 服务器巡检报告
**生成时间:** 2024-