# SecureCRT Python脚本实战:构建企业级SSH批量管理与智能日志系统
在当今快节奏的IT运维环境中,网络工程师经常需要同时管理数十甚至上百台设备。传统的手动登录操作不仅效率低下,还容易出错。本文将深入探讨如何利用SecureCRT的Python脚本功能,打造一套完整的SSH批量操作解决方案,涵盖IP列表处理、命令批量执行、异常捕获和智能日志记录等高级功能。
## 1. SecureCRT Python脚本开发环境搭建
### 1.1 版本兼容性检查
在开始编写脚本前,首先要确保开发环境配置正确:
```python
# 脚本头声明(必须放在脚本最开头)
# $language = "Python"
# $interface = "1.0"
```
**版本注意事项**:
- SecureCRT 8.x及更早版本仅支持Python 2.7
- SecureCRT 9.0+开始支持Python 3.8.x(需单独安装)
- 32位/64位版本必须与Python安装版本一致
> 提示:如果遇到脚本引擎加载失败,检查SecureCRT选项中的"脚本引擎"设置,确保Python路径配置正确。
### 1.2 基础API快速入门
SecureCRT提供了丰富的API接口,主要分为两大类:
1. **Dialog功能**:用于创建用户交互界面
2. **Screen功能**:用于终端屏幕控制和数据获取
以下是一个最简单的连接示例:
```python
def main():
# 基本连接参数
host = '192.168.1.1'
username = 'admin'
password = 'password'
# 构建连接命令
cmd = "/SSH2 /L {} /PASSWORD {} /C 3DES /M MD5 {}".format(
username, password, host)
# 建立连接
crt.Session.Connect(cmd)
# 等待连接完成
crt.Screen.WaitForString("#")
# 发送测试命令
crt.Screen.Send("show version\n")
main()
```
## 2. 企业级IP列表处理方案
### 2.1 智能IP列表解析
实际运维中,我们经常需要处理各种格式的IP列表。下面是一个健壮的IP列表处理器:
```python
import re
def parse_ip_list(file_path):
"""
解析多种格式的IP列表文件
支持格式:
- 纯IP列表
- IP+端口(192.168.1.1:22)
- 带注释的IP(#开头为注释)
"""
ip_list = []
with open(file_path, 'r') as f:
for line in f:
line = line.strip()
# 跳过空行和注释
if not line or line.startswith('#'):
continue
# 提取IP和端口
match = re.match(r'^([\d.]+)(?::(\d+))?$', line)
if match:
ip = match.group(1)
port = match.group(2) or '22' # 默认SSH端口
ip_list.append((ip, port))
return ip_list
```
### 2.2 IP列表验证与异常处理
```python
def validate_ip(ip):
"""验证IP地址格式"""
pattern = r'^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$'
if not re.match(pattern, ip):
return False
octets = list(map(int, ip.split('.')))
return all(0 <= octet <= 255 for octet in octets)
def check_connectivity(ip, port=22, timeout=3):
"""检查IP和端口的连通性"""
import socket
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((ip, int(port)))
return result == 0
except Exception as e:
return False
finally:
sock.close()
```
## 3. 高级SSH批量操作框架
### 3.1 核心执行引擎设计
```python
class SSHBatchExecutor:
def __init__(self, ip_list_file, command_file, output_dir="logs"):
self.ip_list = parse_ip_list(ip_list_file)
self.commands = self._load_commands(command_file)
self.output_dir = output_dir
self.failed_hosts = []
# 创建输出目录
if not os.path.exists(output_dir):
os.makedirs(output_dir)
def _load_commands(self, command_file):
"""从文件加载要执行的命令列表"""
with open(command_file, 'r') as f:
return [line.strip() for line in f if line.strip()]
def execute_on_host(self, ip, port):
"""在单个主机上执行命令序列"""
log_file = os.path.join(self.output_dir, f"{ip.replace('.', '_')}.log")
try:
# 建立连接
cmd = f"/SSH2 /L {username} /PASSWORD {password} /C 3DES /M MD5 {ip}:{port}"
crt.Session.Connect(cmd)
# 等待登录完成
if not crt.Screen.WaitForString(["#", ">", "$"], 10):
raise Exception("登录超时或提示符未识别")
# 执行命令并记录输出
with open(log_file, 'w') as f:
for command in self.commands:
crt.Screen.Send(command + "\n")
# 等待命令完成
crt.Screen.WaitForString(["#", ">", "$"], 5)
# 获取屏幕输出
output = crt.Screen.Get2(1, 1, crt.Screen.CurrentRow, crt.Screen.CurrentColumn)
f.write(f"=== {command} ===\n{output}\n\n")
return True
except Exception as e:
# 记录失败信息
with open(log_file, 'a') as f:
f.write(f"\n!!! 执行失败: {str(e)}\n")
self.failed_hosts.append((ip, str(e)))
return False
finally:
# 确保断开连接
if crt.Session.Connected:
crt.Session.Disconnect()
```
### 3.2 异常处理与重试机制
```python
def execute_with_retry(self, ip, port, max_retries=3):
"""带重试机制的执行方法"""
for attempt in range(1, max_retries + 1):
try:
if self.execute_on_host(ip, port):
return True
if attempt < max_retries:
crt.Dialog.MessageBox(
f"主机 {ip} 第 {attempt} 次尝试失败,剩余 {max_retries - attempt} 次重试",
"警告", 48)
except Exception as e:
if attempt == max_retries:
raise e
return False
```
## 4. 智能日志系统实现
### 4.1 结构化日志记录
```python
class StructuredLogger:
def __init__(self, base_dir="logs"):
self.base_dir = base_dir
self.session_log = os.path.join(base_dir, "session_audit.log")
self._init_logs()
def _init_logs(self):
"""初始化日志文件和目录"""
if not os.path.exists(self.base_dir):
os.makedirs(self.base_dir)
# 写入日志头
with open(self.session_log, 'a') as f:
f.write("\n" + "="*80 + "\n")
f.write(f"新的会话日志 {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("="*80 + "\n\n")
def log_command(self, host, command, output, status="SUCCESS"):
"""记录命令执行详情"""
entry = {
'timestamp': datetime.now().isoformat(),
'host': host,
'command': command,
'status': status,
'output': output
}
# 写入JSON格式日志
with open(self.session_log, 'a') as f:
json.dump(entry, f, ensure_ascii=False)
f.write("\n")
# 同时写入主机专属日志
host_log = os.path.join(self.base_dir, f"{host.replace('.', '_')}.log")
with open(host_log, 'a') as f:
f.write(f"[{entry['timestamp']}] {command} ({status})\n")
f.write(output + "\n\n")
```
### 4.2 日志分析与报告生成
```python
def generate_report(log_dir, output_file="report.html"):
"""从日志生成HTML报告"""
# 收集所有日志数据
logs = []
for log_file in glob.glob(os.path.join(log_dir, "*.log")):
if log_file.endswith("session_audit.log"):
continue
with open(log_file, 'r') as f:
content = f.read()
logs.append({
'host': os.path.basename(log_file).replace('.log', '').replace('_', '.'),
'content': content
})
# 生成HTML报告
html_template = """
<html>
<head>
<title>设备巡检报告</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.host { margin-bottom: 30px; border-bottom: 1px solid #ccc; }
.hostname { font-weight: bold; color: #0066cc; }
pre { background: #f5f5f5; padding: 10px; border-radius: 3px; }
</style>
</head>
<body>
<h1>设备巡检报告 - {date}</h1>
{content}
</body>
</html>
"""
host_contents = []
for log in logs:
host_contents.append(f"""
<div class="host">
<div class="hostname">{log['host']}</div>
<pre>{log['content']}</pre>
</div>
""")
with open(output_file, 'w') as f:
f.write(html_template.format(
date=datetime.now().strftime('%Y-%m-%d'),
content="\n".join(host_contents)
))
```
## 5. 完整实战案例:自动化网络巡检系统
### 5.1 系统架构设计
```
网络巡检系统工作流程:
1. 读取IP列表文件
2. 验证设备可达性
3. 并行执行巡检命令
4. 收集并分析输出
5. 生成可视化报告
6. 发送邮件通知
```
### 5.2 主程序实现
```python
def main():
# 初始化组件
ip_list = "devices.txt"
commands = "commands.txt"
output_dir = "inspection_logs"
# 用户确认
if not crt.Dialog.MessageBox(
f"即将对 {len(parse_ip_list(ip_list))} 台设备执行巡检,继续吗?",
"确认", 1):
return
# 创建执行器
executor = SSHBatchExecutor(ip_list, commands, output_dir)
logger = StructuredLogger(output_dir)
# 遍历执行
total = len(executor.ip_list)
for i, (ip, port) in enumerate(executor.ip_list, 1):
crt.Dialog.MessageBox(f"正在处理 {ip} ({i}/{total})...", "进度")
try:
if executor.execute_with_retry(ip, port):
logger.log_command(ip, "巡检命令集", "所有命令执行成功")
else:
logger.log_command(ip, "巡检命令集", "部分命令执行失败", "WARNING")
except Exception as e:
logger.log_command(ip, "巡检命令集", str(e), "ERROR")
# 生成报告
generate_report(output_dir)
# 显示摘要
summary = f"""
巡检完成!
总设备数: {total}
成功: {total - len(executor.failed_hosts)}
失败: {len(executor.failed_hosts)}
"""
crt.Dialog.MessageBox(summary, "巡检摘要")
if __name__ == "__main__":
main()
```
### 5.3 高级功能扩展
**并行执行优化**:
```python
from threading import Thread
class ParallelExecutor:
def __init__(self, max_workers=5):
self.max_workers = max_workers
def run(self, task_func, items):
"""并行执行任务"""
threads = []
results = []
for item in items:
while len(threads) >= self.max_workers:
self._clean_finished_threads(threads)
t = Thread(target=lambda: results.append(task_func(item)))
t.start()
threads.append(t)
# 等待所有线程完成
for t in threads:
t.join()
return results
def _clean_finished_threads(self, threads):
"""清理已完成的线程"""
for t in threads[:]:
if not t.is_alive():
threads.remove(t)
```
**邮件通知集成**:
```python
def send_email(subject, body, attachments=None):
"""发送邮件通知"""
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
# 配置邮件服务器
smtp_server = "smtp.example.com"
smtp_port = 587
username = "your_email@example.com"
password = "your_password"
# 创建邮件
msg = MIMEMultipart()
msg['From'] = username
msg['To'] = "recipient@example.com"
msg['Subject'] = subject
# 添加正文
msg.attach(MIMEText(body, 'plain'))
# 添加附件
if attachments:
for filepath in attachments:
part = MIMEBase('application', 'octet-stream')
with open(filepath, 'rb') as f:
part.set_payload(f.read())
encoders.encode_base64(part)
part.add_header(
'Content-Disposition',
f'attachment; filename="{os.path.basename(filepath)}"')
msg.attach(part)
# 发送邮件
server = smtplib.SMTP(smtp_server, smtp_port)
server.starttls()
server.login(username, password)
server.send_message(msg)
server.quit()
```
## 6. 安全增强与最佳实践
### 6.1 敏感信息处理
```python
from cryptography.fernet import Fernet
class CredentialManager:
def __init__(self, key_file="secret.key"):
self.key_file = key_file
self.key = self._load_or_generate_key()
self.cipher = Fernet(self.key)
def _load_or_generate_key(self):
"""加载或生成加密密钥"""
if os.path.exists(self.key_file):
with open(self.key_file, 'rb') as f:
return f.read()
else:
key = Fernet.generate_key()
with open(self.key_file, 'wb') as f:
f.write(key)
return key
def encrypt(self, data):
"""加密数据"""
return self.cipher.encrypt(data.encode()).decode()
def decrypt(self, encrypted_data):
"""解密数据"""
return self.cipher.decrypt(encrypted_data.encode()).decode()
# 使用示例
cred_manager = CredentialManager()
encrypted = cred_manager.encrypt("my_password")
decrypted = cred_manager.decrypt(encrypted)
```
### 6.2 脚本权限控制
```python
def check_user_permission():
"""检查用户权限"""
import getpass
allowed_users = ["admin", "operator1", "operator2"]
current_user = getpass.getuser()
if current_user not in allowed_users:
crt.Dialog.MessageBox(
f"用户 {current_user} 无权限执行此脚本", "错误", 16)
raise PermissionError("Insufficient privileges")
# 可选:验证sudo权限
if os.name == 'posix':
import subprocess
try:
subprocess.check_call(['sudo', '-n', 'true'])
except subprocess.CalledProcessError:
crt.Dialog.MessageBox(
"需要sudo权限但未配置免密", "错误", 16)
raise
```
## 7. 性能优化技巧
### 7.1 连接池管理
```python
class ConnectionPool:
def __init__(self, max_size=5):
self.max_size = max_size
self.pool = {}
def get_connection(self, host, port, username, password):
"""从池中获取连接"""
key = f"{host}:{port}"
if key in self.pool:
conn = self.pool[key]
if self._check_connection_alive(conn):
return conn
else:
del self.pool[key]
# 创建新连接
if len(self.pool) >= self.max_size:
self._evict_oldest()
conn = self._create_connection(host, port, username, password)
self.pool[key] = conn
return conn
def _check_connection_alive(self, conn):
"""检查连接是否仍然活跃"""
try:
conn.Send("echo test\n")
return conn.WaitForString("test", 2)
except:
return False
def _create_connection(self, host, port, username, password):
"""创建新连接"""
cmd = f"/SSH2 /L {username} /PASSWORD {password} /C 3DES /M MD5 {host}:{port}"
session = crt.Session.Connect(cmd)
session.WaitForString(["#", ">", "$"], 10)
return session
def _evict_oldest(self):
"""移除最旧的连接"""
oldest_key = next(iter(self.pool))
self.pool[oldest_key].Disconnect()
del self.pool[oldest_key]
def cleanup(self):
"""清理所有连接"""
for conn in self.pool.values():
conn.Disconnect()
self.pool.clear()
```
### 7.2 命令执行超时控制
```python
def execute_with_timeout(command, timeout=30):
"""带超时控制的命令执行"""
import threading
result = {"output": None, "error": None}
def worker():
try:
crt.Screen.Send(command + "\n")
output = []
start_time = time.time()
while time.time() - start_time < timeout:
if crt.Screen.WaitForString(["\n", "#", ">", "$"], 1):
row = crt.Screen.CurrentRow
line = crt.Screen.Get(row, 1, row, crt.Screen.CurrentColumn)
output.append(line.strip())
if "#" in output[-1] or ">" in output[-1] or "$" in output[-1]:
break
result["output"] = "\n".join(output)
except Exception as e:
result["error"] = str(e)
t = threading.Thread(target=worker)
t.start()
t.join(timeout)
if t.is_alive():
crt.Screen.Send("\x03") # 发送Ctrl+C中断命令
result["error"] = "命令执行超时"
return result
```
## 8. 调试与故障排除
### 8.1 脚本调试技巧
```python
# 调试模式开关
DEBUG_MODE = True
def debug_log(message):
"""调试日志记录"""
if DEBUG_MODE:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open("debug.log", "a") as f:
f.write(f"[{timestamp}] {message}\n")
def capture_screen(filename="screen_capture.txt"):
"""捕获当前屏幕内容"""
rows = crt.Screen.Rows
cols = crt.Screen.Columns
content = crt.Screen.Get2(1, 1, rows, cols)
with open(filename, "w") as f:
f.write(content)
return content
```
### 8.2 常见问题解决方案
**问题1:脚本执行速度慢**
*解决方案*:
- 减少`WaitForString`的超时时间
- 关闭屏幕同步(`crt.Screen.Synchronous = False`)
- 使用并行执行
**问题2:命令输出不完整**
*解决方案*:
```python
# 确保屏幕同步开启
crt.Screen.Synchronous = True
# 增加适当的等待时间
crt.Screen.WaitForString(["#", ">", "$"], 5)
# 使用Get2而不是Get获取完整输出
output = crt.Screen.Get2(1, 1, crt.Screen.CurrentRow, crt.Screen.CurrentColumn)
```
**问题3:特殊字符处理异常**
*解决方案*:
```python
# 发送特殊字符
def send_special(key):
special_keys = {
"enter": "\r",
"tab": "\t",
"esc": "\x1b",
"ctrl+c": "\x03"
}
crt.Screen.Send(special_keys.get(key, key))
```
## 9. 企业级部署方案
### 9.1 集中式脚本管理
```
建议的脚本目录结构:
/scripts
├── /bin # 可执行脚本
├── /lib # 公共库文件
├── /config # 配置文件
├── /logs # 日志目录
└── /templates # 报告模板
```
### 9.2 版本控制集成
```python
def update_from_git(repo_url, local_dir):
"""从Git仓库更新脚本"""
import subprocess
if not os.path.exists(local_dir):
# 首次克隆
subprocess.check_call(["git", "clone", repo_url, local_dir])
else:
# 拉取更新
subprocess.check_call(["git", "-C", local_dir, "pull"])
# 验证更新
commit_hash = subprocess.check_output(
["git", "-C", local_dir, "rev-parse", "HEAD"]).decode().strip()
return f"更新成功,当前版本: {commit_hash}"
```
## 10. 未来扩展方向
### 10.1 与CMDB集成
```python
def get_devices_from_cmdb(api_url, token):
"""从CMDB系统获取设备列表"""
import requests
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
try:
response = requests.get(api_url, headers=headers)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
crt.Dialog.MessageBox(f"CMDB接口错误: {str(e)}", "错误", 16)
return []
```
### 10.2 自动化测试框架集成
```python
class AutomatedTester:
def __init__(self, test_cases):
self.test_cases = test_cases
def run_tests(self):
results = []
for case in self.test_cases:
try:
output = self._execute_test_case(case)
results.append({
"name": case["name"],
"status": "PASSED",
"output": output
})
except Exception as e:
results.append({
"name": case["name"],
"status": "FAILED",
"error": str(e)
})
return results
def _execute_test_case(self, case):
"""执行单个测试用例"""
# 连接到设备
self._connect(case["host"], case["port"], case["credentials"])
# 执行测试命令
outputs = []
for command in case["commands"]:
crt.Screen.Send(command + "\n")
output = crt.Screen.WaitForString(["#", ">", "$"], case["timeout"])
outputs.append(output)
# 验证输出
for pattern in case["expected_patterns"]:
if not any(pattern in out for out in outputs):
raise AssertionError(f"未找到预期模式: {pattern}")
return "\n".join(outputs)
```
通过本文介绍的技术方案,您可以构建一个完整的企业级网络设备自动化管理系统。这套系统不仅能够显著提高运维效率,还能通过标准化的操作流程和详尽的日志记录,大大降低人为错误的发生概率。