# Python异步编程实践示例详解
Python异步编程通过asyncio库提供了强大的并发处理能力,特别适用于I/O密集型任务。以下是几个典型且实用的异步编程示例:
## 1. 基础异步任务执行
```python
import asyncio
import time
async def say_after(delay, message):
await asyncio.sleep(delay)
print(message)
return f"完成: {message}"
async def main():
print(f"开始时间: {time.strftime('%X')}")
# 顺序执行
result1 = await say_after(1, '你好')
result2 = await say_after(2, '世界')
print(f"结束时间: {time.strftime('%X')}")
print(f"结果: {result1}, {result2}")
# 运行示例
asyncio.run(main())
```
这个基础示例展示了`async/await`语法的基本用法,但此时两个任务是顺序执行的[ref_3]。
## 2. 并发任务执行优化
```python
import asyncio
import time
async def fetch_data(task_id, delay):
"""模拟数据获取任务"""
print(f'任务 {task_id} 开始执行, 需要 {delay} 秒')
await asyncio.sleep(delay)
print(f'任务 {task_id} 完成')
return {'task_id': task_id, 'data': f'样本数据 {task_id}'}
async def main_concurrent():
print(f"并发执行开始: {time.strftime('%X')}")
# 创建多个任务并发执行
tasks = [
fetch_data(1, 2),
fetch_data(2, 1),
fetch_data(3, 3)
]
# 使用asyncio.gather并发执行所有任务
results = await asyncio.gather(*tasks)
print(f"并发执行结束: {time.strftime('%X')}")
print(f"所有任务结果: {results}")
# 运行并发示例
asyncio.run(main_concurrent())
```
通过`asyncio.gather`实现真正的并发执行,三个任务总共只需要约3秒完成,而非顺序执行的6秒[ref_4]。
## 3. 异步网络请求爬虫
```python
import asyncio
import aiohttp
import time
async def fetch_url(session, url, semaphore):
"""使用信号量控制并发的URL抓取"""
async with semaphore:
try:
print(f'开始抓取: {url}')
async with session.get(url, timeout=10) as response:
data = await response.text()
print(f'完成抓取: {url}, 状态码: {response.status}')
return {'url': url, 'status': response.status, 'length': len(data)}
except Exception as e:
print(f'抓取失败: {url}, 错误: {e}')
return {'url': url, 'error': str(e)}
async def async_crawler():
"""异步网络爬虫示例"""
urls = [
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/2',
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/3',
'https://httpbin.org/delay/1'
]
# 使用信号量限制并发数为3
semaphore = asyncio.Semaphore(3)
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url, semaphore) for url in urls]
results = await asyncio.gather(*tasks)
return results
# 运行爬虫示例
start_time = time.time()
results = asyncio.run(async_crawler())
end_time = time.time()
print(f"爬虫执行时间: {end_time - start_time:.2f}秒")
print(f"成功抓取数量: {len([r for r in results if 'error' not in r])}")
```
这个示例展示了异步爬虫在处理多个网络请求时的效率优势,通过信号量机制控制并发度避免过度请求[ref_2]。
## 4. 生产者-消费者异步模式
```python
import asyncio
import random
async def producer(queue, producer_id):
"""生产者协程"""
for i in range(5):
item = f'产品 {producer_id}-{i}'
await asyncio.sleep(random.uniform(0.1, 0.5))
await queue.put(item)
print(f'生产者 {producer_id} 生产了: {item}')
# 发送结束信号
await queue.put(None)
async def consumer(queue, consumer_id):
"""消费者协程"""
while True:
item = await queue.get()
if item is None:
# 将结束信号放回队列供其他消费者使用
await queue.put(None)
break
print(f'消费者 {consumer_id} 消费了: {item}')
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
print(f'消费者 {consumer_id} 结束工作')
async def producer_consumer_demo():
"""生产者-消费者模式演示"""
queue = asyncio.Queue(maxsize=10)
# 创建生产者和消费者任务
producers = [producer(queue, i) for i in range(2)]
consumers = [consumer(queue, i) for i in range(3)]
# 启动所有任务
await asyncio.gather(*producers)
# 等待所有产品被消费
await queue.join()
# 通知消费者结束
await queue.put(None)
# 运行生产者-消费者示例
asyncio.run(producer_consumer_demo())
```
这个模式展示了异步编程在处理复杂工作流时的优势,特别适用于数据处理管道场景[ref_5]。
## 5. 异步文件操作与性能监控
```python
import asyncio
import aiofiles
import time
from pathlib import Path
async def async_file_operation(filename, content):
"""异步文件操作示例"""
# 异步写入文件
async with aiofiles.open(filename, 'w') as f:
await f.write(content)
print(f'文件 {filename} 写入完成')
# 异步读取文件
async with aiofiles.open(filename, 'r') as f:
content_read = await f.read()
print(f'文件 {filename} 读取完成, 内容长度: {len(content_read)}')
return len(content_read)
async def benchmark_async_io():
"""异步IO性能基准测试"""
files_data = [
('test1.txt', '这是测试文件1的内容' * 1000),
('test2.txt', '这是测试文件2的内容' * 1500),
('test3.txt', '这是测试文件3的内容' * 2000),
]
# 同步执行基准
print("=== 同步执行基准 ===")
sync_start = time.time()
for filename, content in files_data:
# 模拟同步文件操作
with open(filename, 'w') as f:
f.write(content)
with open(filename, 'r') as f:
f.read()
Path(filename).unlink() # 清理文件
sync_duration = time.time() - sync_start
print(f"同步执行时间: {sync_duration:.4f}秒")
# 异步执行基准
print("=== 异步执行基准 ===")
async_start = time.time()
tasks = [async_file_operation(filename, content) for filename, content in files_data]
results = await asyncio.gather(*tasks)
async_duration = time.time() - async_start
print(f"异步执行时间: {async_duration:.4f}秒")
print(f"性能提升: {sync_duration/async_duration:.2f}倍")
# 清理文件
for filename, _ in files_data:
Path(filename).unlink()
return results
# 运行性能测试
asyncio.run(benchmark_async_io())
```
这个示例展示了异步文件操作在批量处理时的性能优势,同时演示了如何使用性能分析工具来优化异步代码[ref_1][ref_6]。
## 异步编程关键优势对比
| 特性 | 同步编程 | 异步编程 | 优势说明 |
|------|----------|----------|----------|
| **执行模式** | 阻塞式执行 | 非阻塞式执行 | 异步编程在等待I/O时释放线程资源[ref_2] |
| **并发能力** | 多线程/多进程 | 单线程事件循环 | 避免线程切换开销,资源利用率更高[ref_3] |
| **代码复杂度** | 相对简单 | 需要理解事件循环 | 学习曲线较陡但长期维护性更好[ref_5] |
| **适用场景** | CPU密集型 | I/O密集型 | 异步编程特别适合网络请求、文件操作等[ref_1] |
| **性能表现** | 上下文切换开销大 | 高效的协程调度 | 在高并发I/O场景下性能提升显著[ref_4] |
这些示例充分展示了Python异步编程在实际应用中的强大能力,从基础语法到高级模式,涵盖了网络请求、文件操作、任务协调等多个典型场景。通过合理运用异步编程技术,可以显著提升应用程序的吞吐量和响应性能。