# Python+ENSP自动化运维实战:5分钟搞定交换机批量配置(附完整代码)
如果你是一位网络工程师,每天面对几十甚至上百台交换机的配置任务,还在逐台登录、逐条敲命令,那这篇文章就是为你准备的。我经历过那种重复劳动带来的疲惫和低效,也深知一次手误可能导致全网故障的风险。直到我开始将Python脚本与华为ENSP模拟器结合,才真正从繁琐的配置工作中解放出来。今天,我想分享的,不是高深的理论,而是一套能让你在5分钟内,从零开始构建自动化配置能力的实战方案。无论你是刚接触网络自动化的新手,还是希望优化现有流程的老手,这套方法都能直接应用到你的日常工作中,将批量配置的时间从几小时压缩到几分钟。
## 1. 环境搭建与基础准备:从零到一的快速启动
在开始编写任何自动化脚本之前,一个稳定、可复现的实验环境至关重要。很多工程师卡在第一步,不是因为技术复杂,而是环境配置的细节问题。我建议你完全按照下面的步骤操作,避免走弯路。
### 1.1 ENSP模拟器与Python环境部署
ENSP(Enterprise Network Simulation Platform)是华为官方的网络仿真工具,它完美模拟了真实设备的行为,是我们进行自动化测试的沙盒。首先,确保你的电脑满足以下最低要求:
- **操作系统**:Windows 10 64位(ENSP对Windows兼容性最好)
- **内存**:8GB以上(运行多台设备时16GB更佳)
- **虚拟化支持**:需要在BIOS中开启Intel VT-x/AMD-V
安装ENSP时,它会自动安装必要的虚拟化组件(如VirtualBox、WinPcap)。但有一个关键点经常被忽略:**安装路径不能包含中文或特殊字符**,否则可能导致设备启动失败。我习惯安装在 `C:\eNSP` 这样的纯英文路径下。
Python环境方面,我强烈推荐使用 **Python 3.8 或 3.9** 版本。这两个版本在库兼容性和稳定性上表现最佳。避免使用最新的Python 3.11+,因为部分网络自动化库可能尚未适配。安装时务必勾选“Add Python to PATH”,这样可以在任何命令行窗口直接调用Python。
验证安装是否成功,打开命令提示符(CMD)或PowerShell,分别输入:
```bash
ensp
python --version
pip --version
```
如果都能正确显示版本信息,说明基础环境就绪。
### 1.2 关键Python库的安装与选择
网络自动化领域有几个核心库,它们各有侧重。盲目选择可能导致代码复杂或功能受限。下面这个表格对比了最常用的三个库,帮你快速做出选择:
| 库名称 | 核心优势 | 适用场景 | 学习曲线 | 设备支持广度 |
|--------|----------|----------|----------|--------------|
| **Paramiko** | 纯Python实现,底层SSH协议控制精细 | 需要深度定制SSH交互、处理非标准设备 | 较陡峭 | 广泛(需自行适配) |
| **Netmiko** | 基于Paramiko,封装了常见网络设备交互模式 | 多厂商设备统一管理、快速开发 | 平缓 | 非常好(内置大量设备类型) |
| **NAPALM** | 配置与状态获取的抽象层,支持配置差异比较 | 多厂商配置标准化、配置合规检查 | 中等 | 较好(但部分厂商驱动需额外安装) |
对于绝大多数ENSP环境下的华为设备自动化,**Netmiko**是最佳起点。它屏蔽了底层SSH连接的复杂性,提供了简洁统一的API。安装它及其依赖只需一行命令:
```bash
pip install netmiko
```
但这里有个细节:国内网络有时访问PyPI较慢,可能导致安装失败。你可以使用清华镜像源加速:
```bash
pip install netmiko -i https://pypi.tuna.tsinghua.edu.cn/simple
```
安装完成后,写一个简单的测试脚本验证Netmiko能否正常工作:
```python
from netmiko import ConnectHandler
# 这只是个测试,不会真正连接
print("Netmiko版本:", netmiko.__version__)
```
如果输出版本号(如 `4.1.2`),说明安装成功。
### 1.3 ENSP设备SSH基础配置
自动化连接的前提是设备开启了SSH服务。在ENSP中拖入一台S5700交换机,启动后按以下步骤配置:
```bash
<Huawei>system-view
[Huawei]sysname SW1
[SW1]interface Vlanif 1
[SW1-Vlanif1]ip address 192.168.1.100 24
[SW1-Vlanif1]quit
[SW1]stelnet server enable
[SW1]ssh user admin authentication-type password
[SW1]ssh user admin service-type stelnet
[SW1]aaa
[SW1-aaa]local-user admin password cipher Admin@123
[SW1-aaa]local-user admin privilege level 15
[SW1-aaa]local-user admin service-type ssh
[SW1-aaa]quit
[SW1]user-interface vty 0 4
[SW1-ui-vty0-4]authentication-mode aaa
[SW1-ui-vty0-4]protocol inbound ssh
[SW1-ui-vty0-4]quit
[SW1]rsa local-key-pair create
```
> **注意**:生成RSA密钥时,直接按回车使用默认512位即可。在ENSP模拟环境中,这完全够用,且生成速度更快。
配置完成后,在真实机命令行用SSH客户端测试连接:
```bash
ssh admin@192.168.1.100
```
输入密码 `Admin@123`,如果能看到 `[SW1]` 提示符,说明SSH配置成功。这个步骤看似基础,但却是后续所有自动化的基石,务必确保每台设备都按此配置。
## 2. 单设备自动化:从手动到自动的关键一跃
掌握了环境搭建,我们进入实战环节。很多教程一上来就讲多线程、批量处理,但我认为**先搞定单设备,再扩展批量**才是更稳妥的学习路径。这一节,我会带你编写第一个真正可用的自动化脚本。
### 2.1 第一个可用的Netmiko脚本
让我们从一个最简单的需求开始:自动登录交换机,查看设备基本信息。创建文件 `first_script.py`,输入以下代码:
```python
from netmiko import ConnectHandler
import time
# 设备连接参数
device = {
'device_type': 'huawei',
'ip': '192.168.1.100',
'username': 'admin',
'password': 'Admin@123',
'port': 22, # SSH默认端口
'secret': '', # 华为设备一般不需要enable密码
'verbose': False, # 设为True可看到详细交互过程
}
# 建立连接
print(f"正在连接 {device['ip']}...")
try:
connection = ConnectHandler(**device)
print("连接成功!")
# 进入系统视图(Netmiko会自动处理)
# 发送显示命令
output = connection.send_command('display version')
print("设备版本信息:")
print(output[:500]) # 只打印前500字符避免刷屏
# 获取接口简要信息
output = connection.send_command('display ip interface brief')
print("\n接口IP信息:")
print(output)
# 断开连接
connection.disconnect()
print("连接已关闭")
except Exception as e:
print(f"连接失败: {str(e)}")
```
运行这个脚本,你应该能看到设备的版本和接口信息。这里有几个关键点:
1. **`device_type`** 必须指定为 `'huawei'`,Netmiko根据这个值决定如何与设备交互
2. **`send_command()`** 方法用于执行显示命令,它会等待命令执行完成并返回所有输出
3. **异常处理** 很重要,网络设备可能临时不可达,脚本需要有容错能力
> **提示**:如果你看到类似 `Authentication failed` 的错误,请检查用户名、密码和SSH配置。如果是 `Connection refused`,确保设备IP正确且SSH服务已启动。
### 2.2 配置下发:VLAN批量创建的实战案例
查看信息只是第一步,真正的价值在于自动配置。假设我们需要在交换机上创建VLAN 10到VLAN 20,并为每个VLAN添加描述。手动操作需要输入11条命令,而用Python只需要几行:
```python
from netmiko import ConnectHandler
device = {
'device_type': 'huawei',
'ip': '192.168.1.100',
'username': 'admin',
'password': 'Admin@123',
}
# 要创建的VLAN列表
vlans_to_create = list(range(10, 21)) # [10, 11, ..., 20]
# 生成配置命令列表
config_commands = []
for vlan_id in vlans_to_create:
config_commands.append(f'vlan {vlan_id}')
config_commands.append(f'description Python-Auto-VLAN-{vlan_id}')
print(f"将创建 {len(vlans_to_create)} 个VLAN")
print("配置命令:")
for cmd in config_commands:
print(f" {cmd}")
# 连接并执行
try:
connection = ConnectHandler(**device)
# 发送配置命令
output = connection.send_config_set(config_commands)
# 保存配置(华为设备命令)
output += connection.send_command('save', expect_string=r'\[Y/N\]:')
output += connection.send_command('Y', expect_string=r'\[Y/N\]:')
print("\n配置完成!输出摘要:")
# 只显示关键信息,避免过多输出
for line in output.split('\n'):
if 'successfully' in line.lower() or 'error' in line.lower() or 'vlan' in line.lower():
print(line)
connection.disconnect()
except Exception as e:
print(f"配置过程中出错: {str(e)}")
```
这个脚本展示了 `send_config_set()` 方法的使用,它可以接收一个命令列表,自动按顺序执行。注意华为设备需要显式保存配置,否则重启后会丢失。
### 2.3 配置检查与回滚机制
自动化配置最怕的是什么?是配置错误导致网络中断。因此,**配置前检查**和**出错回滚**机制必不可少。下面是一个更健壮的版本:
```python
from netmiko import ConnectHandler
from netmiko.ssh_exception import NetmikoTimeoutException, NetmikoAuthenticationException
import difflib
def get_config_backup(connection):
"""获取当前配置备份"""
return connection.send_command('display current-configuration')
def compare_configs(old, new):
"""比较两个配置的差异"""
old_lines = old.splitlines()
new_lines = new.splitlines()
diff = difflib.unified_diff(old_lines, new_lines, lineterm='')
return '\n'.join(diff)
device = {
'device_type': 'huawei',
'ip': '192.168.1.100',
'username': 'admin',
'password': 'Admin@123',
}
try:
# 连接设备
connection = ConnectHandler(**device)
# 1. 备份当前配置
print("步骤1: 备份当前配置...")
original_config = get_config_backup(connection)
# 2. 执行新配置
print("步骤2: 应用新配置...")
new_commands = [
'interface GigabitEthernet 0/0/1',
'description Uplink-to-Core',
'port link-type trunk',
'port trunk allow-pass vlan 10 20 30',
]
connection.send_config_set(new_commands)
# 3. 获取应用后的配置
updated_config = get_config_backup(connection)
# 4. 显示差异
print("步骤3: 配置变更对比:")
changes = compare_configs(original_config, updated_config)
if changes:
print("以下配置被修改:")
print(changes)
else:
print("未检测到配置变更")
# 5. 验证配置
print("步骤4: 验证接口配置...")
verify_output = connection.send_command('display interface GigabitEthernet 0/0/1 brief')
print(verify_output)
# 询问是否保存
save = input("\n是否保存配置?(yes/no): ").lower()
if save == 'yes':
connection.send_command('save', expect_string=r'\[Y/N\]:')
connection.send_command('Y', expect_string=r'\[Y/N\]:')
print("配置已保存")
else:
# 回滚到原始配置
print("执行回滚...")
rollback_commands = [
'interface GigabitEthernet 0/0/1',
'undo description',
'undo port link-type',
'undo port trunk allow-pass vlan',
]
connection.send_config_set(rollback_commands)
print("已回滚到原始配置")
connection.disconnect()
except NetmikoTimeoutException:
print("错误: 连接超时,请检查网络连通性")
except NetmikoAuthenticationException:
print("错误: 认证失败,请检查用户名/密码")
except Exception as e:
print(f"未知错误: {str(e)}")
```
这个脚本引入了几个重要概念:
- **配置备份**:在执行任何变更前,先保存当前配置
- **差异对比**:使用Python的difflib库显示配置变化,便于审核
- **交互式确认**:重要变更前要求人工确认
- **回滚机制**:用户取消时自动恢复原配置
- **异常分类处理**:针对不同错误类型给出具体提示
## 3. 多设备批量管理:效率的指数级提升
单设备自动化已经能节省大量时间,但真正的威力在于批量处理。当你有10台、50台甚至100台设备需要相同配置时,批量自动化带来的效率提升是指数级的。
### 3.1 设备清单管理与连接池
首先,我们需要一个灵活的设备清单管理方式。我推荐使用YAML格式,因为它既人类可读又易于程序解析。创建文件 `devices.yaml`:
```yaml
---
# 生产环境核心交换机
core_switches:
- name: "CORE-SW-01"
ip: "192.168.1.101"
device_type: "huawei"
username: "admin"
password: "Admin@123"
site: "数据中心-A"
role: "核心"
- name: "CORE-SW-02"
ip: "192.168.1.102"
device_type: "huawei"
username: "admin"
password: "Admin@123"
site: "数据中心-B"
role: "核心"
# 接入层交换机
access_switches:
- name: "ACC-SW-F1-01"
ip: "192.168.1.201"
device_type: "huawei"
username: "admin"
password: "Admin@123"
site: "办公楼-1F"
role: "接入"
- name: "ACC-SW-F2-01"
ip: "192.168.1.202"
device_type: "huawei"
username: "admin"
password: "Admin@123"
site: "办公楼-2F"
role: "接入"
- name: "ACC-SW-F3-01"
ip: "192.168.1.203"
device_type: "huawei"
username: "admin"
password: "Admin@123"
site: "办公楼-3F"
role: "接入"
```
对应的Python脚本可以这样读取和处理:
```python
import yaml
from netmiko import ConnectHandler
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('batch_operation.log'),
logging.StreamHandler()
]
)
def load_devices_from_yaml(file_path):
"""从YAML文件加载设备清单"""
with open(file_path, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
# 展平所有设备到一个列表
all_devices = []
for group in data.values():
all_devices.extend(group)
return all_devices
def configure_single_device(device_info, commands):
"""配置单个设备"""
device_name = device_info['name']
device_ip = device_info['ip']
try:
logging.info(f"开始配置设备 {device_name} ({device_ip})")
# 建立连接
connection = ConnectHandler(**{k: v for k, v in device_info.items()
if k in ['device_type', 'ip', 'username', 'password', 'port']})
# 执行配置命令
output = connection.send_config_set(commands)
# 保存配置
connection.send_command('save', expect_string=r'\[Y/N\]:')
connection.send_command('Y', expect_string=r'\[Y/N\]:')
connection.disconnect()
logging.info(f"设备 {device_name} 配置完成")
return {"device": device_name, "status": "success", "output": output[:200]} # 只返回前200字符
except Exception as e:
logging.error(f"设备 {device_name} 配置失败: {str(e)}")
return {"device": device_name, "status": "failed", "error": str(e)}
def batch_configure_devices(device_list, commands, max_workers=5):
"""批量配置设备(使用线程池)"""
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交所有任务
future_to_device = {
executor.submit(configure_single_device, device, commands): device['name']
for device in device_list
}
# 收集结果
for future in as_completed(future_to_device):
device_name = future_to_device[future]
try:
result = future.result(timeout=300) # 5分钟超时
results.append(result)
except Exception as e:
results.append({
"device": device_name,
"status": "timeout",
"error": f"操作超时: {str(e)}"
})
return results
if __name__ == "__main__":
# 加载设备
devices = load_devices_from_yaml('devices.yaml')
# 定义要下发的配置(例如:配置NTP服务器)
ntp_commands = [
'ntp-service unicast-server 192.168.100.1',
'ntp-service unicast-server 192.168.100.2',
'clock timezone CST add 08:00:00',
]
print(f"共发现 {len(devices)} 台设备")
print("开始批量配置NTP...")
# 执行批量配置
results = batch_configure_devices(devices, ntp_commands, max_workers=3)
# 统计结果
success_count = sum(1 for r in results if r['status'] == 'success')
failed_count = sum(1 for r in results if r['status'] == 'failed')
timeout_count = sum(1 for r in results if r['status'] == 'timeout')
print(f"\n批量配置完成!")
print(f"成功: {success_count} 台")
print(f"失败: {failed_count} 台")
print(f"超时: {timeout_count} 台")
# 显示失败详情
if failed_count > 0 or timeout_count > 0:
print("\n失败设备详情:")
for result in results:
if result['status'] != 'success':
print(f" {result['device']}: {result.get('error', '未知错误')}")
```
这个脚本的核心优势在于:
1. **线程池并发**:使用 `ThreadPoolExecutor` 同时配置多台设备,大幅缩短总时间
2. **完善的日志**:记录每个设备的操作状态,便于排查问题
3. **结果统计**:自动汇总成功/失败数量,一目了然
4. **优雅的错误处理**:单台设备失败不会影响其他设备
> **注意**:`max_workers` 参数控制并发数,不宜设置过大。对于ENSP模拟环境,建议设为3-5,因为模拟器本身资源有限。生产环境中可根据网络带宽和设备性能调整,通常10-20是安全范围。
### 3.2 配置文件模板与变量替换
在实际运维中,不同设备可能需要相似的配置,但某些参数(如IP地址、VLAN ID等)各不相同。这时可以使用Jinja2模板引擎。首先安装Jinja2:
```bash
pip install jinja2
```
创建模板文件 `interface_config.j2`:
```jinja2
interface {{ interface_name }}
description {{ description }}
{% if port_type == 'access' %}
port link-type access
port default vlan {{ vlan_id }}
{% elif port_type == 'trunk' %}
port link-type trunk
port trunk allow-pass vlan {{ allowed_vlans }}
{% endif %}
{% if stp_enabled %}
stp edged-port enable
{% endif %}
```
对应的Python脚本:
```python
from jinja2 import Environment, FileSystemLoader
from netmiko import ConnectHandler
import yaml
def generate_config_from_template(template_file, context):
"""使用模板生成配置"""
env = Environment(loader=FileSystemLoader('.'))
template = env.get_template(template_file)
return template.render(context)
def apply_interface_config(device_info, interface_configs):
"""应用接口配置到设备"""
connection = ConnectHandler(**device_info)
all_output = ""
for config in interface_configs:
# 生成配置命令
commands = config.split('\n')
# 过滤空行
commands = [cmd.strip() for cmd in commands if cmd.strip()]
if commands:
output = connection.send_config_set(commands)
all_output += output + "\n"
# 保存配置
connection.send_command('save', expect_string=r'\[Y/N\]:')
connection.send_command('Y', expect_string=r'\[Y/N\]:')
connection.disconnect()
return all_output
# 定义设备接口配置
interface_configs = [
{
'interface_name': 'GigabitEthernet 0/0/1',
'description': 'PC-01',
'port_type': 'access',
'vlan_id': 10,
'stp_enabled': True
},
{
'interface_name': 'GigabitEthernet 0/0/24',
'description': 'Uplink-to-Core',
'port_type': 'trunk',
'allowed_vlans': '10 20 30',
'stp_enabled': False
}
]
# 为每个接口生成配置
generated_configs = []
for config in interface_configs:
config_text = generate_config_from_template('interface_config.j2', config)
generated_configs.append(config_text)
print(f"生成的配置 {config['interface_name']}:")
print(config_text)
print("-" * 50)
# 应用到设备
device = {
'device_type': 'huawei',
'ip': '192.168.1.100',
'username': 'admin',
'password': 'Admin@123',
}
result = apply_interface_config(device, generated_configs)
print("配置应用完成!")
```
模板化的好处显而易见:配置逻辑与数据分离。当需要修改配置格式时,只需改模板文件;当需要为不同设备生成配置时,只需提供不同的上下文数据。
### 3.3 批量操作的最佳实践与陷阱规避
在长期使用批量自动化的过程中,我总结了一些最佳实践和常见陷阱:
**最佳实践:**
1. **先验证后执行**:在真正修改前,先用 `display` 命令验证设备状态
2. **分批执行**:不要一次性操作所有设备,先小范围测试
3. **配置备份**:每次变更前自动备份配置
4. **操作日志**:详细记录谁、在什么时间、对哪些设备、做了什么操作
5. **回滚计划**:准备好快速回滚的方案和脚本
**常见陷阱及规避方法:**
| 陷阱 | 现象 | 规避方法 |
|------|------|----------|
| **连接风暴** | 同时连接太多设备,导致设备或网络拥塞 | 使用连接池,限制并发数 |
| **配置冲突** | 多个脚本同时修改同一设备 | 实现配置锁机制,或使用队列串行化操作 |
| **密码过期** | 脚本运行时密码突然过期 | 定期检查密码有效期,提前更新 |
| **版本差异** | 不同设备版本命令语法不同 | 根据设备版本动态调整命令 |
| **超时设置不足** | 复杂命令执行时间超过默认超时 | 根据命令复杂度调整超时时间 |
这里是一个增强版的批量操作脚本,包含了上述最佳实践:
```python
import yaml
import logging
import time
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from netmiko import ConnectHandler
from netmiko.ssh_exception import NetmikoTimeoutException, NetmikoAuthenticationException
class BatchConfigManager:
def __init__(self, config_file='devices.yaml', max_workers=5):
self.devices = self.load_devices(config_file)
self.max_workers = max_workers
self.setup_logging()
def setup_logging(self):
"""设置日志"""
log_filename = f'config_operation_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_filename),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def load_devices(self, config_file):
"""加载设备配置"""
with open(config_file, 'r') as f:
return yaml.safe_load(f)
def pre_check(self, device_info):
"""执行前置检查"""
try:
conn = ConnectHandler(**device_info, timeout=10)
# 检查设备型号和版本
version_output = conn.send_command('display version', delay_factor=2)
self.logger.info(f"{device_info['name']} - 版本检查通过")
# 检查配置保存状态
config_status = conn.send_command('display saved-configuration last')
if 'The last saved configuration' in config_status:
self.logger.info(f"{device_info['name']} - 配置已保存")
else:
self.logger.warning(f"{device_info['name']} - 配置未保存")
conn.disconnect()
return True
except Exception as e:
self.logger.error(f"{device_info['name']} - 前置检查失败: {str(e)}")
return False
def backup_config(self, device_info):
"""备份设备配置"""
try:
conn = ConnectHandler(**device_info)
config = conn.send_command('display current-configuration')
conn.disconnect()
# 保存到文件
backup_file = f"backup_{device_info['name']}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.cfg"
with open(backup_file, 'w', encoding='utf-8') as f:
f.write(config)
self.logger.info(f"{device_info['name']} - 配置已备份到 {backup_file}")
return backup_file
except Exception as e:
self.logger.error(f"{device_info['name']} - 备份失败: {str(e)}")
return None
def safe_configure(self, device_info, commands, backup_first=True):
"""安全配置设备"""
device_name = device_info['name']
# 步骤1: 前置检查
if not self.pre_check(device_info):
return {"status": "failed", "reason": "pre_check_failed"}
# 步骤2: 备份配置
backup_file = None
if backup_first:
backup_file = self.backup_config(device_info)
if not backup_file:
self.logger.warning(f"{device_name} - 继续执行(备份失败)")
# 步骤3: 执行配置
try:
self.logger.info(f"{device_name} - 开始配置")
conn = ConnectHandler(**device_info, timeout=30)
# 发送配置命令
output = conn.send_config_set(commands, delay_factor=2)
# 验证配置
verify_cmd = 'display current-configuration | include ' + commands[0].split()[0] if commands else ''
if verify_cmd:
verify_output = conn.send_command(verify_cmd)
self.logger.debug(f"{device_name} - 验证输出: {verify_output[:100]}")
# 保存配置
conn.send_command('save', expect_string=r'\[Y/N\]:', delay_factor=2)
conn.send_command('Y', expect_string=r'\[Y/N\]:', delay_factor=2)
conn.disconnect()
self.logger.info(f"{device_name} - 配置成功")
return {
"status": "success",
"backup_file": backup_file,
"output_summary": output[:500] # 只保存前500字符
}
except NetmikoTimeoutException:
self.logger.error(f"{device_name} - 连接超时")
return {"status": "failed", "reason": "timeout"}
except NetmikoAuthenticationException:
self.logger.error(f"{device_name} - 认证失败")
return {"status": "failed", "reason": "authentication"}
except Exception as e:
self.logger.error(f"{device_name} - 配置失败: {str(e)}")
return {"status": "failed", "reason": str(e)}
def batch_operation(self, device_group, commands, operation_name="批量配置"):
"""执行批量操作"""
self.logger.info(f"开始 {operation_name},共 {len(device_group)} 台设备")
results = []
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# 提交任务
future_to_device = {
executor.submit(self.safe_configure, device, commands): device['name']
for device in device_group
}
# 收集结果
for future in as_completed(future_to_device):
device_name = future_to_device[future]
try:
result = future.result(timeout=300)
results.append((device_name, result))
except Exception as e:
results.append((device_name, {
"status": "failed",
"reason": f"future_error: {str(e)}"
}))
# 生成报告
self.generate_report(results, operation_name)
return results
def generate_report(self, results, operation_name):
"""生成操作报告"""
success = [r for _, r in results if r['status'] == 'success']
failed = [r for _, r in results if r['status'] == 'failed']
report = f"""
{operation_name} 完成报告
========================================
操作时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
总设备数: {len(results)}
成功: {len(success)}
失败: {len(failed)}
失败设备详情:
"""
for device_name, result in results:
if result['status'] == 'failed':
report += f" - {device_name}: {result.get('reason', '未知原因')}\n"
print(report)
# 保存报告到文件
report_file = f"report_{operation_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
with open(report_file, 'w', encoding='utf-8') as f:
f.write(report)
self.logger.info(f"报告已保存到 {report_file}")
# 使用示例
if __name__ == "__main__":
# 初始化管理器
manager = BatchConfigManager(max_workers=3)
# 定义要执行的命令
ntp_commands = [
'ntp-service unicast-server 192.168.100.1',
'ntp-service unicast-server 192.168.100.2 prefer',
'clock timezone CST add 08:00:00',
'clock daylight-saving-time CST repeating 02:00 2024 01:00 2024-12-31 02:00',
]
# 执行批量配置
results = manager.batch_operation(
device_group=manager.devices['access_switches'],
commands=ntp_commands,
operation_name="NTP服务器配置"
)
```
这个增强版脚本提供了完整的生产级功能,包括前置检查、自动备份、详细日志和报告生成。它是我在实际工作中不断迭代优化的结果,能够处理大多数批量配置场景。
## 4. 高级技巧与生产环境考量
当你掌握了基础的单设备和批量操作后,可能会遇到更复杂的需求。这一节分享一些高级技巧和在生产环境中需要考虑的实际问题。
### 4.1 配置合规性检查与自动修复
网络设备配置需要符合一定的安全规范和最佳实践。我们可以编写脚本自动检查并修复不合规的配置。以下是一个检查SSH安全配置的例子:
```python
import re
from netmiko import ConnectHandler
class SecurityAuditor:
def __init__(self, device_info):
self.device = device_info
self.connection = None
def connect(self):
"""建立连接"""
self.connection = ConnectHandler(**self.device)
def disconnect(self):
"""断开连接"""
if self.connection:
self.connection.disconnect()
def check_ssh_config(self):
"""检查SSH配置"""
checks = []
# 获取SSH配置
ssh_config = self.connection.send_command('display ssh server status')
# 检查1: SSH服务是否启用
if 'SSH server : Disable' in ssh_config:
checks.append({
'check': 'SSH服务状态',
'status': 'FAIL',
'issue': 'SSH服务未启用',
'fix_command': 'stelnet server enable'
})
else:
checks.append({
'check': 'SSH服务状态',
'status': 'PASS',
'issue': '',
'fix_command': ''
})
# 检查2: SSH协议版本
ssh_version = self.connection.send_command('display ssh server')
if 'SSH version : 1.99' in ssh_version:
checks.append({
'check': 'SSH协议版本',
'status': 'WARN',
'issue': '同时支持SSHv1和SSHv2,建议禁用SSHv1',
'fix_command': 'ssh server compatible-ssh1x disable'
})
else:
checks.append({
'check': 'SSH协议版本',
'status': 'PASS',
'issue': '',
'fix_command': ''
})
# 检查3: 认证超时时间
timeout_config = self.connection.send_command('display ssh server timeout')
match = re.search(r'Timeout interval\s*:\s*(\d+)', timeout_config)
if match:
timeout = int(match.group(1))
if timeout > 60: # 超过60秒不安全
checks.append({
'check': 'SSH认证超时',
'status': 'FAIL',
'issue': f'认证超时时间过长: {timeout}秒',
'fix_command': f'ssh server timeout {min(timeout, 60)}'
})
# 检查4: 最大认证尝试次数
auth_config = self.connection.send_command('display ssh server authentication-retries')
match = re.search(r'Authentication retries\s*:\s*(\d+)', auth_config)
if match:
retries = int(match.group(1))
if retries > 3: # 超过3次不安全
checks.append({
'check': 'SSH认证重试次数',
'status': 'FAIL',
'issue': f'认证重试次数过多: {retries}次',
'fix_command': f'ssh server authentication-retries 3'
})
return checks
def check_user_accounts(self):
"""检查用户账户"""
checks = []
# 获取本地用户配置
user_config = self.connection.send_command('display local-user')
# 检查是否存在默认账户
default_users = ['admin', 'root', 'user']
for line in user_config.split('\n'):
for default_user in default_users:
if f'User-name : {default_user}' in line:
checks.append({
'check': '默认账户检查',
'status': 'WARN',
'issue': f'存在默认账户: {default_user}',
'fix_command': f'undo local-user {default_user}'
})
# 检查密码复杂度(简化检查)
password_policy = self.connection.send_command('display password-policy')
if 'Password complexity check: Disable' in password_policy:
checks.append({
'check': '密码复杂度策略',
'status': 'FAIL',
'issue': '未启用密码复杂度检查',
'fix_command': 'password-policy complexity enable'
})
return checks
def generate_report(self, checks):
"""生成检查报告"""
report = []
report.append("=" * 60)
report.append("安全配置合规性检查报告")
report.append("=" * 60)
for check in checks:
status_icon = "✅" if check['status'] == 'PASS' else "⚠️" if check['status'] == 'WARN' else "❌"
report.append(f"{status_icon} {check['check']}: {check['status']}")
if check['issue']:
report.append(f" 问题: {check['issue']}")
if check['fix_command']:
report.append(f" 修复命令: {check['fix_command']}")
report.append("")
# 统计
pass_count = sum(1 for c in checks if c['status'] == 'PASS')
warn_count = sum(1 for c in checks if c['status'] == 'WARN')
fail_count = sum(1 for c in checks if c['status'] == 'FAIL')
report.append(f"检查完成: 通过 {pass_count}, 警告 {warn_count}, 失败 {fail_count}")
return '\n'.join(report)
def auto_fix_issues(self, checks):
"""自动修复发现的问题"""
fix_commands = []
for check in checks:
if check['status'] in ['FAIL', 'WARN'] and check['fix_command']:
fix_commands.append(check['fix_command'])
if fix_commands:
print(f"发现 {len(fix_commands)} 个问题需要修复")
confirm = input("是否自动修复?(yes/no): ")
if confirm.lower() == 'yes':
output = self.connection.send_config_set(fix_commands)
print("修复命令已执行")
return output
return None
# 使用示例
device = {
'device_type': 'huawei',
'ip': '192.168.1.100',
'username': 'admin',
'password': 'Admin@123',
}
auditor = SecurityAuditor(device)
try:
auditor.connect()
# 执行检查
ssh_checks = auditor.check_ssh_config()
user_checks = auditor.check_user_accounts()
all_checks = ssh_checks + user_checks
# 生成报告
report = auditor.generate_report(all_checks)
print(report)
# 可选:自动修复
auditor.auto_fix_issues(all_checks)
finally:
auditor.disconnect()
```
这个安全审计脚本可以扩展更多检查项,比如检查SNMP社区字符串、检查未使用的端口、检查日志配置等。关键是建立一套可扩展的检查框架。
### 4.2 性能监控与告警集成
自动化配置之外,监控设备状态同样重要。我们可以编写脚本定期收集设备性能数据,并在异常时触发告警。以下是一个简单的监控示例:
```python
import time
import json
from datetime import datetime
from netmiko import ConnectHandler
import smtplib
from email.mime.text import MIMEText
class DeviceMonitor:
def __init__(self, device_info, check_interval=300):
self.device = device_info
self.check_interval = check_interval
self.thresholds = {
'cpu_usage': 80, # CPU使用率阈值%
'memory_usage': 85, # 内存使用率阈值%
'temperature': 60, # 温度阈值℃
}
def collect_metrics(self):
"""收集设备指标"""
metrics = {
'timestamp': datetime.now().isoformat(),
'device': self.device['name'],
'ip': self.device['ip']
}
try:
conn = ConnectHandler(**self.device, timeout=10)
# 收集CPU使用率
cpu_output = conn.send_command('display cpu-usage')
cpu_match = re.search(r'CPU Usage\s*:\s*(\d+)%', cpu_output)
if cpu_match:
metrics['cpu_usage'] = int(cpu_match.group(1))
# 收集内存使用率
memory_output = conn.send_command('display memory-usage')
memory_match = re.search(r'Memory Using Percentage:\s*(\d+)%', memory_output)
if memory_match:
metrics['memory_usage'] = int(memory_match.group(1))
# 收集温度信息
temp_output = conn.send_command('display temperature all')
temp_match = re.search(r'Temperature\s*:\s*(\d+)', temp_output)
if temp_match:
metrics['temperature'] = int(temp_match.group(1))
# 收集接口状态
interface_output = conn.send_command('display interface brief')
up_count = interface_output.count('up')
down_count = interface_output.count('down')
metrics['interfaces_up'] = up_count
metrics['interfaces_down'] = down_count
conn.disconnect()
except Exception as e:
metrics['error'] = str(e)
return metrics
def check_thresholds(self, metrics):
"""检查阈值并生成告警"""
alerts = []
if 'cpu_usage' in metrics and metrics['cpu_usage'] > self.thresholds['cpu_usage']:
alerts.append(f"CPU使用率过高: {metrics['cpu_usage']}% (阈值: {self.thresholds['cpu_usage']}%)")
if 'memory_usage' in metrics and metrics['memory_usage'] > self.thresholds['memory_usage']:
alerts.append(f"内存使用率过高: {metrics['memory_usage']}% (阈值: {self.thresholds['memory_usage']}%)")
if 'temperature' in metrics and metrics['temperature'] > self.thresholds['temperature']:
alerts.append(f"温度过高: {metrics['temperature']}℃ (阈值: {self.thresholds['temperature']}℃)")
if 'interfaces_down' in metrics and metrics['interfaces_down'] > 0:
alerts.append(f"有 {metrics['interfaces_down']} 个接口处于down状态")
return alerts
def send_alert(self, alerts, metrics):
"""发送告警邮件"""
if not alerts:
return
# 构建邮件内容
subject = f"设备告警: {self.device['name']} ({self.device['ip']})"
body = f"设备: {self.device['name']}\n"
body += f"IP地址: {self.device['ip']}\n"
body += f"时间: {metrics['timestamp']}\n\n"
body += "告警信息:\n"
for alert in alerts:
body += f"- {alert}\n"
body += "\n当前指标:\n"
for key, value in metrics.items():
if key not in ['timestamp', 'device', 'ip', 'error']:
body += f"- {key}: {value}\n"
# 发送邮件(这里需要配置SMTP服务器)
# 实际使用时需要填写真实的SMTP配置
try:
msg = MIMEText(body, 'plain', 'utf-8')
msg['Subject'] = subject
msg['From'] = 'monitor@example.com'
msg['To'] = 'admin@example.com'
# 连接SMTP服务器并发送
# with smtplib.SMTP('smtp.example.com', 587) as server:
# server.starttls()
# server.login('username', 'password')
# server.send_message(msg)
print(f"模拟发送告警邮件:\n{body}")
except Exception as e:
print(f"发送告警邮件失败: {str(e)}")
def save_metrics(self, metrics):
"""保存指标到文件"""
filename = f"metrics_{self.device['name']}.jsonl"
with open(filename, 'a', encoding='utf-8') as f:
f.write(json.dumps(metrics) + '\n')
def run_monitoring(self, duration_hours=24):
"""运行监控"""
end_time = time.time() + (duration_hours * 3600)
print(f"开始监控设备 {self.device['name']},将持续 {duration_hours} 小时")
while time.time() < end_time:
try:
# 收集指标
metrics = self.collect_metrics()
# 保存指标
self.save_metrics(metrics)
# 检查告警
alerts = self.check_thresholds(metrics)
# 发送告警
if alerts:
self.send_alert(alerts, metrics)
print(f"检测到告警: {alerts}")
else:
print(f"{datetime.now()}: 设备状态正常")
# 等待下一个检查周期
time.sleep(self.check_interval)
except KeyboardInterrupt:
print("监控被用户中断")
break
except Exception as e:
print(f"监控过程中出错: {str(e)}")
time.sleep(60) # 出错后等待1分钟再重试
print("监控结束")
# 使用示例
if __name__ == "__main__":
device = {
'device_type': 'huawei',
'ip': '192.168.1.100',
'username': 'admin',
'password': 'Admin@123',
'name': '核心交换机-01'
}
monitor = DeviceMonitor(device, check_interval=60) # 每60秒检查一次
# 监控1小时(测试用)
monitor.run_monitoring(duration_hours=1)
```
这个监控脚本可以扩展为服务,持续运行并收集数据。收集的数据可以导入到Prometheus、Grafana等监控系统中进行可视化。
### 4.3 生产环境部署建议
当你的脚本从实验室走向生产环境时,需要考虑更多因素:
**1. 代码版本控制**
- 使用Git管理所有脚本和配置文件
- 建立清晰的版本发布流程
- 为每个生产变更打上标签
**2. 配置管理**
- 将设备凭证存储在安全的配置管理系统(如HashiCorp Vault)中
- 使用环境变量或配置文件管理不同环境的参数
- 实现配置的加密存储和传输
**3. 错误处理与重试机制**
- 实现指数退避的重试策略
- 记录详细的错误日志,便于排查
- 设置操作超时,避免脚本挂起
**4. 权限与审计**
- 遵循最小权限原则,为脚本分配必要的权限
- 记录所有自动化操作的操作日志
- 定期审计脚本的执行记录
**5. 测试策略**
- 建立完整的测试环境,模拟生产网络
- 实现单元测试和集成测试
- 每次变更前在测试环境充分验证
**6. 文档与知识库**
- 为每个脚本编写详细的使用文档
- 记录常见问题和解决方案
- 建立团队内部的知识共享机制
下面是一个生产环境就绪的脚本框架示例:
```python
"""
生产环境自动化脚本框架
功能:安全的设备配置管理
作者:网络自动化团队
版本:1.0.0
"""
import os
import sys
import logging
import argparse
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any
import yaml
from dotenv import load_dotenv
from netmiko import ConnectHandler
# 加载环境变量
load_dotenv()
class ProductionConfigManager:
"""生产环境配置管理器"""
def __init__(self, config_path: str = None):
self.setup_logging()
self.load_configs(config_path)
self.validate_environment()
def setup_logging(self):
"""配置结构化日志"""
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)
log_file = log_dir / f"automation_{datetime.now().strftime('%Y%m%d')}.log"
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(module)s:%(lineno)d - %(message)s',
handlers=[
logging.FileHandler(log_file, encoding='utf-8'),
logging.StreamHandler(sys.stdout)
]
)
self.logger = logging.getLogger(__name__)
def load_configs(self, config_path: str):
"""加载配置文件"""
# 从环境变量获取敏感信息
self.default_username = os.getenv('NETWORK_USERNAME')
self.default_password = os.getenv('NETWORK_PASSWORD')
if not self.default_username or not self.default_password:
self.logger.error("未设置网络设备认证信息")
sys.exit(1)
# 加载设备清单
if config_path and Path(config_path).exists():
with open(config_path, 'r', encoding='utf-8') as f:
self.device_inventory = yaml.safe_load(f)
else:
# 默认设备清单
self.device_inventory = {
'devices': [
{
'name': 'test-device',
'ip': '192.168.1.100',
'device_type': 'huawei'
}
]
}
self.logger.warning("使用默认设备清单,建议提供配置文件")
def validate_environment(self):
"""验证运行环境"""
required_vars = ['NETWORK_USERNAME', 'NETWORK_PASSWORD']
missing_vars = [var for var in required_vars if not os.getenv(var)]
if missing_vars:
self.logger.error(f"缺少必需的环境变量: {', '.join(missing_vars)}")
sys.exit(1)
self.logger.info("环境验证通过")
def get_device_connection(self, device_info: Dict[str, Any]) -> ConnectHandler:
"""获取设备连接"""
connection_params = {
'device_type': device_info.get('device_type', 'huawei'),
'ip': device_info['ip'],
'username': device_info.get('username', self.default_username),
'password': device_info.get('password', self.default_password),
'port': device_info.get('port', 22),
'timeout': 30,
'session_timeout': 30,
'banner_timeout': 15,
'auth_timeout': 15,
}
try:
self.logger.info(f"连接设备: {device_info.get('name', device_info['ip'])}")
connection = ConnectHandler(**connection_params)
return connection
except Exception as e:
self.logger.error(f"连接设备失败: {str(e)}")
raise
def execute_safely(self, device_info: Dict[str, Any], commands: List[str],
dry_run: bool = False) -> Dict[str, Any]:
"""安全执行命令"""
result = {
'device': device_info.get('name', device_info['ip']),
'success': False,
'output': '',
'error': None,
'backup_file': None
}
try:
conn = self.get_device_connection(device_info)
# 备份当前配置
backup = conn.send_command('display current-configuration')
backup_file = f"backup_{device_info['name']}_{datetime.now().strftime('%H%M%S')}.cfg"
with open(backup_file, 'w', encoding='utf-8') as f:
f.write(backup)
result['backup_file'] = backup_file
if dry_run:
self.logger.info(f"模拟执行(干跑模式): {device_info['name']}")
result['output'] = "干跑模式 - 未实际执行命令"
result['success'] = True
else:
# 实际执行命令
output = conn.send_config_set(commands)
result['output'] = output
# 保存配置
conn.send_command('save', expect_string=r'\[Y/N\]:')
conn.send_command('Y', expect_string=r'\[Y/N\]:')
result['success'] = True
self.logger.info(f"命令执行成功: {device_info['name']}")
conn.disconnect()
except Exception as e:
result['error'] = str(e)
self.logger.error(f"执行失败: {device_info['name']} - {str(e)}")
return result
def batch_execute(self, device_group: str, commands: List[str],
dry_run: bool = False) -> List[Dict[str, Any]]:
"""批量执行命令"""
if device_group not in self.device_inventory:
self.logger.error(f"设备组不存在: {device_group}")
return []
devices = self.device_inventory[device_group]
results = []
self.logger.info(f"开始批量执行,设备组: {device_group}, 设备数: {len(devices)}")
for device in devices:
result = self.execute_safely(device, commands, dry_run)
results.append(result)
# 添加延迟,避免对设备造成压力
import time
time.sleep(1)
# 生成执行报告
self.generate_execution_report(results, device_group, dry_run)
return results
def generate_execution_report(self, results: List[Dict[str, Any]],
device_group: str, dry_run: bool):
"""生成执行报告"""
success_count = sum(1 for r in results if r['success'])
fail_count = len(results) - success_count
report = [
"=" * 60,
f"批量执行报告",
f"设备组: {device_group}",
f"执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
f"干跑模式: {'是' if dry_run else '否'}",
f"总设备数: {len(results)}",
f"成功: {success_count}",
f"失败: {fail_count}",
"=" * 60,
]
if fail_count > 0:
report.append("\n失败设备详情:")
for result in results:
if not result['success']:
report.append(f" - {result['device']}: {result.get('error', '未知错误')}")
report_text = '\n'.join(report)
print(report_text)
# 保存报告到文件
report_dir = Path("reports")
report_dir.mkdir(exist_ok=True)
report_file = report_dir / f"report_{device_group}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
with open(report_file, 'w', encoding='utf-8') as f:
f.write(report_text)
self.logger.info(f"报告已保存: {report_file}")
def main():
"""主函数"""
parser = argparse.ArgumentParser(description='生产环境网络设备配置工具')
parser.add_argument('--config', '-c', help='设备配置文件路径')
parser.add_argument('--group', '-g', required=True, help='设备组名称')
parser.add_argument('--commands', '-cmd', required=True, help='要执行的命令,用分号分隔')
parser.add_argument('--dry-run', action='store_true', help='干跑模式(不实际执行)')
parser.add_argument('--verbose', '-v', action='store_true', help='详细输出')
args = parser.parse_args()
# 解析命令
commands = [cmd.strip() for cmd in args.commands.split(';') if cmd.strip()]
if not commands:
print("错误: 未提供有效的命令")
sys.exit(1)
# 初始化管理器
manager = ProductionConfigManager(args.config)
if args.verbose:
print(f"设备组: {args.group}")
print(f"命令列表: {commands}")
print(f"干跑模式: {args.dry_run}")
# 执行批量操作
results = manager.batch_execute(args.group, commands, args.dry_run)
# 根据结果退出
success_count = sum(1 for r in results if r['success'])
if success_count == len(results):
sys.exit(0) # 全部成功
else:
sys.exit(1) # 有失败
if __name__ == "__main__":
main()
```
这个框架提供了生产环境所需的核心功能:安全的凭证管理、完善的日志记录、干跑模式支持、执行报告生成等。你可以基于这个框架开发具体的业务逻辑。
在实际部署时,我建议将这样的脚本部署在专门的自动化服务器上,通过CI/CD流水线进行版本管理和发布,结合任务调度系统(如Apache Airflow)定期执行监控和配置检查任务。