## 1. 信号处理:从操作系统到Python的桥梁
信号,听起来有点玄乎,但你可以把它想象成你手机上的通知。当有新消息、来电或者电量不足时,手机会用不同的铃声或震动提醒你。在操作系统中,信号就是类似的机制——它是进程间通信的一种方式,用来通知某个进程发生了特定事件。
我在实际项目中第一次接触信号处理,是因为一个线上服务总是被运维同学“暴力”杀掉。每次发版更新,他们直接 `kill -9` 把进程干掉,结果导致正在处理的请求突然中断,数据库连接没有正常关闭,留下了一堆僵尸连接。后来我们引入了信号处理机制,让服务能够“优雅退出”,这才解决了问题。
Python 的 `signal` 模块就是用来处理这些信号的。它让你能够捕获操作系统发送给进程的信号,并执行自定义的处理逻辑。比如当用户按下 Ctrl+C(发送 SIGINT 信号)时,你可以不让程序立即退出,而是先完成手头的工作,保存好数据,然后再干净利落地结束。
信号处理的核心思想很简单:**注册一个回调函数**。当特定信号到达时,这个函数就会被调用。这个函数接收两个参数:信号编号和当前的栈帧信息。栈帧信息对调试很有用,但在大多数实际场景中,我们更关心的是信号本身。
```python
import signal
import time
def graceful_shutdown(signum, frame):
"""优雅关闭的处理函数"""
print(f"\n收到信号 {signum},开始清理资源...")
# 这里可以执行清理操作:关闭数据库连接、保存临时文件等
print("资源清理完成,准备退出")
exit(0)
# 注册信号处理函数
signal.signal(signal.SIGINT, graceful_shutdown) # Ctrl+C
signal.signal(signal.SIGTERM, graceful_shutdown) # kill 默认信号
print("程序运行中,按 Ctrl+C 可以测试优雅退出")
print(f"进程ID: {os.getpid()}")
# 模拟一个长时间运行的任务
try:
while True:
print(".", end="", flush=True)
time.sleep(1)
except KeyboardInterrupt:
# 这个异常会在信号处理函数执行后被触发
pass
```
这段代码展示了最基本的信号处理模式。我建议你在自己的机器上运行一下,然后按 Ctrl+C 看看效果。你会发现程序不会立即退出,而是先执行清理逻辑。这对于需要保证数据一致性的服务来说至关重要。
## 2. 信号处理的核心函数与基础用法
### 2.1 signal.signal():注册你的回调
`signal.signal()` 是信号处理中最核心的函数,它的作用就是告诉系统:“当这个信号到来时,请调用我指定的函数”。函数签名很简单:`signal.signal(signalnum, handler)`。
`signalnum` 就是信号编号,Python 提供了很多预定义的常量,比如 `signal.SIGINT`(中断信号)、`signal.SIGTERM`(终止信号)等。`handler` 就是你的处理函数,它必须接受两个参数:信号编号和栈帧。
这里有个很重要的细节:**信号处理函数只能在主线程中注册**。如果你在子线程中调用 `signal.signal()`,Python 会抛出 `ValueError`。这是因为信号处理涉及进程级别的状态改变,必须由主线程统一管理。
```python
import signal
import threading
import time
def handler(signum, frame):
print(f"线程 {threading.current_thread().name} 收到信号 {signum}")
def worker():
try:
# 在子线程中注册信号处理器会失败
signal.signal(signal.SIGUSR1, handler)
except ValueError as e:
print(f"子线程注册失败: {e}")
# 在主线程注册
signal.signal(signal.SIGUSR1, handler)
# 创建并启动子线程
t = threading.Thread(target=worker, name="WorkerThread")
t.start()
t.join()
print("主线程准备接收信号...")
time.sleep(5)
```
### 2.2 signal.getsignal():查看当前的处理器
有时候你需要知道某个信号当前绑定了什么处理函数,特别是当你接手别人的代码或者调试复杂系统时。`signal.getsignal()` 就是干这个的。
```python
import signal
# 查看 SIGINT 的当前处理器
handler = signal.getsignal(signal.SIGINT)
print(f"SIGINT 当前处理器: {handler}")
# 如果是默认处理器,会显示 <built-in function default_int_handler>
# 如果是忽略信号,会显示 1(对应 SIG_IGN)
# 如果是自定义函数,会显示函数对象
# 你也可以检查所有信号
for name in dir(signal):
if name.startswith('SIG') and not name.startswith('SIG_'):
sig = getattr(signal, name)
try:
h = signal.getsignal(sig)
print(f"{name:15} -> {h}")
except (AttributeError, ValueError):
# 有些信号在某些平台上不可用
pass
```
### 2.3 signal.strsignal():获取信号的描述信息
这个函数在 Python 3.8 中引入,非常实用。它能把信号编号转换成人类可读的描述。
```python
import signal
# 获取信号描述
print(f"SIGINT: {signal.strsignal(signal.SIGINT)}") # 输出: Interrupt
print(f"SIGTERM: {signal.strsignal(signal.SIGTERM)}") # 输出: Terminated
print(f"SIGKILL: {signal.strsignal(signal.SIGKILL)}") # 输出: Killed
# 对于不存在的信号会返回 None
print(f"999: {signal.strsignal(999)}") # 输出: None
```
这个函数在写日志或者调试信息时特别有用,你不需要记住每个信号编号对应的含义,直接调用 `strsignal()` 就行。
## 3. 定时任务与超时控制实战
### 3.1 signal.alarm():最简单的定时器
`signal.alarm()` 是 Unix/Linux 系统特有的功能,它能在指定秒数后向进程自身发送 `SIGALRM` 信号。这个功能最常见的用途就是实现超时控制。
我最早用这个功能是在处理外部 API 调用时。有些第三方服务响应不稳定,有时候会挂起,如果不设置超时,整个程序都会卡住。用 `signal.alarm()` 就能优雅地解决这个问题。
```python
import signal
import time
class TimeoutError(Exception):
"""自定义超时异常"""
pass
def timeout_handler(signum, frame):
"""超时处理函数"""
raise TimeoutError("操作超时")
def risky_operation(duration):
"""模拟一个可能耗时的操作"""
print(f"开始执行耗时操作,预计 {duration} 秒")
time.sleep(duration)
return "操作成功"
def run_with_timeout(func, args=(), kwargs={}, timeout=5):
"""带超时限制执行函数"""
# 保存原来的信号处理器
old_handler = signal.signal(signal.SIGALRM, timeout_handler)
try:
# 设置定时器
signal.alarm(timeout)
# 执行可能超时的操作
result = func(*args, **kwargs)
# 如果正常完成,取消定时器
signal.alarm(0)
return result
except TimeoutError:
print(f"函数 {func.__name__} 执行超时({timeout}秒)")
return None
finally:
# 恢复原来的信号处理器
signal.signal(signal.SIGALRM, old_handler)
# 测试:正常情况
print("测试1:3秒操作,5秒超时限制")
result = run_with_timeout(risky_operation, (3,), timeout=5)
print(f"结果: {result}")
print("\n" + "="*50 + "\n")
# 测试:超时情况
print("测试2:8秒操作,5秒超时限制")
result = run_with_timeout(risky_operation, (8,), timeout=5)
print(f"结果: {result}")
```
这个模式在实际项目中非常有用。不过要注意几个细节:
1. `signal.alarm()` 是单次的,触发后需要重新设置
2. 如果设置了新的 alarm,旧的会被取消
3. 记得在 finally 块中恢复原来的信号处理器,避免影响其他代码
### 3.2 更精细的定时器:setitimer()
如果你需要周期性的定时任务,或者更精细的时间控制(比如毫秒级),`signal.setitimer()` 是更好的选择。它支持三种类型的定时器:
| 定时器类型 | 对应信号 | 描述 |
|-----------|---------|------|
| `signal.ITIMER_REAL` | `SIGALRM` | 真实时间,无论进程是否运行 |
| `signal.ITIMER_VIRTUAL` | `SIGVTALRM` | 进程在用户态运行的时间 |
| `signal.ITIMER_PROF` | `SIGPROF` | 进程在用户态和内核态运行的总时间 |
```python
import signal
import time
def periodic_handler(signum, frame):
"""周期性定时器的处理函数"""
current_time = time.strftime("%H:%M:%S")
print(f"[{current_time}] 定时器触发,信号: {signum}")
def setup_periodic_timer(interval=2.5):
"""设置周期性定时器"""
# 2.5秒后第一次触发,之后每1秒触发一次
signal.setitimer(signal.ITIMER_REAL, 2.5, 1.0)
signal.signal(signal.SIGALRM, periodic_handler)
print(f"定时器已设置:{interval}秒后首次触发,之后每秒触发")
if __name__ == "__main__":
setup_periodic_timer()
try:
# 主程序做其他事情
count = 0
while count < 10:
print(f"主程序运行中... 计数: {count}")
time.sleep(0.5)
count += 1
finally:
# 清理定时器
signal.setitimer(signal.ITIMER_REAL, 0)
print("定时器已清除")
```
`setitimer()` 的返回值是一个元组 `(delay, interval)`,表示之前定时器的设置。这在需要临时修改定时器然后又恢复的场景下很有用。
## 4. 跨平台差异与实战避坑指南
### 4.1 Windows 与 Linux 的信号支持差异
这是 Python 信号处理中最容易踩坑的地方。很多信号在 Windows 上根本不存在,如果你写的代码要在多平台运行,必须特别注意。
我在一个跨平台项目中就遇到过这个问题:在 Linux 上运行好好的定时任务,在 Windows 上直接报 `AttributeError: module 'signal' has no attribute 'SIGALRM'`。后来才发现 `SIGALRM` 是 Unix/Linux 特有的。
下面这个表格整理了主要差异:
| 信号 | Linux/Mac | Windows | 说明 |
|------|-----------|---------|------|
| `SIGINT` | ✅ | ✅ | Ctrl+C,两者都支持 |
| `SIGTERM` | ✅ | ✅ | 终止信号,都支持 |
| `SIGALRM` | ✅ | ❌ | 定时器信号,仅Unix |
| `SIGCHLD` | ✅ | ❌ | 子进程状态改变,仅Unix |
| `SIGUSR1`/`SIGUSR2` | ✅ | ❌ | 用户自定义信号,仅Unix |
| `CTRL_C_EVENT` | ❌ | ✅ | Windows特有的Ctrl+C事件 |
| `CTRL_BREAK_EVENT` | ❌ | ✅ | Windows特有的Ctrl+Break |
写跨平台代码时,我通常的做法是用 `try-except` 包装,或者用 `hasattr()` 检查:
```python
import signal
import sys
def setup_signal_handlers():
"""跨平台的信号处理器设置"""
def graceful_exit(signum, frame):
print(f"\n收到退出信号,开始清理...")
# 执行清理逻辑
sys.exit(0)
# 公共信号(都支持的)
common_signals = [signal.SIGINT, signal.SIGTERM]
for sig in common_signals:
signal.signal(sig, graceful_exit)
# Unix特有信号
if sys.platform != 'win32':
try:
# SIGUSR1 常用于重新加载配置
signal.signal(signal.SIGUSR1, reload_config_handler)
# SIGALRM 用于超时控制
# 在Windows上这行代码会报错
signal.signal(signal.SIGALRM, timeout_handler)
except AttributeError:
print("警告:某些Unix特有信号在当前平台不可用")
# Windows特有信号处理
if sys.platform == 'win32':
try:
# Windows的Ctrl+C处理
signal.signal(signal.CTRL_C_EVENT, graceful_exit)
signal.signal(signal.CTRL_BREAK_EVENT, graceful_exit)
except AttributeError:
print("警告:Windows特有信号不可用")
def reload_config_handler(signum, frame):
"""重新加载配置(Unix特有)"""
print("收到SIGUSR1,重新加载配置文件...")
# 实际项目中这里会重新读取配置文件
```
### 4.2 信号处理中的线程安全问题
Python 的信号处理有个重要限制:**信号处理器总是在主线程中执行**,即使信号是在子线程中接收的。这意味着你不能用信号来做线程间通信。
我见过有人尝试这样做:
```python
import signal
import threading
import time
import os
def thread_signal_handler(signum, frame):
# 这个函数会在主线程执行,而不是在接收信号的线程
print(f"处理器在 {threading.current_thread().name} 中执行")
def worker():
print(f"工作线程 {threading.current_thread().name} 启动")
# 模拟工作
time.sleep(10)
print(f"工作线程 {threading.current_thread().name} 结束")
# 注册处理器
signal.signal(signal.SIGUSR1, thread_signal_handler)
# 启动工作线程
t = threading.Thread(target=worker, name="Worker")
t.start()
# 给工作线程发送信号(实际上信号是发给进程的)
print(f"主线程ID: {os.getpid()}")
print("尝试向工作线程发送信号...")
# 注意:os.kill() 是发给进程的,不是线程
os.kill(os.getpid(), signal.SIGUSR1)
t.join()
```
运行这段代码你会发现,即使信号是在工作线程运行时发送的,处理器还是在主线程中执行。如果你真的需要线程间通信,应该用 `threading` 模块的同步原语,比如 `Event`、`Condition` 等。
### 4.3 信号处理中的阻塞操作限制
信号处理器中应该避免执行阻塞操作或耗时计算。因为信号处理器会中断程序正常的执行流程,如果在处理器中执行耗时操作,会影响整个程序的响应性。
更糟糕的是,有些系统调用在信号处理器中是不安全的。比如在信号处理器中调用 `print()` 实际上是有风险的,因为 `print()` 可能会触发 I/O 操作,而 I/O 操作可能被信号中断。
安全的做法是:在信号处理器中只设置标志位,真正的处理逻辑放到主循环中:
```python
import signal
import time
import threading
class SignalAwareApplication:
def __init__(self):
self.should_exit = False
self.should_reload = False
self.lock = threading.Lock()
# 设置信号处理器
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
if hasattr(signal, 'SIGUSR1'):
signal.signal(signal.SIGUSR1, self._signal_handler)
def _signal_handler(self, signum, frame):
"""信号处理器:只设置标志位"""
with self.lock:
if signum in (signal.SIGINT, signal.SIGTERM):
print(f"收到退出信号 {signum}")
self.should_exit = True
elif signum == signal.SIGUSR1:
print("收到重载信号")
self.should_reload = True
def process_signals(self):
"""在主循环中处理信号标志"""
with self.lock:
if self.should_reload:
self.should_reload = False
self._reload_config()
if self.should_exit:
self._cleanup()
return False
return True
def _reload_config(self):
"""实际的重载配置逻辑"""
print("执行配置重载...")
# 这里执行实际的重载逻辑
time.sleep(0.5) # 模拟耗时操作
print("配置重载完成")
def _cleanup(self):
"""实际的清理逻辑"""
print("执行清理操作...")
time.sleep(0.5) # 模拟清理耗时
print("清理完成,准备退出")
def run(self):
"""主循环"""
print("应用启动,按 Ctrl+C 退出,或发送 SIGUSR1 重载配置")
print(f"进程ID: {os.getpid()}")
while self.process_signals():
# 正常的工作逻辑
print("处理正常业务...")
time.sleep(1)
if __name__ == "__main__":
import os
app = SignalAwareApplication()
app.run()
```
这种“标志位+主循环处理”的模式是信号处理的最佳实践。它既保证了信号处理的及时性,又避免了在信号处理器中执行复杂逻辑的风险。
## 5. 高级应用场景与工程实践
### 5.1 优雅退出:服务端程序的必备技能
对于长时间运行的服务端程序,优雅退出不是可选项,而是必选项。所谓优雅退出,就是在收到终止信号时,程序能够:
1. 停止接受新请求
2. 完成正在处理的请求
3. 释放资源(数据库连接、文件句柄、网络连接等)
4. 然后退出
下面是一个简单的 HTTP 服务器优雅退出的例子:
```python
import signal
import time
import threading
from http.server import HTTPServer, SimpleHTTPRequestHandler
import socket
class GracefulHTTPServer:
def __init__(self, host='', port=8000):
self.host = host
self.port = port
self.server = None
self.is_shutting_down = False
self.shutdown_lock = threading.Lock()
# 设置信号处理器
signal.signal(signal.SIGINT, self.graceful_shutdown)
signal.signal(signal.SIGTERM, self.graceful_shutdown)
def graceful_shutdown(self, signum, frame):
"""优雅关闭的信号处理器"""
print(f"\n收到信号 {signum},开始优雅关闭...")
with self.shutdown_lock:
if self.is_shutting_down:
print("关闭流程已启动,忽略重复信号")
return
self.is_shutting_down = True
# 在单独的线程中执行关闭,避免阻塞信号处理器
shutdown_thread = threading.Thread(target=self._do_shutdown)
shutdown_thread.start()
def _do_shutdown(self):
"""实际执行关闭逻辑"""
print("停止接受新连接...")
if self.server:
self.server.shutdown()
print("等待现有请求处理完成...")
# 这里可以添加等待逻辑,比如等待所有请求处理完成
print("释放资源...")
# 关闭数据库连接、文件等
print("服务器关闭完成")
def run(self):
"""启动服务器"""
handler = SimpleHTTPRequestHandler
# 创建服务器
self.server = HTTPServer((self.host, self.port), handler)
print(f"服务器启动在 http://{self.host or 'localhost'}:{self.port}")
print(f"进程ID: {os.getpid()}")
print("按 Ctrl+C 优雅关闭服务器")
try:
self.server.serve_forever()
except KeyboardInterrupt:
# 这里也会被触发,但我们已经有了信号处理器
pass
finally:
if self.server:
self.server.server_close()
print("服务器已关闭")
if __name__ == "__main__":
import os
server = GracefulHTTPServer(port=8080)
server.run()
```
这个实现有几个关键点:
1. 使用锁防止重复关闭
2. 在单独的线程中执行实际关闭,避免阻塞信号处理器
3. 先停止接受新请求,再处理现有请求
4. 最后释放资源
### 5.2 超时控制:防止程序无限等待
在网络编程、文件操作、外部命令执行等场景中,超时控制是必不可少的。下面是一个更完整的超时控制装饰器:
```python
import signal
import functools
import time
class TimeoutException(Exception):
"""超时异常"""
pass
def timeout(seconds=10, error_message="操作超时"):
"""超时装饰器"""
def decorator(func):
def _handle_timeout(signum, frame):
raise TimeoutException(error_message)
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 保存原来的信号处理器
old_handler = signal.signal(signal.SIGALRM, _handle_timeout)
# 设置定时器
signal.alarm(seconds)
try:
result = func(*args, **kwargs)
return result
finally:
# 恢复原来的处理器
signal.signal(signal.SIGALRM, old_handler)
# 取消定时器
signal.alarm(0)
return wrapper
return decorator
# 使用示例
@timeout(seconds=3, error_message="数据库查询超时")
def query_database():
"""模拟数据库查询"""
print("开始查询数据库...")
time.sleep(5) # 模拟耗时查询
return "查询结果"
@timeout(seconds=2, error_message="HTTP请求超时")
def fetch_url(url):
"""模拟HTTP请求"""
print(f"开始请求 {url}...")
time.sleep(3) # 模拟网络延迟
return "响应内容"
# 测试
if __name__ == "__main__":
import os
# 只在Unix/Linux上测试,因为Windows没有SIGALRM
if os.name != 'nt':
try:
print("测试数据库查询(应该超时):")
result = query_database()
print(f"结果: {result}")
except TimeoutException as e:
print(f"捕获异常: {e}")
print("\n" + "="*50 + "\n")
try:
print("测试HTTP请求(应该超时):")
result = fetch_url("http://example.com")
print(f"结果: {result}")
except TimeoutException as e:
print(f"捕获异常: {e}")
else:
print("Windows平台不支持signal.alarm(),跳过测试")
```
这个装饰器可以方便地给任何函数添加超时控制。但要注意几个限制:
1. 只能用于主线程
2. 在 Windows 上不可用(因为缺少 SIGALRM)
3. 超时时间不能累加,每次调用都会重置
### 5.3 信号屏蔽与线程安全
在多线程程序中,你可能需要暂时屏蔽某些信号,防止它们在关键代码段中被触发。Python 提供了 `signal.pthread_sigmask()` 函数(注意:只在 Unix 系统可用)。
```python
import signal
import threading
import time
import os
def signal_handler(signum, frame):
print(f"[{time.time():.3f}] 线程 {threading.current_thread().name} 收到信号 {signum}")
def worker_with_mask():
"""在工作线程中屏蔽某些信号"""
thread_name = threading.current_thread().name
# 屏蔽 SIGINT 和 SIGTERM
mask = {signal.SIGINT, signal.SIGTERM}
old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, mask)
print(f"{thread_name}: 已屏蔽 SIGINT 和 SIGTERM")
try:
# 执行关键操作,不会被 SIGINT/SIGTERM 中断
for i in range(5):
print(f"{thread_name}: 关键操作 {i+1}/5")
time.sleep(1)
finally:
# 恢复原来的信号掩码
signal.pthread_sigmask(signal.SIG_SETMASK, old_mask)
print(f"{thread_name}: 已恢复信号掩码")
def worker_without_mask():
"""不屏蔽信号的普通工作线程"""
thread_name = threading.current_thread().name
for i in range(5):
print(f"{thread_name}: 普通操作 {i+1}/5")
time.sleep(1)
if __name__ == "__main__":
# 在主线程设置信号处理器
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGUSR1, signal_handler)
print(f"主线程ID: {os.getpid()}")
print("启动两个工作线程,一个屏蔽信号,一个不屏蔽")
print("发送信号测试:")
print(" kill -USR1 <pid> # 两个线程都能收到")
print(" kill -INT <pid> # 只有不屏蔽的线程能收到")
# 启动线程
t1 = threading.Thread(target=worker_with_mask, name="屏蔽信号线程")
t2 = threading.Thread(target=worker_without_mask, name="普通线程")
t1.start()
t2.start()
t1.join()
t2.join()
print("所有线程完成")
```
`pthread_sigmask()` 支持三种操作:
- `signal.SIG_BLOCK`:将信号添加到阻塞集
- `signal.SIG_UNBLOCK`:从阻塞集中移除信号
- `signal.SIG_SETMASK`:直接设置阻塞集
这个功能在以下场景特别有用:
1. 在多线程程序中保护关键代码段
2. 防止信号处理重入(同一个信号在处理期间再次到达)
3. 实现原子的信号操作
### 5.4 实际项目中的信号处理架构
在大型项目中,信号处理通常需要更复杂的架构。下面是我在一个 Web 服务项目中实际使用的信号管理器:
```python
import signal
import logging
from typing import Dict, Callable, Any
import threading
from concurrent.futures import ThreadPoolExecutor
class SignalManager:
"""统一的信号管理器"""
def __init__(self):
self.handlers: Dict[int, Callable] = {}
self.original_handlers: Dict[int, Any] = {}
self.lock = threading.RLock()
self.executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="signal_handler")
self.logger = logging.getLogger(__name__)
# 默认处理器
self.default_handlers = {
signal.SIGINT: self._graceful_shutdown,
signal.SIGTERM: self._graceful_shutdown,
}
# 检查平台支持
self._check_platform_support()
def _check_platform_support(self):
"""检查平台支持的信号"""
self.supported_signals = set()
# 基础信号
base_signals = ['SIGINT', 'SIGTERM', 'SIGABRT']
for sig_name in base_signals:
if hasattr(signal, sig_name):
self.supported_signals.add(getattr(signal, sig_name))
# Unix特有信号
if hasattr(signal, 'SIGUSR1'):
self.supported_signals.add(signal.SIGUSR1)
self.default_handlers[signal.SIGUSR1] = self._reload_config
if hasattr(signal, 'SIGUSR2'):
self.supported_signals.add(signal.SIGUSR2)
self.default_handlers[signal.SIGUSR2] = self._dump_status
self.logger.info(f"平台支持的信号: {[signal.strsignal(s) for s in self.supported_signals if signal.strsignal(s)]}")
def register(self, signum: int, handler: Callable, use_default_executor=True):
"""注册信号处理器"""
with self.lock:
if signum not in self.supported_signals:
self.logger.warning(f"信号 {signum} 在当前平台可能不支持")
# 保存原来的处理器
if signum not in self.original_handlers:
self.original_handlers[signum] = signal.getsignal(signum)
# 包装处理器,确保在单独的线程中执行
def wrapped_handler(sig, frame):
try:
if use_default_executor:
# 提交到线程池执行,避免阻塞信号处理器
future = self.executor.submit(handler, sig, frame)
# 可以在这里添加超时控制
# result = future.result(timeout=10)
else:
# 直接执行(不推荐,除非处理器非常轻量)
handler(sig, frame)
except Exception as e:
self.logger.error(f"信号处理器执行失败: {e}", exc_info=True)
self.handlers[signum] = wrapped_handler
signal.signal(signum, wrapped_handler)
self.logger.debug(f"已注册信号 {signum} 的处理器")
def register_defaults(self):
"""注册默认处理器"""
for sig, handler in self.default_handlers.items():
if sig in self.supported_signals:
self.register(sig, handler)
def restore(self, signum: int = None):
"""恢复原来的信号处理器"""
with self.lock:
if signum is None:
# 恢复所有
for sig, original in self.original_handlers.items():
signal.signal(sig, original)
self.handlers.clear()
self.original_handlers.clear()
self.logger.info("已恢复所有信号的原始处理器")
else:
# 恢复单个
if signum in self.original_handlers:
signal.signal(signum, self.original_handlers[signum])
del self.handlers[signum]
del self.original_handlers[signum]
self.logger.debug(f"已恢复信号 {signum} 的原始处理器")
def _graceful_shutdown(self, sig, frame):
"""优雅关闭的默认处理器"""
self.logger.info(f"收到关闭信号 {sig},开始优雅关闭流程")
# 这里可以触发应用级别的关闭流程
# 比如通知所有组件开始清理
def _reload_config(self, sig, frame):
"""重新加载配置的默认处理器"""
self.logger.info(f"收到重载信号 {sig},重新加载配置")
# 触发配置重载
def _dump_status(self, sig, frame):
"""导出状态的默认处理器"""
self.logger.info(f"收到状态导出信号 {sig}")
# 导出应用状态信息
def __enter__(self):
"""上下文管理器入口"""
self.register_defaults()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""上下文管理器出口"""
self.restore()
self.executor.shutdown(wait=True)
# 使用示例
if __name__ == "__main__":
import time
import os
# 配置日志
logging.basicConfig(level=logging.INFO)
# 使用上下文管理器,确保信号处理器被正确清理
with SignalManager() as sig_mgr:
print(f"程序运行中,PID: {os.getpid()}")
print("可用命令:")
print(" Ctrl+C 或 kill <pid> # 优雅关闭")
if hasattr(signal, 'SIGUSR1'):
print(f" kill -USR1 {os.getpid()} # 重载配置")
if hasattr(signal, 'SIGUSR2'):
print(f" kill -USR2 {os.getpid()} # 导出状态")
# 模拟长时间运行
try:
while True:
print(".", end="", flush=True)
time.sleep(1)
except KeyboardInterrupt:
print("\n通过KeyboardInterrupt退出")
```
这个信号管理器提供了几个重要特性:
1. **线程安全**:使用锁保护共享状态
2. **平台兼容**:自动检查平台支持的信号
3. **异步处理**:信号处理器在单独的线程中执行,避免阻塞
4. **资源管理**:使用上下文管理器确保正确清理
5. **错误处理**:捕获并记录处理器中的异常
6. **可扩展**:可以方便地添加新的信号处理器
在实际项目中,这样的信号管理器可以作为基础设施的一部分,为整个应用提供统一的信号处理能力。